Commit e967006e authored by sml's avatar sml

代码提交

parent 02dd0b7c
...@@ -53,6 +53,7 @@ public class CommConstant { ...@@ -53,6 +53,7 @@ public class CommConstant {
public static final String WATERDROP_FTL_SOURCE_JDBC = "source_jdbc.ftl"; public static final String WATERDROP_FTL_SOURCE_JDBC = "source_jdbc.ftl";
public static final String WATERDROP_FTL_SOURCE_ELASTICSEARCH = "source_elasticsearch.ftl"; public static final String WATERDROP_FTL_SOURCE_ELASTICSEARCH = "source_elasticsearch.ftl";
public static final String WATERDROP_FTL_SOURCE_SFTP = "source_sftp.ftl"; public static final String WATERDROP_FTL_SOURCE_SFTP = "source_sftp.ftl";
public static final String WATERDROP_FTL_SOURCE_HIVE = "source_hive.ftl";
public static final String WATERDROP_FTL_TRANSFORM_SQL = "transform_sql.ftl"; public static final String WATERDROP_FTL_TRANSFORM_SQL = "transform_sql.ftl";
public static final String WATERDROP_FTL_TRANSFORM_JSON2 = "transform_json2.ftl"; public static final String WATERDROP_FTL_TRANSFORM_JSON2 = "transform_json2.ftl";
public static final String WATERDROP_FTL_SINK_CONSOLE = "sink_console.ftl"; public static final String WATERDROP_FTL_SINK_CONSOLE = "sink_console.ftl";
...@@ -67,4 +68,9 @@ public class CommConstant { ...@@ -67,4 +68,9 @@ public class CommConstant {
public static final String FTL_DOCTRANS = "doctrans.ftl";//文件转码 public static final String FTL_DOCTRANS = "doctrans.ftl";//文件转码
public static final String FTL_HDFS_UPLOAD = "hdfs_upload.ftl";//HDFS上传 public static final String FTL_HDFS_UPLOAD = "hdfs_upload.ftl";//HDFS上传
/***************************************************/
//执行引擎
public static final String EXECUTION_ENGINE_JDBC = "jdbc";//jdbc
public static final String EXECUTION_ENGINE_SPARK = "spark";//spark
} }
...@@ -21,8 +21,9 @@ import java.util.HashMap; ...@@ -21,8 +21,9 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer; import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.jz.dmp.cmdexectool.common.constant.CommConstant; import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.common.utils.EncryptionUtils; import com.jz.dmp.cmdexectool.common.utils.EncryptionUtils;
...@@ -30,17 +31,19 @@ import com.jz.dmp.cmdexectool.common.utils.FreeMarkerUtils; ...@@ -30,17 +31,19 @@ import com.jz.dmp.cmdexectool.common.utils.FreeMarkerUtils;
import com.jz.dmp.cmdexectool.controller.bean.DmpProjectConfigInfoDto; import com.jz.dmp.cmdexectool.controller.bean.DmpProjectConfigInfoDto;
import com.jz.dmp.cmdexectool.entity.DmpSyncingDatasource; import com.jz.dmp.cmdexectool.entity.DmpSyncingDatasource;
import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao; import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
import com.jz.dmp.cmdexectool.scheduler.common.enums.MyDbType; import com.jz.dmp.cmdexectool.scheduler.common.enums.MyDbType;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo; import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils; import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MyBaseDataSource;
/** /**
* Sql/Hql parameter * Sql/Hql parameter
*/ */
public class SqlParameters extends AbstractParameters { public class SqlParameters extends AbstractParameters {
private static Logger logger = LoggerFactory.getLogger(SqlParameters.class);
/** /**
* shell script * shell script
*/ */
...@@ -74,34 +77,34 @@ public class SqlParameters extends AbstractParameters { ...@@ -74,34 +77,34 @@ public class SqlParameters extends AbstractParameters {
private String waterdropScript; private String waterdropScript;
/** /**
* 前语句 * 前语句
*/ */
private List<String> preStatements; private List<String> preStatements;
/** /**
* 后语句 * 后语句
*/ */
private List<String> posStatements; private List<String> posStatements;
/** /**
* jdbcUrl * 执行引擎
*/ */
private String jdbcUrl; private String executioEngine;
/** /**
* jdbc user * sql执行语句
*/ */
private String user; private String sqlScript;
/** /**
* jdbc password * 源数据源
*/ */
private String password; private MyBaseDataSource sourceBaseDataSource;
/** /**
* 数据源类型 * 目标数据源
*/ */
private MyDbType myDbType; private MyBaseDataSource targetBaseDataSource;
/** /**
* resource list * resource list
...@@ -120,61 +123,173 @@ public class SqlParameters extends AbstractParameters { ...@@ -120,61 +123,173 @@ public class SqlParameters extends AbstractParameters {
String outputType = scriptObj.getString("outputType"); String outputType = scriptObj.getString("outputType");
String sqlScript = scriptObj.getString("sqlScript"); String sqlScript = scriptObj.getString("sqlScript");
//设置sql执行语句
//evn this.sqlScript = sqlScript;
Map<String, String> envModel = new HashMap<String, String>(); //设置执行引擎
envModel.put("sparkappname", "Waterdrop"); String executioEngine = scriptObj.getString("executioEngine");
env = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfig); this.executioEngine = executioEngine;
//产生evn模板
generatorEnvStr(freeMarkerConfig);
//source //source
Integer sourceId = scriptObj.getInteger("sourceId"); generatorSourceStr(dmpSyncingDatasourceDao, freeMarkerConfig, publicKey, scriptObj);
DmpSyncingDatasource dmpSyncingDatasource = dmpSyncingDatasourceDao.queryById(sourceId);
this.jdbcUrl = dmpSyncingDatasource.getJdbcUrl(); if (CommConstant.OUTPUT_TYPE_CONSOLE.equals(outputType)) {
this.user = dmpSyncingDatasource.getUserName(); //产生console transform and sink
this.password = EncryptionUtils.decode(dmpSyncingDatasource.getPassword(), publicKey); geneConsoleTransfAndSink(freeMarkerConfig);
this.myDbType = MyDbType.obtainByIdStr(dmpSyncingDatasource.getId().toString()); }else if (CommConstant.OUTPUT_TYPE_HDFS.equals(outputType)) {
//产生hdfs transform and sink
geneHdfsTransfAndSink(freeMarkerConfig, scriptObj);
}else if (CommConstant.OUTPUT_TYPE_TABLE.equals(outputType)) {
//产生table transform and sink
geneTableTransfAndSink(dmpSyncingDatasourceDao, freeMarkerConfig, scriptObj, publicKey);
}else if (CommConstant.OUTPUT_TYPE_TOPIC.equals(outputType)) {
//产生topic transform and sink
geneTopicTransfAndSink(freeMarkerConfig, scriptObj);
}else if (CommConstant.OUTPUT_TYPE_API.equals(outputType)) {
//产生api transform and sink
geneApiTransfAndSink(freeMarkerConfig, scriptObj);
}
//waterdrop script
geneWaterdropStr(freeMarkerConfig);
String sourceTableNames = scriptObj.getString("sourceTableNames");
String[] tableNameArr = sourceTableNames.split(",");
for (String tableName : tableNameArr) {
Map<String, String> jdbcModel = new HashMap<String, String>();
jdbcModel.put("driver", dmpSyncingDatasource.getDriverClassName());
jdbcModel.put("url", this.jdbcUrl);
jdbcModel.put("table", tableName);
jdbcModel.put("result_table_name", tableName);
jdbcModel.put("user", this.user);
jdbcModel.put("password", this.password);
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig);
} }
if (CommConstant.OUTPUT_TYPE_CONSOLE.equals(outputType)) { /**
* @Title: geneWaterdropStr
* @Description: TODO(waterdrop script)
* @param @param freeMarkerConfig 参数
* @return void 返回类型
* @throws
*/
private void geneWaterdropStr(FreeMarkerConfigurer freeMarkerConfig) {
Map<String, String> waterdropModel = new HashMap<String, String>();
waterdropModel.put("env", env);
waterdropModel.put("source", source);
waterdropModel.put("transform", transform);
waterdropModel.put("sink", sink);
this.waterdropScript = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig);
}
/**
* @Title: geneApiTransfAndSink
* @Description: TODO(产生api transform and sink)
* @param @param freeMarkerConfig
* @param @param scriptObj
* @param @param sqlScript 参数
* @return void 返回类型
* @throws
*/
private void geneApiTransfAndSink(FreeMarkerConfigurer freeMarkerConfig, JSONObject scriptObj) {
// 执行引擎是jdbc,不用生成waterdrop
if (this.executioEngine.equals(CommConstant.EXECUTION_ENGINE_JDBC)) {
return;
}
//transform //transform
Map<String, String> transformSqlModel = new HashMap<String, String>(); Map<String, String> transformSqlModel = new HashMap<String, String>();
transformSqlModel.put("sql", sqlScript); transformSqlModel.put("sql", this.sqlScript);
transformSqlModel.put("table_name", "t_view");
transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig); transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
JSONObject apiObj = scriptObj.getJSONObject("api");
String columnFieldsObj = apiObj.getString("columnFields");
String sqlStr = ParameterUtils.columnMappingHandler(columnFieldsObj);
Map<String, String> transformMappingSqlModel = new HashMap<String, String>();
transformSqlModel.put("sql", sqlStr);
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_JSON2, transformMappingSqlModel, freeMarkerConfig);
//sink //sink
Map<String, String> stdoutModel = new HashMap<String, String>(); Map<String, String> sinkApiModel = new HashMap<String, String>();
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_CONSOLE, stdoutModel, freeMarkerConfig); sinkApiModel.put("url", apiObj.getString("apiUrl"));
}else if (CommConstant.OUTPUT_TYPE_HDFS.equals(outputType)) { sinkApiModel.put("apiKey", apiObj.getString("apiKey"));
sinkApiModel.put("method", apiObj.getString("method"));
sinkApiModel.put("signType", apiObj.getString("signType"));
sinkApiModel.put("authCode", apiObj.getString("authCode"));
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_API, sinkApiModel, freeMarkerConfig);
}
/**
* @Title: geneTopicTransfAndSink
* @Description: TODO(产生topic transform and sink)
* @param @param freeMarkerConfig
* @param @param scriptObj
* @param @param sqlScript 参数
* @return void 返回类型
* @throws
*/
private void geneTopicTransfAndSink(FreeMarkerConfigurer freeMarkerConfig, JSONObject scriptObj) {
// 执行引擎是jdbc,不用生成waterdrop
if (this.executioEngine.equals(CommConstant.EXECUTION_ENGINE_JDBC)) {
return;
}
//transform //transform
Map<String, String> transformSqlModel = new HashMap<String, String>(); Map<String, String> transformSqlModel = new HashMap<String, String>();
transformSqlModel.put("sql", sqlScript); transformSqlModel.put("sql", this.sqlScript);
transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig); transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
//sink //sink
JSONObject hdfsObj = scriptObj.getJSONObject("hdfs");
String hdfsDir = hdfsObj.getString("hdfsDir");
Map<String, String> hdfsModel = new HashMap<String, String>(); JSONObject topicObj = scriptObj.getJSONObject("topic");
hdfsModel.put("path", hdfsDir);
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_HDFS, hdfsModel, freeMarkerConfig); Map<String, String> kafkaModel = new HashMap<String, String>();
}else if (CommConstant.OUTPUT_TYPE_TABLE.equals(outputType)) { kafkaModel.put("topic", topicObj.getString("topic"));
kafkaModel.put("broker", topicObj.getString("server"));
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_KAFKA, kafkaModel, freeMarkerConfig);
}
/**
* @Title: geneTableTransfAndSink
* @Description: TODO(产生table transform and sink)
* @param @param dmpSyncingDatasourceDao
* @param @param freeMarkerConfig
* @param @param scriptObj
* @param @param sqlScript 参数
* @return void 返回类型
* @throws
*/
private void geneTableTransfAndSink(DmpSyncingDatasourceDao dmpSyncingDatasourceDao,
FreeMarkerConfigurer freeMarkerConfig, JSONObject scriptObj, String publicKey) {
JSONObject tableObj = scriptObj.getJSONObject("table");
//设置前导、后导语句
String preImportStatement = tableObj.getString("preImportStatement");
String postImportStatement = tableObj.getString("postImportStatement");
preStatements = new ArrayList<String>();
preStatements.add(preImportStatement);
posStatements = new ArrayList<String>();
posStatements.add(postImportStatement);
//设置目标执行前导后导语句目标数据源
Integer targetSourceId = tableObj.getInteger("targetSourceId");
DmpSyncingDatasource targetSource = dmpSyncingDatasourceDao.queryById(targetSourceId);
String jdbcUrl = targetSource.getJdbcUrl();
String user = targetSource.getUserName();
String password = EncryptionUtils.decode(targetSource.getPassword(), publicKey);
MyDbType myDbType = MyDbType.obtainByIdStr(targetSource.getId().toString());
targetBaseDataSource = new MyBaseDataSource();
targetBaseDataSource.setJdbcUrlDirect(jdbcUrl);
targetBaseDataSource.setUser(user);
targetBaseDataSource.setPassword(password);
targetBaseDataSource.setMyDbType(myDbType);
// 执行引擎是jdbc,不用生成waterdrop
if (this.executioEngine.equals(CommConstant.EXECUTION_ENGINE_JDBC)) {
return;
}
//transform //transform
Map<String, String> transformSqlModel = new HashMap<String, String>(); Map<String, String> transformSqlModel = new HashMap<String, String>();
transformSqlModel.put("sql", sqlScript); transformSqlModel.put("sql", this.sqlScript);
transformSqlModel.put("table_name", "t_view"); transformSqlModel.put("table_name", "t_view");
transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig); transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
JSONObject tableObj = scriptObj.getJSONObject("table");
String tableFieldsObj = tableObj.getString("tableFields"); String tableFieldsObj = tableObj.getString("tableFields");
String sqlStr = ParameterUtils.columnMappingHandler(tableFieldsObj); String sqlStr = ParameterUtils.columnMappingHandler(tableFieldsObj);
...@@ -185,8 +300,12 @@ public class SqlParameters extends AbstractParameters { ...@@ -185,8 +300,12 @@ public class SqlParameters extends AbstractParameters {
//sink //sink
//targetSource //targetSource
Integer targetSourceId = tableObj.getInteger("targetSourceId"); if (this.targetBaseDataSource.getMyDbType() == MyDbType.MySQL
DmpSyncingDatasource targetSource = dmpSyncingDatasourceDao.queryById(targetSourceId); || this.targetBaseDataSource.getMyDbType() == MyDbType.SQLServer
|| this.targetBaseDataSource.getMyDbType() == MyDbType.PostgreSQL
|| this.targetBaseDataSource.getMyDbType() == MyDbType.Oracle
|| this.targetBaseDataSource.getMyDbType() == MyDbType.DB2
|| this.targetBaseDataSource.getMyDbType() == MyDbType.INFORMIX) {
Map<String, String> sinkJdbcModel = new HashMap<String, String>(); Map<String, String> sinkJdbcModel = new HashMap<String, String>();
sinkJdbcModel.put("save_mode", "overwrite"); sinkJdbcModel.put("save_mode", "overwrite");
...@@ -197,57 +316,137 @@ public class SqlParameters extends AbstractParameters { ...@@ -197,57 +316,137 @@ public class SqlParameters extends AbstractParameters {
sinkJdbcModel.put("password", targetSource.getPassword()); sinkJdbcModel.put("password", targetSource.getPassword());
sinkJdbcModel.put("dbtable", targetSource.getDbName()); sinkJdbcModel.put("dbtable", targetSource.getDbName());
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_JDBC, sinkJdbcModel, freeMarkerConfig); sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_JDBC, sinkJdbcModel, freeMarkerConfig);
}
}
/**
* @Title: geneHdfsTransfAndSink
* @Description: TODO(产生hdfs transform and sink)
* @param @param freeMarkerConfig
* @param @param scriptObj
* @param @param sqlScript 参数
* @return void 返回类型
* @throws
*/
private void geneHdfsTransfAndSink(FreeMarkerConfigurer freeMarkerConfig, JSONObject scriptObj) {
// 执行引擎是jdbc,不用生成waterdrop
if (this.executioEngine.equals(CommConstant.EXECUTION_ENGINE_JDBC)) {
return ;
}
}else if (CommConstant.OUTPUT_TYPE_TOPIC.equals(outputType)) {
//transform //transform
Map<String, String> transformSqlModel = new HashMap<String, String>(); Map<String, String> transformSqlModel = new HashMap<String, String>();
transformSqlModel.put("sql", sqlScript); transformSqlModel.put("sql", this.sqlScript);
transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig); transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
//sink //sink
JSONObject hdfsObj = scriptObj.getJSONObject("hdfs");
String hdfsDir = hdfsObj.getString("hdfsDir");
JSONObject topicObj = scriptObj.getJSONObject("topic"); Map<String, String> hdfsModel = new HashMap<String, String>();
hdfsModel.put("path", hdfsDir);
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_HDFS, hdfsModel, freeMarkerConfig);
}
Map<String, String> kafkaModel = new HashMap<String, String>(); /**
kafkaModel.put("topic", topicObj.getString("topic")); * @Title: geneConsoleTransfAndSink
kafkaModel.put("broker", topicObj.getString("server")); * @Description: TODO(产生console transform and sink)
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_KAFKA, kafkaModel, freeMarkerConfig); * @param @param freeMarkerConfig
}else if (CommConstant.OUTPUT_TYPE_API.equals(outputType)) { * @param @param sqlScript 参数
* @return void 返回类型
* @throws
*/
private void geneConsoleTransfAndSink(FreeMarkerConfigurer freeMarkerConfig) {
//执行引擎是jdbc,不用生成waterdrop
if (this.executioEngine.equals(CommConstant.EXECUTION_ENGINE_JDBC)) {
return ;
}
//transform //transform
Map<String, String> transformSqlModel = new HashMap<String, String>(); Map<String, String> transformSqlModel = new HashMap<String, String>();
transformSqlModel.put("sql", sqlScript); transformSqlModel.put("sql", this.sqlScript);
transformSqlModel.put("table_name", "t_view");
transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig); transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
//sink
Map<String, String> stdoutModel = new HashMap<String, String>();
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_CONSOLE, stdoutModel, freeMarkerConfig);
}
JSONObject apiObj = scriptObj.getJSONObject("api"); /**
String columnFieldsObj = apiObj.getString("columnFields"); * @Title: generatorSourceStr
String sqlStr = ParameterUtils.columnMappingHandler(columnFieldsObj); * @Description: TODO(生成source模板)
* @param @param dmpSyncingDatasourceDao
* @param @param freeMarkerConfig
* @param @param publicKey
* @param @param scriptObj 参数
* @return void 返回类型
* @throws
*/
private void generatorSourceStr(DmpSyncingDatasourceDao dmpSyncingDatasourceDao,
FreeMarkerConfigurer freeMarkerConfig, String publicKey, JSONObject scriptObj) {
Integer sourceId = scriptObj.getInteger("sourceId");
DmpSyncingDatasource dmpSyncingDatasource = dmpSyncingDatasourceDao.queryById(sourceId);
Map<String, String> transformMappingSqlModel = new HashMap<String, String>(); String jdbcUrl = dmpSyncingDatasource.getJdbcUrl();
transformSqlModel.put("sql", sqlStr); String user = dmpSyncingDatasource.getUserName();
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_JSON2, transformMappingSqlModel, freeMarkerConfig); String password = EncryptionUtils.decode(dmpSyncingDatasource.getPassword(), publicKey);
MyDbType myDbType = MyDbType.obtainByIdStr(dmpSyncingDatasource.getId().toString());
//sink // 如果执行引擎选择的事jdbc,不用生成waterdrop source
Map<String, String> sinkApiModel = new HashMap<String, String>(); if (this.executioEngine.equals(CommConstant.EXECUTION_ENGINE_JDBC)) {
sinkApiModel.put("url", apiObj.getString("apiUrl")); sourceBaseDataSource = new MyBaseDataSource();
sinkApiModel.put("apiKey", apiObj.getString("apiKey"));
sinkApiModel.put("method", apiObj.getString("method")); sourceBaseDataSource.setJdbcUrlDirect(jdbcUrl);
sinkApiModel.put("signType", apiObj.getString("signType")); sourceBaseDataSource.setUser(user);
sinkApiModel.put("authCode", apiObj.getString("authCode")); sourceBaseDataSource.setPassword(password);
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_API, sinkApiModel, freeMarkerConfig); sourceBaseDataSource.setMyDbType(myDbType);
return;
} }
//waterdrop script
Map<String, String> waterdropModel = new HashMap<String, String>();
waterdropModel.put("env", env);
waterdropModel.put("source", source);
waterdropModel.put("transform", transform);
waterdropModel.put("sink", sink);
this.waterdropScript = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig);
preStatements = new ArrayList<String>(); //jdbc
preStatements.add("insert into test(id, name) values(1, 'test')"); if (this.sourceBaseDataSource.getMyDbType() == MyDbType.MySQL
posStatements = new ArrayList<String>(); || this.sourceBaseDataSource.getMyDbType() == MyDbType.SQLServer
posStatements.add("insert into test(id, name) values(2, 'test2')"); || this.sourceBaseDataSource.getMyDbType() == MyDbType.PostgreSQL
|| this.sourceBaseDataSource.getMyDbType() == MyDbType.Oracle
|| this.sourceBaseDataSource.getMyDbType() == MyDbType.DB2
|| this.sourceBaseDataSource.getMyDbType() == MyDbType.INFORMIX) {
Map<String, String> jdbcModel = new HashMap<String, String>();
jdbcModel.put("driver", dmpSyncingDatasource.getDriverClassName());
jdbcModel.put("url", jdbcUrl);
jdbcModel.put("table", "("+this.sqlScript+") as table_view");
jdbcModel.put("result_table_name", "table_view");
jdbcModel.put("user", user);
jdbcModel.put("password", password);
this.source = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel,
freeMarkerConfig);
}else if (this.sourceBaseDataSource.getMyDbType() == MyDbType.Hive) {
Map<String, String> hiveModel = new HashMap<String, String>();
hiveModel.put("catalogImplementation", "hive");
hiveModel.put("pre_sql", "select * from hive_db.hive_table");
hiveModel.put("result_table_name", "table_view");
this.source = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_HIVE, hiveModel,
freeMarkerConfig);
}else {
logger.info("waterdrow,不支持的数据源类型");
throw new RuntimeException("waterdrow,不支持的数据源");
}
}
/**
* @Title: generatorEnvStr
* @Description: TODO(产生env模板)
* @param @param freeMarkerConfig 参数
* @return void 返回类型
* @throws
*/
private void generatorEnvStr(FreeMarkerConfigurer freeMarkerConfig) {
//如果执行引擎选择的事jdbc,不用生成env
if (this.executioEngine.equals(CommConstant.EXECUTION_ENGINE_JDBC)) {
return ;
}
Map<String, String> envModel = new HashMap<String, String>();
envModel.put("sparkappname", "Waterdrop");
this.env = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfig);
} }
public String getScript() { public String getScript() {
...@@ -322,44 +521,44 @@ public class SqlParameters extends AbstractParameters { ...@@ -322,44 +521,44 @@ public class SqlParameters extends AbstractParameters {
this.posStatements = posStatements; this.posStatements = posStatements;
} }
public List<ResourceInfo> getResourceList() { public String getExecutioEngine() {
return resourceList; return executioEngine;
} }
public void setResourceList(List<ResourceInfo> resourceList) { public void setExecutioEngine(String executioEngine) {
this.resourceList = resourceList; this.executioEngine = executioEngine;
} }
public String getJdbcUrl() { public String getSqlScript() {
return jdbcUrl; return sqlScript;
} }
public void setJdbcUrl(String jdbcUrl) { public void setSqlScript(String sqlScript) {
this.jdbcUrl = jdbcUrl; this.sqlScript = sqlScript;
} }
public String getUser() { public MyBaseDataSource getSourceBaseDataSource() {
return user; return sourceBaseDataSource;
} }
public void setUser(String user) { public void setSourceBaseDataSource(MyBaseDataSource sourceBaseDataSource) {
this.user = user; this.sourceBaseDataSource = sourceBaseDataSource;
} }
public String getPassword() { public MyBaseDataSource getTargetBaseDataSource() {
return password; return targetBaseDataSource;
} }
public void setPassword(String password) { public void setTargetBaseDataSource(MyBaseDataSource targetBaseDataSource) {
this.password = password; this.targetBaseDataSource = targetBaseDataSource;
} }
public MyDbType getMyDbType() { public List<ResourceInfo> getResourceList() {
return myDbType; return resourceList;
} }
public void setMyDbType(MyDbType myDbType) { public void setResourceList(List<ResourceInfo> resourceList) {
this.myDbType = myDbType; this.resourceList = resourceList;
} }
@Override @Override
......
...@@ -67,8 +67,6 @@ public abstract class BaseDataSource { ...@@ -67,8 +67,6 @@ public abstract class BaseDataSource {
*/ */
private String principal; private String principal;
private String dbType;
public String getPrincipal() { public String getPrincipal() {
return principal; return principal;
} }
...@@ -227,14 +225,6 @@ public abstract class BaseDataSource { ...@@ -227,14 +225,6 @@ public abstract class BaseDataSource {
this.other = other; this.other = other;
} }
public String getDbType() {
return dbType;
}
public void setDbType(String dbType) {
this.dbType = dbType;
}
public String getJdbcUrlDirect() { public String getJdbcUrlDirect() {
return jdbcUrlDirect; return jdbcUrlDirect;
} }
......
...@@ -36,6 +36,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.utils.CommonUtils; ...@@ -36,6 +36,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.utils.CommonUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.JSONUtils; import com.jz.dmp.cmdexectool.scheduler.common.utils.JSONUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils; import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.BaseDataSource; import com.jz.dmp.cmdexectool.scheduler.dao.datasource.BaseDataSource;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MyBaseDataSource;
import com.jz.dmp.cmdexectool.scheduler.server.utils.ParamUtils; import com.jz.dmp.cmdexectool.scheduler.server.utils.ParamUtils;
public class DatabaseUtils { public class DatabaseUtils {
...@@ -51,7 +52,7 @@ public class DatabaseUtils { ...@@ -51,7 +52,7 @@ public class DatabaseUtils {
* @param postStatementsBinds post statements binds * @param postStatementsBinds post statements binds
* @param createFuncs create functions * @param createFuncs create functions
*/ */
public static void executeUpdateSql(List<SqlBinds> statementsBinds, BaseDataSource baseDataSource){ public static void executeUpdateSql(List<SqlBinds> statementsBinds, MyBaseDataSource myBaseDataSource){
Connection connection = null; Connection connection = null;
PreparedStatement stmt = null; PreparedStatement stmt = null;
ResultSet resultSet = null; ResultSet resultSet = null;
...@@ -59,7 +60,7 @@ public class DatabaseUtils { ...@@ -59,7 +60,7 @@ public class DatabaseUtils {
// if upload resource is HDFS and kerberos startup // if upload resource is HDFS and kerberos startup
CommonUtils.loadKerberosConf(); CommonUtils.loadKerberosConf();
// create connection // create connection
connection = createConnection(baseDataSource); connection = createConnection(myBaseDataSource);
// create temp function // create temp function
/* /*
if (CollectionUtils.isNotEmpty(createFuncs)) { if (CollectionUtils.isNotEmpty(createFuncs)) {
...@@ -89,7 +90,7 @@ public class DatabaseUtils { ...@@ -89,7 +90,7 @@ public class DatabaseUtils {
List<SqlBinds> preStatementsBinds, List<SqlBinds> preStatementsBinds,
List<SqlBinds> postStatementsBinds, List<SqlBinds> postStatementsBinds,
List<String> createFuncs, List<String> createFuncs,
BaseDataSource baseDataSource){ MyBaseDataSource myBaseDataSource){
Connection connection = null; Connection connection = null;
PreparedStatement stmt = null; PreparedStatement stmt = null;
ResultSet resultSet = null; ResultSet resultSet = null;
...@@ -97,7 +98,7 @@ public class DatabaseUtils { ...@@ -97,7 +98,7 @@ public class DatabaseUtils {
// if upload resource is HDFS and kerberos startup // if upload resource is HDFS and kerberos startup
CommonUtils.loadKerberosConf(); CommonUtils.loadKerberosConf();
// create connection // create connection
connection = createConnection(baseDataSource); connection = createConnection(myBaseDataSource);
// create temp function // create temp function
/* /*
if (CollectionUtils.isNotEmpty(createFuncs)) { if (CollectionUtils.isNotEmpty(createFuncs)) {
...@@ -171,24 +172,24 @@ public class DatabaseUtils { ...@@ -171,24 +172,24 @@ public class DatabaseUtils {
* @return connection * @return connection
* @throws Exception Exception * @throws Exception Exception
*/ */
private static Connection createConnection(BaseDataSource baseDataSource) throws Exception{ private static Connection createConnection(MyBaseDataSource myBaseDataSource) throws Exception{
// if hive , load connection params if exists // if hive , load connection params if exists
Connection connection = null; Connection connection = null;
if (HIVE == DbType.valueOf(baseDataSource.getDbType())) { if (HIVE == DbType.valueOf(myBaseDataSource.getMyDbType().getDbType().name())) {
Properties paramProp = new Properties(); Properties paramProp = new Properties();
paramProp.setProperty(USER, baseDataSource.getUser()); paramProp.setProperty(USER, myBaseDataSource.getUser());
paramProp.setProperty(PASSWORD, baseDataSource.getPassword()); paramProp.setProperty(PASSWORD, myBaseDataSource.getPassword());
Map<String, String> connParamMap = CollectionUtils.stringToMap("", Map<String, String> connParamMap = CollectionUtils.stringToMap("",
SEMICOLON, SEMICOLON,
HIVE_CONF); HIVE_CONF);
paramProp.putAll(connParamMap); paramProp.putAll(connParamMap);
connection = DriverManager.getConnection(baseDataSource.getJdbcUrlDirect(), connection = DriverManager.getConnection(myBaseDataSource.getJdbcUrlDirect(),
paramProp); paramProp);
}else{ }else{
connection = DriverManager.getConnection(baseDataSource.getJdbcUrlDirect(), connection = DriverManager.getConnection(myBaseDataSource.getJdbcUrlDirect(),
baseDataSource.getUser(), myBaseDataSource.getUser(),
baseDataSource.getPassword()); myBaseDataSource.getPassword());
} }
return connection; return connection;
} }
......
...@@ -36,6 +36,7 @@ import org.apache.commons.lang3.StringUtils; ...@@ -36,6 +36,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.scheduler.common.Constants; import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType; import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
import com.jz.dmp.cmdexectool.scheduler.common.process.Property; import com.jz.dmp.cmdexectool.scheduler.common.process.Property;
...@@ -45,6 +46,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlParameters; ...@@ -45,6 +46,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlParameters;
import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils; import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils; import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.BaseDataSource; import com.jz.dmp.cmdexectool.scheduler.dao.datasource.BaseDataSource;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MyBaseDataSource;
import com.jz.dmp.cmdexectool.scheduler.dao.utils.DatabaseUtils; import com.jz.dmp.cmdexectool.scheduler.dao.utils.DatabaseUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext; import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.utils.ParamUtils; import com.jz.dmp.cmdexectool.scheduler.server.utils.ParamUtils;
...@@ -101,25 +103,7 @@ public class SqlTask extends AbstractTask { ...@@ -101,25 +103,7 @@ public class SqlTask extends AbstractTask {
public void handle() throws Exception { public void handle() throws Exception {
try { try {
BaseDataSource baseDataSource = new BaseDataSource() { MyBaseDataSource targetBaseDataSource = sqlParameters.getTargetBaseDataSource();
@Override
public String driverClassSelector() {
// TODO Auto-generated method stub
return null;
}
@Override
public DbType dbTypeSelector() {
// TODO Auto-generated method stub
return null;
}
};
baseDataSource.setDbType(sqlParameters.getMyDbType().getDbType().name());
baseDataSource.setUser(sqlParameters.getUser());
baseDataSource.setPassword(sqlParameters.getPassword());
baseDataSource.setAddress(sqlParameters.getJdbcUrl());
List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements()) List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements())
.orElse(new ArrayList<>()) .orElse(new ArrayList<>())
...@@ -136,20 +120,34 @@ public class SqlTask extends AbstractTask { ...@@ -136,20 +120,34 @@ public class SqlTask extends AbstractTask {
//判断是否需要运行前置sql //判断是否需要运行前置sql
if (!CollectionUtils.isEmpty(preStatementSqlBinds)) { if (!CollectionUtils.isEmpty(preStatementSqlBinds)) {
DatabaseUtils.executeUpdateSql(preStatementSqlBinds, baseDataSource); DatabaseUtils.executeUpdateSql(preStatementSqlBinds, targetBaseDataSource);
} }
if (sqlParameters.getExecutioEngine().equals(CommConstant.EXECUTION_ENGINE_JDBC)) {
List<String> mainSqlScript = new ArrayList<String>();
mainSqlScript.add(sqlParameters.getSqlScript());
List<SqlBinds> mainStatementSqlBinds = Optional.ofNullable(mainSqlScript)
.orElse(new ArrayList<>())
.stream()
.map(DatabaseUtils::getSqlAndSqlParamsMap)
.collect(Collectors.toList());
DatabaseUtils.executeUpdateSql(mainStatementSqlBinds, sqlParameters.getSourceBaseDataSource());
}else {
// construct process // construct process
CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand()); CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
}
//判断是否运行后置sql //判断是否运行后置sql
if (!CollectionUtils.isEmpty(postStatementSqlBinds)) { if (!CollectionUtils.isEmpty(postStatementSqlBinds)) {
DatabaseUtils.executeUpdateSql(postStatementSqlBinds, baseDataSource); DatabaseUtils.executeUpdateSql(postStatementSqlBinds, targetBaseDataSource);
} }
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
} catch (Exception e) { } catch (Exception e) {
logger.error("sql task error", e); logger.error("sql task error", e);
setExitStatusCode(Constants.EXIT_CODE_FAILURE); setExitStatusCode(Constants.EXIT_CODE_FAILURE);
......
...@@ -23,6 +23,7 @@ import com.jz.dmp.cmdexectool.ApiApplication; ...@@ -23,6 +23,7 @@ import com.jz.dmp.cmdexectool.ApiApplication;
import com.jz.dmp.cmdexectool.scheduler.common.Constants; import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType; import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
import com.jz.dmp.cmdexectool.scheduler.common.enums.ExecutionStatus; import com.jz.dmp.cmdexectool.scheduler.common.enums.ExecutionStatus;
import com.jz.dmp.cmdexectool.scheduler.common.enums.MyDbType;
import com.jz.dmp.cmdexectool.scheduler.common.process.Property; import com.jz.dmp.cmdexectool.scheduler.common.process.Property;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlBinds; import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlBinds;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlType; import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlType;
...@@ -32,6 +33,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.utils.JSONUtils; ...@@ -32,6 +33,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.utils.JSONUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils; import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.BaseDataSource; import com.jz.dmp.cmdexectool.scheduler.dao.datasource.BaseDataSource;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.DataSourceFactory; import com.jz.dmp.cmdexectool.scheduler.dao.datasource.DataSourceFactory;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MyBaseDataSource;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MySQLDataSource; import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MySQLDataSource;
import com.jz.dmp.cmdexectool.scheduler.dao.utils.DatabaseUtils; import com.jz.dmp.cmdexectool.scheduler.dao.utils.DatabaseUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext; import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
...@@ -134,21 +136,23 @@ public class SQLCommandExecutorTest { ...@@ -134,21 +136,23 @@ public class SQLCommandExecutorTest {
// load class // load class
DataSourceFactory.loadClass(DbType.valueOf("MYSQL")); DataSourceFactory.loadClass(DbType.valueOf("MYSQL"));
MySQLDataSource mySQLDataSource = new MySQLDataSource(); MyBaseDataSource myBaseDataSource = new MyBaseDataSource();
mySQLDataSource.setAddress("192.168.1.140:3307"); myBaseDataSource.setJdbcUrlDirect("jdbc:mysql://192.168.1.140:3307/dmp_web_new");
mySQLDataSource.setUser("dmp"); myBaseDataSource.setUser("dmp");
mySQLDataSource.setPassword("Ioubuy123"); myBaseDataSource.setPassword("Ioubuy123");
myBaseDataSource.setMyDbType(MyDbType.MySQL);
//String json = JSONObject.toJSONString(mySQLDataSource); //String json = JSONObject.toJSONString(mySQLDataSource);
// get datasource // get datasource
// BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.valueOf("MYSQL"), json); // BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.valueOf("MYSQL"), json);
BaseDataSource baseDataSource = mySQLDataSource; /*
baseDataSource.setDbType(DbType.MYSQL.name()); * BaseDataSource baseDataSource = mySQLDataSource;
baseDataSource.setJdbcUrlDirect("jdbc:mysql://192.168.1.140:3307/dmp_web_new"); * baseDataSource.s(DbType.MYSQL.name());
baseDataSource.setUser("dmp"); * baseDataSource.setJdbcUrlDirect("jdbc:mysql://192.168.1.140:3307/dmp_web_new"
baseDataSource.setPassword("Ioubuy123"); * ); baseDataSource.setUser("dmp"); baseDataSource.setPassword("Ioubuy123");
*/
// ready to execute SQL and parameter entity Map // ready to execute SQL and parameter entity Map
SqlBinds mainSqlBinds = getSqlAndSqlParamsMap("insert into test(id, name) values(1, 'test')"); SqlBinds mainSqlBinds = getSqlAndSqlParamsMap("insert into test(id, name) values(1, 'test')");
...@@ -177,7 +181,7 @@ public class SQLCommandExecutorTest { ...@@ -177,7 +181,7 @@ public class SQLCommandExecutorTest {
// execute sql task // execute sql task
//DatabaseUtils.executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs); //DatabaseUtils.executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
DatabaseUtils.executeUpdateSql(preStatementSqlBinds, baseDataSource); DatabaseUtils.executeUpdateSql(preStatementSqlBinds, myBaseDataSource);
//setExitStatusCode(Constants.EXIT_CODE_SUCCESS); //setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
} catch (Exception e) { } catch (Exception e) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment