Commit 02dd0b7c authored by sml's avatar sml

前导语句,后导语句

parent ea27381f
...@@ -302,7 +302,7 @@ ...@@ -302,7 +302,7 @@
<artifactId>oshi-core</artifactId> <artifactId>oshi-core</artifactId>
<version>3.5.0</version> <version>3.5.0</version>
</dependency> </dependency>
<!--
<dependency> <dependency>
<groupId>com.baomidou</groupId> <groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-annotation</artifactId> <artifactId>mybatis-plus-annotation</artifactId>
...@@ -327,7 +327,7 @@ ...@@ -327,7 +327,7 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
-->
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId> <artifactId>hadoop-common</artifactId>
...@@ -541,6 +541,18 @@ ...@@ -541,6 +541,18 @@
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.1.0</version>
<exclusions>
<exclusion>
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>commons-io</groupId> <groupId>commons-io</groupId>
<artifactId>commons-io</artifactId> <artifactId>commons-io</artifactId>
...@@ -553,6 +565,13 @@ ...@@ -553,6 +565,13 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- druid数据库连接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.12</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
<finalName>jz-dmp-cmdexectool</finalName> <finalName>jz-dmp-cmdexectool</finalName>
......
package com.jz.dmp.cmdexectool.common.utils;
import java.security.Key;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
public class EncryptionUtils {
/**
* @Title: encode
* @Description: TODO(加密)
* @param @param pass
* @param @param publicKey
* @param @return 参数
* @return String 返回类型
* @throws
*/
public static String encode(String pass,String publicKey) {
String str = "";
try {
Key key = new SecretKeySpec(Base64.getMimeDecoder().decode(publicKey), "DESede");
Cipher cipher = Cipher.getInstance("DESede");
cipher.init(Cipher.ENCRYPT_MODE, key);
str = Base64.getMimeEncoder().encodeToString(cipher.doFinal(pass.getBytes()));
} catch (Exception e) {
e.printStackTrace();
}
return str;
}
/**
* @Title: decode
* @Description: TODO(解密)
* @param @param pass
* @param @param publicKey
* @param @return 参数
* @return String 返回类型
* @throws
*/
public static String decode(String pass,String publicKey) {
String str = "";
try {
Key key = new SecretKeySpec(Base64.getMimeDecoder().decode(publicKey), "DESede");
Cipher cipher = Cipher.getInstance("DESede");
cipher.init(Cipher.DECRYPT_MODE, key);
str = new String(cipher.doFinal(Base64.getMimeDecoder().decode(pass)));
} catch (Exception e) {
e.printStackTrace();
}
return str;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.common.enums;
public enum DbConnectType {
ORACLE_SERVICE_NAME(0, "Oracle Service Name"),
ORACLE_SID(1, "Oracle SID");
DbConnectType(int code, String descp) {
this.code = code;
this.descp = descp;
}
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.common.enums;
import java.util.HashMap;
/**
* data base types
*/
public enum DbType {
/**
* 0 mysql
* 1 postgresql
* 2 hive
* 3 spark
* 4 clickhouse
* 5 oracle
* 6 sqlserver
* 7 db2
* 8 presto
*/
MYSQL(0, "mysql"),
POSTGRESQL(1, "postgresql"),
HIVE(2, "hive"),
SPARK(3, "spark"),
CLICKHOUSE(4, "clickhouse"),
ORACLE(5, "oracle"),
SQLSERVER(6, "sqlserver"),
DB2(7, "db2"),
PRESTO(8, "presto");
DbType(int code, String descp) {
this.code = code;
this.descp = descp;
}
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
private static HashMap<Integer, DbType> DB_TYPE_MAP =new HashMap<>();
static {
for (DbType dbType:DbType.values()){
DB_TYPE_MAP.put(dbType.getCode(),dbType);
}
}
public static DbType of(int type){
if(DB_TYPE_MAP.containsKey(type)){
return DB_TYPE_MAP.get(type);
}
throw new IllegalArgumentException("invalid type : " + type);
}
}
package com.jz.dmp.cmdexectool.scheduler.common.enums;
/**
* @ClassName: MyDbType
* @Description: TODO(DMP数据库类型枚举)
* @author ybz
* @date 2021年3月5日
*
*/
public enum MyDbType {
MySQL("1", DbType.MYSQL),
SQLServer("2", DbType.SQLSERVER),
PostgreSQL("3", DbType.POSTGRESQL),
Oracle("4", DbType.ORACLE),
DM("5", null),
DB2("6", DbType.DB2),
Hive("7", DbType.HIVE),
Impala("8", null),
Kudu("9", null),
INFORMIX("21", null);
private String idStr;
private DbType dbType;
private MyDbType(String idStr, DbType dbType) {
this.idStr = idStr;
this.dbType = dbType;
}
public String getIdStr() {
return idStr;
}
public DbType getDbType() {
return dbType;
}
/**
* @Title: obtainByIdStr
* @Description: TODO(根据数据源类型主键获取枚举)
* @param @param idStr
* @param @return 参数
* @return MyDbType 返回类型
* @throws
*/
public static MyDbType obtainByIdStr(String idStr) {
MyDbType myDbTypeMatch = null;
for (MyDbType myDbType : MyDbType.values()) {
if (myDbType.idStr.equals(idStr)) {
myDbTypeMatch = myDbType;
break;
}
}
return myDbTypeMatch;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.common.task.sql;
import com.jz.dmp.cmdexectool.scheduler.common.process.Property;
import java.util.Map;
/**
* Used to contains both prepared sql string and its to-be-bind parameters
*/
public class SqlBinds {
private final String sql;
private final Map<Integer, Property> paramsMap;
public SqlBinds(String sql, Map<Integer, Property> paramsMap) {
this.sql = sql;
this.paramsMap = paramsMap;
}
public String getSql() {
return sql;
}
public Map<Integer, Property> getParamsMap() {
return paramsMap;
}
}
...@@ -16,23 +16,25 @@ ...@@ -16,23 +16,25 @@
*/ */
package com.jz.dmp.cmdexectool.scheduler.common.task.sql; package com.jz.dmp.cmdexectool.scheduler.common.task.sql;
import com.alibaba.fastjson.JSONArray; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.jz.dmp.cmdexectool.common.constant.CommConstant; import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.common.utils.EncryptionUtils;
import com.jz.dmp.cmdexectool.common.utils.FreeMarkerUtils; import com.jz.dmp.cmdexectool.common.utils.FreeMarkerUtils;
import com.jz.dmp.cmdexectool.controller.bean.DmpProjectConfigInfoDto; import com.jz.dmp.cmdexectool.controller.bean.DmpProjectConfigInfoDto;
import com.jz.dmp.cmdexectool.entity.DmpSyncingDatasource; import com.jz.dmp.cmdexectool.entity.DmpSyncingDatasource;
import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao; import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
import com.jz.dmp.cmdexectool.scheduler.common.enums.MyDbType;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo; import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils; import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
/** /**
* Sql/Hql parameter * Sql/Hql parameter
...@@ -71,12 +73,42 @@ public class SqlParameters extends AbstractParameters { ...@@ -71,12 +73,42 @@ public class SqlParameters extends AbstractParameters {
*/ */
private String waterdropScript; private String waterdropScript;
/**
* 前置语句
*/
private List<String> preStatements;
/**
* 后置语句
*/
private List<String> posStatements;
/**
* jdbcUrl
*/
private String jdbcUrl;
/**
* jdbc user
*/
private String user;
/**
* jdbc password
*/
private String password;
/**
* 数据源类型
*/
private MyDbType myDbType;
/** /**
* resource list * resource list
*/ */
private List<ResourceInfo> resourceList; private List<ResourceInfo> resourceList;
public SqlParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurer freeMarkerConfig) { public SqlParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurer freeMarkerConfig, String publicKey) {
source = ""; source = "";
env = ""; env = "";
sink = ""; sink = "";
...@@ -97,16 +129,21 @@ public class SqlParameters extends AbstractParameters { ...@@ -97,16 +129,21 @@ public class SqlParameters extends AbstractParameters {
Integer sourceId = scriptObj.getInteger("sourceId"); Integer sourceId = scriptObj.getInteger("sourceId");
DmpSyncingDatasource dmpSyncingDatasource = dmpSyncingDatasourceDao.queryById(sourceId); DmpSyncingDatasource dmpSyncingDatasource = dmpSyncingDatasourceDao.queryById(sourceId);
this.jdbcUrl = dmpSyncingDatasource.getJdbcUrl();
this.user = dmpSyncingDatasource.getUserName();
this.password = EncryptionUtils.decode(dmpSyncingDatasource.getPassword(), publicKey);
this.myDbType = MyDbType.obtainByIdStr(dmpSyncingDatasource.getId().toString());
String sourceTableNames = scriptObj.getString("sourceTableNames"); String sourceTableNames = scriptObj.getString("sourceTableNames");
String[] tableNameArr = sourceTableNames.split(","); String[] tableNameArr = sourceTableNames.split(",");
for (String tableName : tableNameArr) { for (String tableName : tableNameArr) {
Map<String, String> jdbcModel = new HashMap<String, String>(); Map<String, String> jdbcModel = new HashMap<String, String>();
jdbcModel.put("driver", dmpSyncingDatasource.getDriverClassName()); jdbcModel.put("driver", dmpSyncingDatasource.getDriverClassName());
jdbcModel.put("url", dmpSyncingDatasource.getJdbcUrl()); jdbcModel.put("url", this.jdbcUrl);
jdbcModel.put("table", tableName); jdbcModel.put("table", tableName);
jdbcModel.put("result_table_name", tableName); jdbcModel.put("result_table_name", tableName);
jdbcModel.put("user", dmpSyncingDatasource.getUserName()); jdbcModel.put("user", this.user);
jdbcModel.put("password", dmpSyncingDatasource.getPassword()); jdbcModel.put("password", this.password);
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig); source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig);
} }
...@@ -204,7 +241,12 @@ public class SqlParameters extends AbstractParameters { ...@@ -204,7 +241,12 @@ public class SqlParameters extends AbstractParameters {
waterdropModel.put("source", source); waterdropModel.put("source", source);
waterdropModel.put("transform", transform); waterdropModel.put("transform", transform);
waterdropModel.put("sink", sink); waterdropModel.put("sink", sink);
waterdropScript = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig); this.waterdropScript = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig);
preStatements = new ArrayList<String>();
preStatements.add("insert into test(id, name) values(1, 'test')");
posStatements = new ArrayList<String>();
posStatements.add("insert into test(id, name) values(2, 'test2')");
} }
...@@ -264,6 +306,22 @@ public class SqlParameters extends AbstractParameters { ...@@ -264,6 +306,22 @@ public class SqlParameters extends AbstractParameters {
this.waterdropScript = waterdropScript; this.waterdropScript = waterdropScript;
} }
public List<String> getPreStatements() {
return preStatements;
}
public void setPreStatements(List<String> preStatements) {
this.preStatements = preStatements;
}
public List<String> getPosStatements() {
return posStatements;
}
public void setPosStatements(List<String> posStatements) {
this.posStatements = posStatements;
}
public List<ResourceInfo> getResourceList() { public List<ResourceInfo> getResourceList() {
return resourceList; return resourceList;
} }
...@@ -272,6 +330,38 @@ public class SqlParameters extends AbstractParameters { ...@@ -272,6 +330,38 @@ public class SqlParameters extends AbstractParameters {
this.resourceList = resourceList; this.resourceList = resourceList;
} }
public String getJdbcUrl() {
return jdbcUrl;
}
public void setJdbcUrl(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public MyDbType getMyDbType() {
return myDbType;
}
public void setMyDbType(MyDbType myDbType) {
this.myDbType = myDbType;
}
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
return waterdropScript != null && !waterdropScript.isEmpty(); return waterdropScript != null && !waterdropScript.isEmpty();
......
...@@ -75,8 +75,10 @@ public class CommonUtils { ...@@ -75,8 +75,10 @@ public class CommonUtils {
*/ */
public static boolean getKerberosStartupState() { public static boolean getKerberosStartupState() {
String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE); String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
//String resUploadStartupType = "NONE";
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType); ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
Boolean kerberosStartupState = PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false); Boolean kerberosStartupState = PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false);
//Boolean kerberosStartupState = false;
return resUploadType == ResUploadType.HDFS && kerberosStartupState; return resUploadType == ResUploadType.HDFS && kerberosStartupState;
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.dao.datasource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
import com.jz.dmp.cmdexectool.scheduler.common.utils.CommonUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* data source base class
*/
public abstract class BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(BaseDataSource.class);
/**
* user name
*/
protected String user;
/**
* user password
*/
protected String password;
/**
* data source address
*/
private String address;
/**
* 直接获取
*/
private String jdbcUrlDirect;
/**
* database name
*/
private String database;
/**
* other connection parameters for the data source
*/
private String other;
/**
* principal
*/
private String principal;
private String dbType;
public String getPrincipal() {
return principal;
}
public void setPrincipal(String principal) {
this.principal = principal;
}
/**
* @return driver class
*/
public abstract String driverClassSelector();
/**
* @return db type
*/
public abstract DbType dbTypeSelector();
/**
* gets the JDBC url for the data source connection
* @return getJdbcUrl
*/
public String getJdbcUrl() {
StringBuilder jdbcUrl = new StringBuilder(getAddress());
appendDatabase(jdbcUrl);
appendPrincipal(jdbcUrl);
appendOther(jdbcUrl);
return jdbcUrl.toString();
}
/**
* append database
* @param jdbcUrl jdbc url
*/
protected void appendDatabase(StringBuilder jdbcUrl) {
if (dbTypeSelector() == DbType.SQLSERVER) {
jdbcUrl.append(";databaseName=").append(getDatabase());
} else {
if (getAddress().lastIndexOf('/') != (jdbcUrl.length() - 1)) {
jdbcUrl.append("/");
}
jdbcUrl.append(getDatabase());
}
}
/**
* append principal
* @param jdbcUrl jdbc url
*/
private void appendPrincipal(StringBuilder jdbcUrl) {
boolean tag = dbTypeSelector() == DbType.HIVE || dbTypeSelector() == DbType.SPARK;
if (tag && StringUtils.isNotEmpty(getPrincipal())) {
jdbcUrl.append(";principal=").append(getPrincipal());
}
}
/**
* append other
* @param jdbcUrl jdbc url
*/
private void appendOther(StringBuilder jdbcUrl) {
String otherParams = filterOther(getOther());
if (StringUtils.isNotEmpty(otherParams)) {
String separator = "";
switch (dbTypeSelector()) {
case CLICKHOUSE:
case MYSQL:
case ORACLE:
case POSTGRESQL:
case PRESTO:
separator = "?";
break;
case DB2:
separator = ":";
break;
case HIVE:
case SPARK:
case SQLSERVER:
separator = ";";
break;
default:
logger.error("Db type mismatch!");
}
jdbcUrl.append(separator).append(otherParams);
}
}
protected String filterOther(String otherParams){
return otherParams;
}
/**
* test whether the data source can be connected successfully
*/
public void isConnectable() {
Connection con = null;
try {
Class.forName(driverClassSelector());
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} catch (ClassNotFoundException | SQLException e) {
logger.error("Get connection error: {}", e.getMessage());
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
}
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
/**
* password need decode
* @return
*/
public String getPassword() {
return CommonUtils.decodePassword(password);
}
public void setPassword(String password) {
this.password = password;
}
public void setAddress(String address) {
this.address = address;
}
public String getAddress() {
return address;
}
public String getDatabase() {
return database;
}
public void setDatabase(String database) {
this.database = database;
}
public String getOther() {
return other;
}
public void setOther(String other) {
this.other = other;
}
public String getDbType() {
return dbType;
}
public void setDbType(String dbType) {
this.dbType = dbType;
}
public String getJdbcUrlDirect() {
return jdbcUrlDirect;
}
public void setJdbcUrlDirect(String jdbcUrlDirect) {
this.jdbcUrlDirect = jdbcUrlDirect;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.dao.datasource;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
/**
* data source of ClickHouse
*/
public class ClickHouseDataSource extends BaseDataSource {
/**
* @return driver class
*/
@Override
public String driverClassSelector() {
return Constants.COM_CLICKHOUSE_JDBC_DRIVER;
}
/**
* @return db type
*/
@Override
public DbType dbTypeSelector() {
return DbType.CLICKHOUSE;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.dao.datasource;
import javax.sql.DataSource;
import org.apache.ibatis.mapping.Environment;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.transaction.TransactionFactory;
import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.druid.pool.DruidDataSource;
import com.baomidou.mybatisplus.core.MybatisConfiguration;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
/**
* not spring manager connection, only use for init db, and alert module for non-spring application
* data source connection factory
*/
public class ConnectionFactory extends SpringConnectionFactory {
private static final Logger logger = LoggerFactory.getLogger(ConnectionFactory.class);
private static class ConnectionFactoryHolder {
private static final ConnectionFactory connectionFactory = new ConnectionFactory();
}
public static ConnectionFactory getInstance() {
return ConnectionFactoryHolder.connectionFactory;
}
private ConnectionFactory() {
try {
dataSource = buildDataSource();
sqlSessionFactory = getSqlSessionFactory();
sqlSessionTemplate = getSqlSessionTemplate();
} catch (Exception e) {
logger.error("Initializing ConnectionFactory error", e);
throw new RuntimeException(e);
}
}
/**
* sql session factory
*/
private SqlSessionFactory sqlSessionFactory;
/**
* sql session template
*/
private SqlSessionTemplate sqlSessionTemplate;
private DataSource dataSource;
public DataSource getDataSource() {
return dataSource;
}
/**
* get the data source
*
* @return druid dataSource
*/
private DataSource buildDataSource() {
DruidDataSource druidDataSource = dataSource();
return druidDataSource;
}
/**
* * get sql session factory
*
* @return sqlSessionFactory
* @throws Exception sqlSessionFactory exception
*/
private SqlSessionFactory getSqlSessionFactory() throws Exception {
TransactionFactory transactionFactory = new JdbcTransactionFactory();
Environment environment = new Environment("development", transactionFactory, getDataSource());
MybatisConfiguration configuration = new MybatisConfiguration();
configuration.setEnvironment(environment);
configuration.setLazyLoadingEnabled(true);
configuration.addMappers("org.apache.dolphinscheduler.dao.mapper");
configuration.addInterceptor(new PaginationInterceptor());
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setConfiguration(configuration);
sqlSessionFactoryBean.setDataSource(getDataSource());
sqlSessionFactoryBean.setTypeEnumsPackage("org.apache.dolphinscheduler.*.enums");
sqlSessionFactory = sqlSessionFactoryBean.getObject();
return sqlSessionFactory;
}
private SqlSessionTemplate getSqlSessionTemplate() {
sqlSessionTemplate = new SqlSessionTemplate(sqlSessionFactory);
return sqlSessionTemplate;
}
/**
* get sql session
*
* @return sqlSession
*/
public SqlSession getSqlSession() {
return sqlSessionTemplate;
}
/**
* get mapper
*
* @param type target class
* @param <T> generic
* @return target object
*/
public <T> T getMapper(Class<T> type) {
try {
return getSqlSession().getMapper(type);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("get mapper failed");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.dao.datasource;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
/**
* data source of DB2 Server
*/
public class DB2ServerDataSource extends BaseDataSource {
/**
* gets the JDBC url for the data source connection
* @return jdbc url
*/
@Override
public String driverClassSelector() {
return Constants.COM_DB2_JDBC_DRIVER;
}
/**
* @return db type
*/
@Override
public DbType dbTypeSelector() {
return DbType.DB2;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.dao.datasource;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
import com.jz.dmp.cmdexectool.scheduler.common.utils.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* produce datasource in this custom defined datasource factory.
*/
public class DataSourceFactory {
private static final Logger logger = LoggerFactory.getLogger(DataSourceFactory.class);
/**
* getDatasource
* @param dbType dbType
* @param parameter parameter
* @return getDatasource
*/
public static BaseDataSource getDatasource(DbType dbType, String parameter) {
try {
switch (dbType) {
case MYSQL:
return JSONUtils.parseObject(parameter, MySQLDataSource.class);
case POSTGRESQL:
return JSONUtils.parseObject(parameter, PostgreDataSource.class);
case HIVE:
return JSONUtils.parseObject(parameter, HiveDataSource.class);
case SPARK:
return JSONUtils.parseObject(parameter, SparkDataSource.class);
case CLICKHOUSE:
return JSONUtils.parseObject(parameter, ClickHouseDataSource.class);
case ORACLE:
return JSONUtils.parseObject(parameter, OracleDataSource.class);
case SQLSERVER:
return JSONUtils.parseObject(parameter, SQLServerDataSource.class);
case DB2:
return JSONUtils.parseObject(parameter, DB2ServerDataSource.class);
case PRESTO:
return JSONUtils.parseObject(parameter, PrestoDataSource.class);
default:
return null;
}
} catch (Exception e) {
logger.error("get datasource object error", e);
return null;
}
}
/**
* load class
* @param dbType
* @throws Exception
*/
public static void loadClass(DbType dbType) throws Exception{
switch (dbType){
case MYSQL :
Class.forName(Constants.COM_MYSQL_JDBC_DRIVER);
break;
case POSTGRESQL :
Class.forName(Constants.ORG_POSTGRESQL_DRIVER);
break;
case HIVE :
Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER);
break;
case SPARK :
Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER);
break;
case CLICKHOUSE :
Class.forName(Constants.COM_CLICKHOUSE_JDBC_DRIVER);
break;
case ORACLE :
Class.forName(Constants.COM_ORACLE_JDBC_DRIVER);
break;
case SQLSERVER:
Class.forName(Constants.COM_SQLSERVER_JDBC_DRIVER);
break;
case DB2:
Class.forName(Constants.COM_DB2_JDBC_DRIVER);
break;
case PRESTO:
Class.forName(Constants.COM_PRESTO_JDBC_DRIVER);
break;
default:
logger.error("not support sql type: {},can't load class", dbType);
throw new IllegalArgumentException("not support sql type,can't load class");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.dao.datasource;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
import com.jz.dmp.cmdexectool.scheduler.common.utils.StringUtils;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
/**
* data source of hive
*/
public class HiveDataSource extends BaseDataSource {
/**
* gets the JDBC url for the data source connection
* @return jdbc url
*/
@Override
public String driverClassSelector() {
return Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER;
}
/**
* @return db type
*/
@Override
public DbType dbTypeSelector() {
return DbType.HIVE;
}
/**
* build hive jdbc params,append : ?hive_conf_list
*
* hive jdbc url template:
*
* jdbc:hive2://<host1>:<port1>,<host2>:<port2>/dbName;initFile=<file>;sess_var_list?hive_conf_list#hive_var_list
*
* @param otherParams otherParams
* @return filter otherParams
*/
@Override
protected String filterOther(String otherParams) {
if (StringUtils.isBlank(otherParams)) {
return "";
}
StringBuilder hiveConfListSb = new StringBuilder();
hiveConfListSb.append("?");
StringBuilder sessionVarListSb = new StringBuilder();
String[] otherArray = otherParams.split(";", -1);
// get the default hive conf var name
Set<String> hiveConfSet = Stream.of(ConfVars.values()).map(confVars -> confVars.varname)
.collect(Collectors.toSet());
for (String conf : otherArray) {
if (hiveConfSet.contains(conf.split("=")[0])) {
hiveConfListSb.append(conf).append(";");
} else {
sessionVarListSb.append(conf).append(";");
}
}
// remove the last ";"
if (sessionVarListSb.length() > 0) {
sessionVarListSb.deleteCharAt(sessionVarListSb.length() - 1);
}
if (hiveConfListSb.length() > 0) {
hiveConfListSb.deleteCharAt(hiveConfListSb.length() - 1);
}
return sessionVarListSb.toString() + hiveConfListSb.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.dao.datasource;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
import com.jz.dmp.cmdexectool.scheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* data source of mySQL
*/
public class MySQLDataSource extends BaseDataSource {
private final Logger logger = LoggerFactory.getLogger(MySQLDataSource.class);
private final String sensitiveParam = "autoDeserialize=true";
private final char symbol = '&';
/**
* gets the JDBC url for the data source connection
* @return jdbc url
*/
@Override
public String driverClassSelector() {
return Constants.COM_MYSQL_JDBC_DRIVER;
}
/**
* @return db type
*/
@Override
public DbType dbTypeSelector() {
return DbType.MYSQL;
}
@Override
protected String filterOther(String other){
if(StringUtils.isBlank(other)){
return "";
}
if(other.contains(sensitiveParam)){
int index = other.indexOf(sensitiveParam);
String tmp = sensitiveParam;
if(index == 0 || other.charAt(index + 1) == symbol){
tmp = tmp + symbol;
} else if(other.charAt(index - 1) == symbol){
tmp = symbol + tmp;
}
logger.warn("sensitive param : {} in otherParams field is filtered", tmp);
other = other.replace(tmp, "");
}
logger.debug("other : {}", other);
return other;
}
@Override
public String getUser() {
if(user.contains(sensitiveParam)){
logger.warn("sensitive param : {} in username field is filtered", sensitiveParam);
user = user.replace(sensitiveParam, "");
}
logger.debug("username : {}", user);
return user;
}
@Override
public String getPassword() {
// password need decode
password = super.getPassword();
if(password.contains(sensitiveParam)){
logger.warn("sensitive param : {} in password field is filtered", sensitiveParam);
password = password.replace(sensitiveParam, "");
}
return password;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.dao.datasource;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbConnectType;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
/**
* data source of Oracle
*/
public class OracleDataSource extends BaseDataSource {
private DbConnectType connectType;
public DbConnectType getConnectType() {
return connectType;
}
public void setConnectType(DbConnectType connectType) {
this.connectType = connectType;
}
/**
* @return driver class
*/
@Override
public String driverClassSelector() {
return Constants.COM_ORACLE_JDBC_DRIVER;
}
/**
* append service name or SID
*/
@Override
protected void appendDatabase(StringBuilder jdbcUrl) {
if (getConnectType() == DbConnectType.ORACLE_SID) {
jdbcUrl.append(":");
} else {
jdbcUrl.append("/");
}
jdbcUrl.append(getDatabase());
}
/**
* @return db type
*/
@Override
public DbType dbTypeSelector() {
return DbType.ORACLE;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.dao.datasource;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
/**
* data source of postgreSQL
*/
public class PostgreDataSource extends BaseDataSource {
/**
* gets the JDBC url for the data source connection
* @return jdbc url
*/
@Override
public String driverClassSelector() {
return Constants.ORG_POSTGRESQL_DRIVER;
}
/**
* @return db type
*/
@Override
public DbType dbTypeSelector() {
return DbType.POSTGRESQL;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.dao.datasource;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
public class PrestoDataSource extends BaseDataSource {
/**
* @return driver class
*/
@Override
public String driverClassSelector() {
return Constants.COM_PRESTO_JDBC_DRIVER;
}
/**
* @return db type
*/
@Override
public DbType dbTypeSelector() {
return DbType.PRESTO;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.dao.datasource;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
import com.jz.dmp.cmdexectool.scheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/**
* data source of SQL Server
*/
public class SQLServerDataSource extends BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(SQLServerDataSource.class);
/**
* gets the JDBC url for the data source connection
* @return jdbc url
*/
@Override
public String getJdbcUrl() {
String jdbcUrl = getAddress();
jdbcUrl += ";databaseName=" + getDatabase();
if (StringUtils.isNotEmpty(getOther())) {
jdbcUrl += ";" + getOther();
}
return jdbcUrl;
}
/**
* test whether the data source can be connected successfully
*/
@Override
public void isConnectable() {
Connection con = null;
try {
Class.forName(Constants.COM_SQLSERVER_JDBC_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} catch (Exception e) {
logger.error("error", e);
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error("SQL Server datasource try conn close conn error", e);
}
}
}
}
/**
* @return driver class
*/
@Override
public String driverClassSelector() {
return Constants.COM_SQLSERVER_JDBC_DRIVER;
}
/**
* @return db type
*/
@Override
public DbType dbTypeSelector() {
return DbType.SQLSERVER;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.dao.datasource;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
/**
* data source of spark
*/
public class SparkDataSource extends BaseDataSource {
/**
* gets the JDBC url for the data source connection
* @return jdbc url
*/
@Override
public String driverClassSelector() {
return Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER;
}
/**
* @return db type
*/
@Override
public DbType dbTypeSelector() {
return DbType.SPARK;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.dao.datasource;
import com.alibaba.druid.pool.DruidDataSource;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.core.MybatisConfiguration;
import com.baomidou.mybatisplus.core.config.GlobalConfig;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.dao.utils.PropertyUtils;
import org.apache.ibatis.mapping.DatabaseIdProvider;
import org.apache.ibatis.mapping.VendorDatabaseIdProvider;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.type.JdbcType;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import java.util.Properties;
/**
* data source connection factory
*/
//@Configuration
//@MapperScan("org.apache.dolphinscheduler.*.mapper")
public class SpringConnectionFactory {
private static final Logger logger = LoggerFactory.getLogger(SpringConnectionFactory.class);
/**
* pagination interceptor
* @return pagination interceptor
*/
@Bean
public PaginationInterceptor paginationInterceptor() {
return new PaginationInterceptor();
}
/**
* get the data source
* @return druid dataSource
*/
@Bean(destroyMethod="")
public DruidDataSource dataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setDriverClassName(PropertyUtils.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME));
druidDataSource.setUrl(PropertyUtils.getString(Constants.SPRING_DATASOURCE_URL));
druidDataSource.setUsername(PropertyUtils.getString(Constants.SPRING_DATASOURCE_USERNAME));
druidDataSource.setPassword(PropertyUtils.getString(Constants.SPRING_DATASOURCE_PASSWORD));
druidDataSource.setValidationQuery(PropertyUtils.getString(Constants.SPRING_DATASOURCE_VALIDATION_QUERY,"SELECT 1"));
druidDataSource.setPoolPreparedStatements(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_POOL_PREPARED_STATEMENTS,true));
druidDataSource.setTestWhileIdle(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_TEST_WHILE_IDLE,true));
druidDataSource.setTestOnBorrow(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_BORROW,true));
druidDataSource.setTestOnReturn(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_RETURN,true));
druidDataSource.setKeepAlive(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_KEEP_ALIVE,true));
druidDataSource.setMinIdle(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MIN_IDLE,5));
druidDataSource.setMaxActive(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE,50));
druidDataSource.setMaxWait(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_WAIT,60000));
druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_POOL_PREPARED_STATEMENT_PER_CONNECTION_SIZE,20));
druidDataSource.setInitialSize(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_INITIAL_SIZE,5));
druidDataSource.setTimeBetweenEvictionRunsMillis(PropertyUtils.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_EVICTION_RUNS_MILLIS,60000));
druidDataSource.setTimeBetweenConnectErrorMillis(PropertyUtils.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_CONNECT_ERROR_MILLIS,60000));
druidDataSource.setMinEvictableIdleTimeMillis(PropertyUtils.getLong(Constants.SPRING_DATASOURCE_MIN_EVICTABLE_IDLE_TIME_MILLIS,300000));
druidDataSource.setValidationQueryTimeout(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_VALIDATION_QUERY_TIMEOUT,3));
//auto commit
druidDataSource.setDefaultAutoCommit(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_DEFAULT_AUTO_COMMIT,true));
return druidDataSource;
}
/**
* * get transaction manager
* @return DataSourceTransactionManager
*/
@Bean
public DataSourceTransactionManager transactionManager() {
return new DataSourceTransactionManager(dataSource());
}
/**
* * get sql session factory
* @return sqlSessionFactory
* @throws Exception sqlSessionFactory exception
*/
@Bean
public SqlSessionFactory sqlSessionFactory() throws Exception {
MybatisConfiguration configuration = new MybatisConfiguration();
configuration.setMapUnderscoreToCamelCase(true);
configuration.setCacheEnabled(false);
configuration.setCallSettersOnNulls(true);
configuration.setJdbcTypeForNull(JdbcType.NULL);
configuration.addInterceptor(paginationInterceptor());
MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
sqlSessionFactoryBean.setConfiguration(configuration);
sqlSessionFactoryBean.setDataSource(dataSource());
GlobalConfig.DbConfig dbConfig = new GlobalConfig.DbConfig();
dbConfig.setIdType(IdType.AUTO);
GlobalConfig globalConfig = new GlobalConfig();
globalConfig.setDbConfig(dbConfig);
sqlSessionFactoryBean.setGlobalConfig(globalConfig);
sqlSessionFactoryBean.setTypeAliasesPackage("org.apache.dolphinscheduler.dao.entity");
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
sqlSessionFactoryBean.setMapperLocations(resolver.getResources("org/apache/dolphinscheduler/dao/mapper/*Mapper.xml"));
sqlSessionFactoryBean.setTypeEnumsPackage("org.apache.dolphinscheduler.*.enums");
sqlSessionFactoryBean.setDatabaseIdProvider(databaseIdProvider());
return sqlSessionFactoryBean.getObject();
}
/**
* get sql session
* @return SqlSession
* @throws Exception
*/
@Bean
public SqlSession sqlSession() throws Exception{
return new SqlSessionTemplate(sqlSessionFactory());
}
@Bean
public DatabaseIdProvider databaseIdProvider(){
DatabaseIdProvider databaseIdProvider = new VendorDatabaseIdProvider();
Properties properties = new Properties();
properties.setProperty("MySQL", "mysql");
properties.setProperty("PostgreSQL", "pg");
databaseIdProvider.setProperties(properties);
return databaseIdProvider;
}
}
package com.jz.dmp.cmdexectool.scheduler.dao.utils;
import static com.jz.dmp.cmdexectool.scheduler.common.Constants.HIVE_CONF;
import static com.jz.dmp.cmdexectool.scheduler.common.Constants.PASSWORD;
import static com.jz.dmp.cmdexectool.scheduler.common.Constants.SEMICOLON;
import static com.jz.dmp.cmdexectool.scheduler.common.Constants.USER;
import static com.jz.dmp.cmdexectool.scheduler.common.enums.DbType.HIVE;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
import com.jz.dmp.cmdexectool.scheduler.common.process.Property;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlBinds;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlType;
import com.jz.dmp.cmdexectool.scheduler.common.utils.CollectionUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.CommonUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.JSONUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.BaseDataSource;
import com.jz.dmp.cmdexectool.scheduler.server.utils.ParamUtils;
public class DatabaseUtils {
private static Logger logger = LoggerFactory.getLogger(DatabaseUtils.class);
private static final int LIMIT = 10000;
/**
* execute function and sql
* @param mainSqlBinds main sql binds
* @param preStatementsBinds pre statements binds
* @param postStatementsBinds post statements binds
* @param createFuncs create functions
*/
public static void executeUpdateSql(List<SqlBinds> statementsBinds, BaseDataSource baseDataSource){
Connection connection = null;
PreparedStatement stmt = null;
ResultSet resultSet = null;
try {
// if upload resource is HDFS and kerberos startup
CommonUtils.loadKerberosConf();
// create connection
connection = createConnection(baseDataSource);
// create temp function
/*
if (CollectionUtils.isNotEmpty(createFuncs)) {
createTempFunction(connection,createFuncs);
}
*/
// pre sql
preSql(connection,statementsBinds);
} catch (Exception e) {
logger.error("execute sql error",e);
throw new RuntimeException("execute sql error");
} finally {
close(resultSet,stmt,connection);
}
}
/**
* execute function and sql
* @param mainSqlBinds main sql binds
* @param preStatementsBinds pre statements binds
* @param postStatementsBinds post statements binds
* @param createFuncs create functions
*/
public static void executeFuncAndSql(SqlBinds mainSqlBinds,
List<SqlBinds> preStatementsBinds,
List<SqlBinds> postStatementsBinds,
List<String> createFuncs,
BaseDataSource baseDataSource){
Connection connection = null;
PreparedStatement stmt = null;
ResultSet resultSet = null;
try {
// if upload resource is HDFS and kerberos startup
CommonUtils.loadKerberosConf();
// create connection
connection = createConnection(baseDataSource);
// create temp function
/*
if (CollectionUtils.isNotEmpty(createFuncs)) {
createTempFunction(connection,createFuncs);
}
*/
// pre sql
preSql(connection,preStatementsBinds);
stmt = prepareStatementAndBind(connection, mainSqlBinds);
// decide whether to executeQuery or executeUpdate based on sqlType
if (1 == SqlType.QUERY.ordinal()) {
// query statements need to be convert to JsonArray and inserted into Alert to send
resultSet = stmt.executeQuery();
resultProcess(resultSet);
} else if (1 == SqlType.NON_QUERY.ordinal()) {
// non query statement
stmt.executeUpdate();
}
postSql(connection,postStatementsBinds);
} catch (Exception e) {
logger.error("execute sql error",e);
throw new RuntimeException("execute sql error");
} finally {
close(resultSet,stmt,connection);
}
}
/**
* close jdbc resource
*
* @param resultSet resultSet
* @param pstmt pstmt
* @param connection connection
*/
private static void close(ResultSet resultSet,
PreparedStatement pstmt,
Connection connection){
if (resultSet != null){
try {
resultSet.close();
} catch (SQLException e) {
logger.error("close result set error : {}",e.getMessage(),e);
}
}
if (pstmt != null){
try {
pstmt.close();
} catch (SQLException e) {
logger.error("close prepared statement error : {}",e.getMessage(),e);
}
}
if (connection != null){
try {
connection.close();
} catch (SQLException e) {
logger.error("close connection error : {}",e.getMessage(),e);
}
}
}
/**
* create connection
*
* @return connection
* @throws Exception Exception
*/
private static Connection createConnection(BaseDataSource baseDataSource) throws Exception{
// if hive , load connection params if exists
Connection connection = null;
if (HIVE == DbType.valueOf(baseDataSource.getDbType())) {
Properties paramProp = new Properties();
paramProp.setProperty(USER, baseDataSource.getUser());
paramProp.setProperty(PASSWORD, baseDataSource.getPassword());
Map<String, String> connParamMap = CollectionUtils.stringToMap("",
SEMICOLON,
HIVE_CONF);
paramProp.putAll(connParamMap);
connection = DriverManager.getConnection(baseDataSource.getJdbcUrlDirect(),
paramProp);
}else{
connection = DriverManager.getConnection(baseDataSource.getJdbcUrlDirect(),
baseDataSource.getUser(),
baseDataSource.getPassword());
}
return connection;
}
/**
* pre sql
*
* @param connection connection
* @param preStatementsBinds preStatementsBinds
*/
private static void preSql(Connection connection,
List<SqlBinds> preStatementsBinds) throws Exception{
for (SqlBinds sqlBind: preStatementsBinds) {
try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)){
int result = pstmt.executeUpdate();
logger.info("pre statement execute result: {}, for sql: {}",result,sqlBind.getSql());
}
}
}
/**
* preparedStatement bind
* @param connection connection
* @param sqlBinds sqlBinds
* @return PreparedStatement
* @throws Exception Exception
*/
private static PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception {
// is the timeout set
//boolean timeoutFlag = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED ||
// TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED;
PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql());
//if(timeoutFlag){
// stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
//}
Map<Integer, Property> params = sqlBinds.getParamsMap();
if(params != null) {
for (Map.Entry<Integer, Property> entry : params.entrySet()) {
Property prop = entry.getValue();
ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
}
}
logger.info("prepare statement replace sql : {} ", stmt);
return stmt;
}
/**
* result process
*
* @param resultSet resultSet
* @throws Exception Exception
*/
private static void resultProcess(ResultSet resultSet) throws Exception{
ArrayNode resultJSONArray = JSONUtils.createArrayNode();
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
int rowCount = 0;
while (rowCount < LIMIT && resultSet.next()) {
ObjectNode mapOfColValues = JSONUtils.createObjectNode();
for (int i = 1; i <= num; i++) {
mapOfColValues.set(md.getColumnLabel(i), JSONUtils.toJsonNode(resultSet.getObject(i)));
}
resultJSONArray.add(mapOfColValues);
rowCount++;
}
String result = JSONUtils.toJsonString(resultJSONArray);
logger.debug("execute sql : {}", result);
//sendAttachment(StringUtils.isNotEmpty(sqlParameters.getTitle()) ?
// sqlParameters.getTitle(): taskExecutionContext.getTaskName() + " query result sets",
// JSONUtils.toJsonString(resultJSONArray));
}
/**
* post sql
*
* @param connection connection
* @param postStatementsBinds postStatementsBinds
* @throws Exception
*/
private static void postSql(Connection connection,
List<SqlBinds> postStatementsBinds) throws Exception{
for (SqlBinds sqlBind: postStatementsBinds) {
try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)){
int result = pstmt.executeUpdate();
logger.info("post statement execute result: {},for sql: {}",result,sqlBind.getSql());
}
}
}
/**
* ready to execute SQL and parameter entity Map
* @return SqlBinds
*/
public static SqlBinds getSqlAndSqlParamsMap(String sql) {
Map<Integer,Property> sqlParamsMap = new HashMap<>();
StringBuilder sqlBuilder = new StringBuilder();
// find process instance by task id
/*
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
sqlParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
*/
Map<String, Property> paramsMap = new HashMap<String, Property>();
// spell SQL according to the final user-defined variable
if(paramsMap == null){
sqlBuilder.append(sql);
return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
}
if (StringUtils.isNotEmpty("")){
String title = ParameterUtils.convertParameterPlaceholders("",
ParamUtils.convert(paramsMap));
logger.info("SQL title : {}",title);
//sqlParameters.setTitle(title);
}
//new
//replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job
sql = ParameterUtils.replaceScheduleTime(sql, new Date());
// special characters need to be escaped, ${} needs to be escaped
String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap);
// replace the ${} of the SQL statement with the Placeholder
String formatSql = sql.replaceAll(rgex, "?");
sqlBuilder.append(formatSql);
// print repalce sql
printReplacedSql(sql, formatSql, rgex, sqlParamsMap);
return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
}
/**
* regular expressions match the contents between two specified strings
* @param content content
* @param rgex rgex
* @param sqlParamsMap sql params map
* @param paramsPropsMap params props map
*/
public static void setSqlParamsMap(String content, String rgex, Map<Integer,Property> sqlParamsMap, Map<String,Property> paramsPropsMap){
Pattern pattern = Pattern.compile(rgex);
Matcher m = pattern.matcher(content);
int index = 1;
while (m.find()) {
String paramName = m.group(1);
Property prop = paramsPropsMap.get(paramName);
sqlParamsMap.put(index,prop);
index ++;
}
}
/**
* print replace sql
* @param content content
* @param formatSql format sql
* @param rgex rgex
* @param sqlParamsMap sql params map
*/
public static void printReplacedSql(String content, String formatSql,String rgex, Map<Integer,Property> sqlParamsMap){
//parameter print style
logger.info("after replace sql , preparing : {}" , formatSql);
StringBuilder logPrint = new StringBuilder("replaced sql , parameters:");
for(int i=1;i<=sqlParamsMap.size();i++){
logPrint.append(sqlParamsMap.get(i).getValue()+"("+sqlParamsMap.get(i).getType()+")");
}
logger.info("Sql Params are {}", logPrint);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.server.utils;
import org.apache.commons.collections.MapUtils;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.utils.HadoopUtils;
import com.jz.dmp.cmdexectool.scheduler.dao.entity.UdfFunc;
import org.slf4j.Logger;
import java.text.MessageFormat;
import java.util.*;
import java.util.stream.Collectors;
import static com.jz.dmp.cmdexectool.scheduler.common.utils.CollectionUtils.isNotEmpty;
/**
* udf utils
*/
public class UDFUtils {
/**
* create function format
*/
private static final String CREATE_FUNCTION_FORMAT = "create temporary function {0} as ''{1}''";
/**
* create function list
* @param udfFuncTenantCodeMap key is udf function,value is tenant code
* @param logger logger
* @return create function list
*/
public static List<String> createFuncs(Map<UdfFunc,String> udfFuncTenantCodeMap, Logger logger){
if (MapUtils.isEmpty(udfFuncTenantCodeMap)){
logger.info("can't find udf function resource");
return null;
}
List<String> funcList = new ArrayList<>();
// build jar sql
buildJarSql(funcList, udfFuncTenantCodeMap);
// build temp function sql
buildTempFuncSql(funcList, udfFuncTenantCodeMap.keySet().stream().collect(Collectors.toList()));
return funcList;
}
/**
* build jar sql
* @param sqls sql list
* @param udfFuncTenantCodeMap key is udf function,value is tenant code
*/
private static void buildJarSql(List<String> sqls, Map<UdfFunc,String> udfFuncTenantCodeMap) {
String defaultFS = HadoopUtils.getInstance().getConfiguration().get(Constants.FS_DEFAULTFS);
Set<Map.Entry<UdfFunc,String>> entries = udfFuncTenantCodeMap.entrySet();
for (Map.Entry<UdfFunc,String> entry:entries){
String uploadPath = HadoopUtils.getHdfsUdfDir(entry.getValue());
if (!uploadPath.startsWith("hdfs:")) {
uploadPath = defaultFS + uploadPath;
}
sqls.add(String.format("add jar %s%s", uploadPath, entry.getKey().getResourceName()));
}
}
/**
* build temp function sql
* @param sqls sql list
* @param udfFuncs udf function list
*/
private static void buildTempFuncSql(List<String> sqls, List<UdfFunc> udfFuncs) {
if (isNotEmpty(udfFuncs)) {
for (UdfFunc udfFunc : udfFuncs) {
sqls.add(MessageFormat
.format(CREATE_FUNCTION_FORMAT, udfFunc.getFuncName(), udfFunc.getClassName()));
}
}
}
}
...@@ -23,13 +23,31 @@ import java.nio.file.StandardOpenOption; ...@@ -23,13 +23,31 @@ import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions; import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.springframework.util.CollectionUtils;
import com.jz.dmp.cmdexectool.scheduler.common.Constants; import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
import com.jz.dmp.cmdexectool.scheduler.common.process.Property;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlBinds;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlParameters;
import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils; import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.BaseDataSource;
import com.jz.dmp.cmdexectool.scheduler.dao.utils.DatabaseUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext; import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.utils.ParamUtils;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor;
...@@ -82,8 +100,53 @@ public class SqlTask extends AbstractTask { ...@@ -82,8 +100,53 @@ public class SqlTask extends AbstractTask {
@Override @Override
public void handle() throws Exception { public void handle() throws Exception {
try { try {
BaseDataSource baseDataSource = new BaseDataSource() {
@Override
public String driverClassSelector() {
// TODO Auto-generated method stub
return null;
}
@Override
public DbType dbTypeSelector() {
// TODO Auto-generated method stub
return null;
}
};
baseDataSource.setDbType(sqlParameters.getMyDbType().getDbType().name());
baseDataSource.setUser(sqlParameters.getUser());
baseDataSource.setPassword(sqlParameters.getPassword());
baseDataSource.setAddress(sqlParameters.getJdbcUrl());
List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements())
.orElse(new ArrayList<>())
.stream()
.map(DatabaseUtils::getSqlAndSqlParamsMap)
.collect(Collectors.toList());
List<SqlBinds> postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPosStatements())
.orElse(new ArrayList<>())
.stream()
.map(DatabaseUtils::getSqlAndSqlParamsMap)
.collect(Collectors.toList());
//判断是否需要运行前置sql
if (!CollectionUtils.isEmpty(preStatementSqlBinds)) {
DatabaseUtils.executeUpdateSql(preStatementSqlBinds, baseDataSource);
}
// construct process // construct process
CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand()); CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand());
//判断是否运行后置sql
if (!CollectionUtils.isEmpty(postStatementSqlBinds)) {
DatabaseUtils.executeUpdateSql(postStatementSqlBinds, baseDataSource);
}
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds()); setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());
...@@ -94,6 +157,7 @@ public class SqlTask extends AbstractTask { ...@@ -94,6 +157,7 @@ public class SqlTask extends AbstractTask {
} }
} }
@Override @Override
public void cancelApplication(boolean cancelApplication) throws Exception { public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process // cancel process
......
...@@ -25,6 +25,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.task.sync.SyncParameters; ...@@ -25,6 +25,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.task.sync.SyncParameters;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer; import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
...@@ -61,6 +62,9 @@ public class ProcessService { ...@@ -61,6 +62,9 @@ public class ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
@Value("${spring.public-key}")
private String publicKey;
@Autowired @Autowired
private DmpDevelopTaskMapper dmpDevelopTaskMapper; private DmpDevelopTaskMapper dmpDevelopTaskMapper;
@Autowired @Autowired
...@@ -151,6 +155,7 @@ public class ProcessService { ...@@ -151,6 +155,7 @@ public class ProcessService {
param.put("projectId", dmpDevelopTaskDto.getProjectId()); param.put("projectId", dmpDevelopTaskDto.getProjectId());
List<DmpProjectConfigInfoDto> dtos = dmpProjectConfigInfoMapper.findList(param); List<DmpProjectConfigInfoDto> dtos = dmpProjectConfigInfoMapper.findList(param);
if (CollectionUtils.isEmpty(dtos)) { if (CollectionUtils.isEmpty(dtos)) {
logger.info("项目【{}】没有配置信息,请联系管理员", dmpDevelopTaskDto.getProjectId());
throw new RuntimeException("项目没有配置信息,请联系管理员"); throw new RuntimeException("项目没有配置信息,请联系管理员");
} }
...@@ -205,7 +210,7 @@ public class ProcessService { ...@@ -205,7 +210,7 @@ public class ProcessService {
break; break;
case sql: case sql:
SqlParameters sqlParameters = new SqlParameters(script, projectConfigInfoDto, dmpSyncingDatasourceDao, freeMarkerConfigurer); SqlParameters sqlParameters = new SqlParameters(script, projectConfigInfoDto, dmpSyncingDatasourceDao, freeMarkerConfigurer, publicKey);
sqlParameters.setTaskAppId(taskAppId); sqlParameters.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(sqlParameters, projectConfigInfoDto); taskExecutionContext = new TaskExecutionContext(sqlParameters, projectConfigInfoDto);
......
...@@ -21,6 +21,7 @@ spring: ...@@ -21,6 +21,7 @@ spring:
maximumPoolSize: 20 maximumPoolSize: 20
connectionTimeout: 30000 connectionTimeout: 30000
idleTimeout: 600000 idleTimeout: 600000
public-key: rajZdV0xpCox+2vEHFLsKq2o2XVdMaQq
#日志打印 #日志打印
logging: logging:
......
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# resource storage type : HDFS,S3,NONE
resource.storage.type=NONE
# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended
resource.upload.path=/dolphinscheduler
# user data local directory path, please make sure the directory exists and have read write permissions
#data.basedir.path=/tmp/dolphinscheduler
# whether kerberos starts
hadoop.security.authentication.startup.state=false
# java.security.krb5.conf path
java.security.krb5.conf.path=/opt/krb5.conf
# login user from keytab username
login.user.keytab.username=hdfs-mycluster@ESZ.COM
# loginUserFromKeytab path
login.user.keytab.path=/opt/hdfs.headless.keytab
#resource.view.suffixs
#resource.view.suffixs=txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties
# if resource.storage.type=HDFS, the user need to have permission to create directories under the HDFS root path
hdfs.root.user=hdfs
# if resource.storage.type=S3,the value like: s3a://dolphinscheduler ; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
fs.defaultFS=hdfs://mycluster:8020
# if resource.storage.type=S3,s3 endpoint
fs.s3a.endpoint=http://192.168.xx.xx:9010
# if resource.storage.type=S3,s3 access key
fs.s3a.access.key=A3DXS30FO22544RE
# if resource.storage.type=S3,s3 secret key
fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK
# if resourcemanager HA enable, please type the HA ips ; if resourcemanager is single, make this value empty
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# if resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ds1 to actual resourcemanager hostname.
yarn.application.status.address=http://ds1:8088/ws/v1/cluster/apps/%s
# job history status url when application number threshold is reached(default 10000,maybe it was set to 1000)
yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s
# system env path
#dolphinscheduler.env.path=env/dolphinscheduler_env.sh
development.state=false
# kerberos tgt expire time, unit is hours
kerberos.expire.time=2
# datasource encryption salt
datasource.encryption.enable=false
datasource.encryption.salt=!@#$%^&*
# Network IP gets priority, default inner outer
#dolphin.scheduler.network.priority.strategy=default
...@@ -16,10 +16,26 @@ ...@@ -16,10 +16,26 @@
*/ */
package com.jz.cmdexectool.test.task.shell; package com.jz.cmdexectool.test.task.shell;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.jz.dmp.cmdexectool.ApiApplication; import com.jz.dmp.cmdexectool.ApiApplication;
import com.jz.dmp.cmdexectool.scheduler.common.Constants; import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
import com.jz.dmp.cmdexectool.scheduler.common.enums.ExecutionStatus; import com.jz.dmp.cmdexectool.scheduler.common.enums.ExecutionStatus;
import com.jz.dmp.cmdexectool.scheduler.common.process.Property;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlBinds;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlType;
import com.jz.dmp.cmdexectool.scheduler.common.utils.CollectionUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.CommonUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.JSONUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.BaseDataSource;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.DataSourceFactory;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MySQLDataSource;
import com.jz.dmp.cmdexectool.scheduler.dao.utils.DatabaseUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext; import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.utils.ParamUtils;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.TaskManager; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.TaskManager;
import com.jz.dmp.cmdexectool.scheduler.service.process.ProcessService; import com.jz.dmp.cmdexectool.scheduler.service.process.ProcessService;
...@@ -27,7 +43,25 @@ import com.jz.dmp.cmdexectool.scheduler.service.process.ProcessService; ...@@ -27,7 +43,25 @@ import com.jz.dmp.cmdexectool.scheduler.service.process.ProcessService;
import java.io.File; import java.io.File;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -36,6 +70,9 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -36,6 +70,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
import static com.jz.dmp.cmdexectool.scheduler.common.Constants.*;
import static com.jz.dmp.cmdexectool.scheduler.common.enums.DbType.HIVE;
/** /**
* python shell command executor test * python shell command executor test
*/ */
...@@ -48,7 +85,9 @@ public class SQLCommandExecutorTest { ...@@ -48,7 +85,9 @@ public class SQLCommandExecutorTest {
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
@Test private static final int LIMIT = 10000;
//@Test
public void test() { public void test() {
try { try {
TaskExecutionContext taskExecutionContext = processService.findTaskExecutionContextById(460,null,true); TaskExecutionContext taskExecutionContext = processService.findTaskExecutionContextById(460,null,true);
...@@ -78,4 +117,378 @@ public class SQLCommandExecutorTest { ...@@ -78,4 +117,378 @@ public class SQLCommandExecutorTest {
logger.info(e.getMessage()); logger.info(e.getMessage());
} }
} }
//@Test
public void test22() {
System.out.println("test");
}
@Test
public void testJdbcHandler() {
// set the name of the current thread
//String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, "test");
//Thread.currentThread().setName(threadLoggerInfoName);
try {
System.out.println("test-----------");
// load class
DataSourceFactory.loadClass(DbType.valueOf("MYSQL"));
MySQLDataSource mySQLDataSource = new MySQLDataSource();
mySQLDataSource.setAddress("192.168.1.140:3307");
mySQLDataSource.setUser("dmp");
mySQLDataSource.setPassword("Ioubuy123");
//String json = JSONObject.toJSONString(mySQLDataSource);
// get datasource
// BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.valueOf("MYSQL"), json);
BaseDataSource baseDataSource = mySQLDataSource;
baseDataSource.setDbType(DbType.MYSQL.name());
baseDataSource.setJdbcUrlDirect("jdbc:mysql://192.168.1.140:3307/dmp_web_new");
baseDataSource.setUser("dmp");
baseDataSource.setPassword("Ioubuy123");
// ready to execute SQL and parameter entity Map
SqlBinds mainSqlBinds = getSqlAndSqlParamsMap("insert into test(id, name) values(1, 'test')");
List<String> preStatements = new ArrayList<String>();
preStatements.add("insert into test(id, name) values(1, 'test')");
preStatements.add("insert into test(id, name) values(2, 'test2')");
List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(preStatements)
.orElse(new ArrayList<>())
.stream()
.map(x ->{
return getSqlAndSqlParamsMap(x);
}).collect(Collectors.toList());
List<SqlBinds> postStatementSqlBinds = Optional.ofNullable(preStatements)
.orElse(new ArrayList<>())
.stream()
.map(this::getSqlAndSqlParamsMap)
.collect(Collectors.toList());
//List<String> createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncTenantCodeMap(),
// logger);
List<String> createFuncs = new ArrayList<String>();
// execute sql task
//DatabaseUtils.executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
DatabaseUtils.executeUpdateSql(preStatementSqlBinds, baseDataSource);
//setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
} catch (Exception e) {
//setExitStatusCode(Constants.EXIT_CODE_FAILURE);
logger.error("sql task error", e);
//throw e;
}
}
/**
* ready to execute SQL and parameter entity Map
* @return SqlBinds
*/
private SqlBinds getSqlAndSqlParamsMap(String sql) {
Map<Integer,Property> sqlParamsMap = new HashMap<>();
StringBuilder sqlBuilder = new StringBuilder();
// find process instance by task id
/*
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
sqlParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
*/
Map<String, Property> paramsMap = new HashMap<String, Property>();
// spell SQL according to the final user-defined variable
if(paramsMap == null){
sqlBuilder.append(sql);
return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
}
if (StringUtils.isNotEmpty("")){
String title = ParameterUtils.convertParameterPlaceholders("",
ParamUtils.convert(paramsMap));
logger.info("SQL title : {}",title);
//sqlParameters.setTitle(title);
}
//new
//replace variable TIME with $[YYYYmmddd...] in sql when history run job and batch complement job
sql = ParameterUtils.replaceScheduleTime(sql, new Date());
// special characters need to be escaped, ${} needs to be escaped
String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap);
// replace the ${} of the SQL statement with the Placeholder
String formatSql = sql.replaceAll(rgex, "?");
sqlBuilder.append(formatSql);
// print repalce sql
printReplacedSql(sql, formatSql, rgex, sqlParamsMap);
return new SqlBinds(sqlBuilder.toString(), sqlParamsMap);
}
/**
* print replace sql
* @param content content
* @param formatSql format sql
* @param rgex rgex
* @param sqlParamsMap sql params map
*/
public void printReplacedSql(String content, String formatSql,String rgex, Map<Integer,Property> sqlParamsMap){
//parameter print style
logger.info("after replace sql , preparing : {}" , formatSql);
StringBuilder logPrint = new StringBuilder("replaced sql , parameters:");
for(int i=1;i<=sqlParamsMap.size();i++){
logPrint.append(sqlParamsMap.get(i).getValue()+"("+sqlParamsMap.get(i).getType()+")");
}
logger.info("Sql Params are {}", logPrint);
}
/**
* regular expressions match the contents between two specified strings
* @param content content
* @param rgex rgex
* @param sqlParamsMap sql params map
* @param paramsPropsMap params props map
*/
public void setSqlParamsMap(String content, String rgex, Map<Integer,Property> sqlParamsMap, Map<String,Property> paramsPropsMap){
Pattern pattern = Pattern.compile(rgex);
Matcher m = pattern.matcher(content);
int index = 1;
while (m.find()) {
String paramName = m.group(1);
Property prop = paramsPropsMap.get(paramName);
sqlParamsMap.put(index,prop);
index ++;
}
}
/**
* execute function and sql
* @param mainSqlBinds main sql binds
* @param preStatementsBinds pre statements binds
* @param postStatementsBinds post statements binds
* @param createFuncs create functions
*/
public void executeFuncAndSql(SqlBinds mainSqlBinds,
List<SqlBinds> preStatementsBinds,
List<SqlBinds> postStatementsBinds,
List<String> createFuncs){
Connection connection = null;
PreparedStatement stmt = null;
ResultSet resultSet = null;
try {
// if upload resource is HDFS and kerberos startup
CommonUtils.loadKerberosConf();
// create connection
connection = createConnection();
// create temp function
if (CollectionUtils.isNotEmpty(createFuncs)) {
createTempFunction(connection,createFuncs);
}
// pre sql
preSql(connection,preStatementsBinds);
stmt = prepareStatementAndBind(connection, mainSqlBinds);
// decide whether to executeQuery or executeUpdate based on sqlType
if (1 == SqlType.QUERY.ordinal()) {
// query statements need to be convert to JsonArray and inserted into Alert to send
resultSet = stmt.executeQuery();
resultProcess(resultSet);
} else if (1 == SqlType.NON_QUERY.ordinal()) {
// non query statement
stmt.executeUpdate();
}
postSql(connection,postStatementsBinds);
} catch (Exception e) {
logger.error("execute sql error",e);
throw new RuntimeException("execute sql error");
} finally {
close(resultSet,stmt,connection);
}
}
/**
* close jdbc resource
*
* @param resultSet resultSet
* @param pstmt pstmt
* @param connection connection
*/
private void close(ResultSet resultSet,
PreparedStatement pstmt,
Connection connection){
if (resultSet != null){
try {
resultSet.close();
} catch (SQLException e) {
logger.error("close result set error : {}",e.getMessage(),e);
}
}
if (pstmt != null){
try {
pstmt.close();
} catch (SQLException e) {
logger.error("close prepared statement error : {}",e.getMessage(),e);
}
}
if (connection != null){
try {
connection.close();
} catch (SQLException e) {
logger.error("close connection error : {}",e.getMessage(),e);
}
}
}
/**
* post sql
*
* @param connection connection
* @param postStatementsBinds postStatementsBinds
* @throws Exception
*/
private void postSql(Connection connection,
List<SqlBinds> postStatementsBinds) throws Exception{
for (SqlBinds sqlBind: postStatementsBinds) {
try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)){
int result = pstmt.executeUpdate();
logger.info("post statement execute result: {},for sql: {}",result,sqlBind.getSql());
}
}
}
/**
* result process
*
* @param resultSet resultSet
* @throws Exception Exception
*/
private void resultProcess(ResultSet resultSet) throws Exception{
ArrayNode resultJSONArray = JSONUtils.createArrayNode();
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
int rowCount = 0;
while (rowCount < LIMIT && resultSet.next()) {
ObjectNode mapOfColValues = JSONUtils.createObjectNode();
for (int i = 1; i <= num; i++) {
mapOfColValues.set(md.getColumnLabel(i), JSONUtils.toJsonNode(resultSet.getObject(i)));
}
resultJSONArray.add(mapOfColValues);
rowCount++;
}
String result = JSONUtils.toJsonString(resultJSONArray);
logger.debug("execute sql : {}", result);
//sendAttachment(StringUtils.isNotEmpty(sqlParameters.getTitle()) ?
// sqlParameters.getTitle(): taskExecutionContext.getTaskName() + " query result sets",
// JSONUtils.toJsonString(resultJSONArray));
}
/**
* pre sql
*
* @param connection connection
* @param preStatementsBinds preStatementsBinds
*/
private void preSql(Connection connection,
List<SqlBinds> preStatementsBinds) throws Exception{
for (SqlBinds sqlBind: preStatementsBinds) {
try (PreparedStatement pstmt = prepareStatementAndBind(connection, sqlBind)){
int result = pstmt.executeUpdate();
logger.info("pre statement execute result: {}, for sql: {}",result,sqlBind.getSql());
}
}
}
/**
* preparedStatement bind
* @param connection connection
* @param sqlBinds sqlBinds
* @return PreparedStatement
* @throws Exception Exception
*/
private PreparedStatement prepareStatementAndBind(Connection connection, SqlBinds sqlBinds) throws Exception {
// is the timeout set
//boolean timeoutFlag = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED ||
// TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED;
PreparedStatement stmt = connection.prepareStatement(sqlBinds.getSql());
//if(timeoutFlag){
// stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
//}
Map<Integer, Property> params = sqlBinds.getParamsMap();
if(params != null) {
for (Map.Entry<Integer, Property> entry : params.entrySet()) {
Property prop = entry.getValue();
ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
}
}
logger.info("prepare statement replace sql : {} ", stmt);
return stmt;
}
/**
* create temp function
*
* @param connection connection
* @param createFuncs createFuncs
* @throws Exception
*/
private void createTempFunction(Connection connection,
List<String> createFuncs) throws Exception{
try (Statement funcStmt = connection.createStatement()) {
for (String createFunc : createFuncs) {
logger.info("hive create function sql: {}", createFunc);
funcStmt.execute(createFunc);
}
}
}
/**
* create connection
*
* @return connection
* @throws Exception Exception
*/
private Connection createConnection() throws Exception{
// if hive , load connection params if exists
Connection connection = null;
String type = "MYSQL";
if (HIVE == DbType.valueOf(type)) {
Properties paramProp = new Properties();
paramProp.setProperty(USER, "");
paramProp.setProperty(PASSWORD, "");
Map<String, String> connParamMap = CollectionUtils.stringToMap("",
SEMICOLON,
HIVE_CONF);
paramProp.putAll(connParamMap);
connection = DriverManager.getConnection("",
paramProp);
}else{
connection = DriverManager.getConnection("jdbc:mysql://192.168.1.140:3307/dmp_web_new?characterEncoding=utf8&useSSL=false",
"dmp",
"Ioubuy123");
}
return connection;
}
} }
\ No newline at end of file
...@@ -48,6 +48,11 @@ public class ShellCommandExecutorTest { ...@@ -48,6 +48,11 @@ public class ShellCommandExecutorTest {
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
@Test
public void test22() {
System.out.println("test");
}
//@Test //@Test
public void test2() { public void test2() {
try { try {
...@@ -66,7 +71,7 @@ public class ShellCommandExecutorTest { ...@@ -66,7 +71,7 @@ public class ShellCommandExecutorTest {
} }
} }
@Test //@Test
public void test() { public void test() {
try { try {
TaskExecutionContext taskExecutionContext = processService.findTaskExecutionContextById(473,null,true); TaskExecutionContext taskExecutionContext = processService.findTaskExecutionContextById(473,null,true);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment