Commit a6e69722 authored by mcb

commit

parent 526f94fa
@@ -91,7 +91,6 @@ public class SyncParameters extends AbstractParameters {
sink = ""; sink = "";
transform = ""; transform = "";
this.script = script; this.script = script;
//Map<String, Object> scriptObj = (Map<String, Object>) JSONObject.parse(script);
JSONObject scriptObj = JSONObject.parseObject(script); JSONObject scriptObj = JSONObject.parseObject(script);
Map<String, Object> scripts = (Map<String, Object>) scriptObj.get("scripts"); Map<String, Object> scripts = (Map<String, Object>) scriptObj.get("scripts");
Map<String, Object> sourceObj = (Map<String, Object>) scripts.get("reader"); //来源数据 Map<String, Object> sourceObj = (Map<String, Object>) scripts.get("reader"); //来源数据
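The constructor parses the task's JSON script with fastjson and pulls the reader/writer sections out of a nested `scripts` object. A minimal sketch of that flow, under an assumed payload shape: only the `scripts` and `reader` keys are visible in this diff; the `writer` key and all field values below are illustrative.

```java
import com.alibaba.fastjson.JSONObject;
import java.util.Map;

public class ScriptParseSketch {
    public static void main(String[] args) {
        // Hypothetical payload; shape inferred from the reads above.
        String script = "{\"scripts\":{"
                + "\"reader\":{\"sourceDbId\":\"1,2\",\"registerTableName\":\"t_order_0,t_order_1\"},"
                + "\"writer\":{\"targetDbId\":\"3\",\"targetTable\":\"t_order\"}}}";
        JSONObject scriptObj = JSONObject.parseObject(script);
        // fastjson's JSONObject implements Map, so nested objects cast cleanly
        Map<String, Object> scripts = (Map<String, Object>) scriptObj.get("scripts");
        Map<String, Object> sourceObj = (Map<String, Object>) scripts.get("reader");
        System.out.println(sourceObj.get("sourceDbId")); // -> 1,2
    }
}
```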
@@ -102,29 +101,54 @@ public class SyncParameters extends AbstractParameters {
        envModel.put("sparkappname", "Waterdrop");
        env = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfig);
-        //source information
-        Integer sourceId = Integer.valueOf((String) sourceObj.get("sourceDbId"));
-        DmpSyncingDatasource dmpSyncingDatasource = dmpSyncingDatasourceDao.queryById(sourceId);
-        Integer sourceTypeId = dmpSyncingDatasource.getDatasourceType();
-        String source_table_name = String.valueOf(sourceObj.get("registerTableName"));
-        String[] registerTableName = ((String) sourceObj.get("registerTableName")).split(",");
        //target information
        Integer targetDbId = Integer.valueOf((String) targetObj.get("targetDbId"));
        DmpSyncingDatasource targetDatasource = dmpSyncingDatasourceDao.queryById(targetDbId);
-        Integer targetTypeId = dmpSyncingDatasource.getDatasourceType();
+        Integer targetTypeId = targetDatasource.getDatasourceType();
        String targetTable = String.valueOf(targetObj.get("targetTable")); //target table
-        //generate Jdbc (MySQL, Oracle, SqlServer, PostgreSQL, Informix, DB2) source
-        if ((sourceTypeId == DatasouceTypeConstant.MySQL
-                || sourceTypeId == DatasouceTypeConstant.Oracle
-                || sourceTypeId == DatasouceTypeConstant.SqlServer
-                || sourceTypeId == DatasouceTypeConstant.PostgreSQL
-                || sourceTypeId == DatasouceTypeConstant.Informix
-                || sourceTypeId == DatasouceTypeConstant.DB2)) {
-            //source
-            getJdbcSource(dmpSyncingDatasource, registerTableName, freeMarkerConfig, publicKey);
+        //source information
+        String sourceIds = (String) sourceObj.get("sourceDbId");
+        String[] sourceIdArr = sourceIds.split(",");
+        //Integer sourceId = Integer.valueOf(sourceIdArr[0]);
+        String source_table_name = String.valueOf(sourceObj.get("registerTableName")); //source tables
+        String[] registerTableName = source_table_name.split(",");
+        //sharded databases/tables
+        //***** source ******
+        for (int i = 0; i < sourceIdArr.length; i++) {
+            String id = sourceIdArr[i];
+            DmpSyncingDatasource dmpSyncingDatasource = dmpSyncingDatasourceDao.queryById(Integer.valueOf(id));
+            Integer sourceTypeId = dmpSyncingDatasource.getDatasourceType();
+            //generate Jdbc (MySQL, Oracle, SqlServer, PostgreSQL, Informix, DB2) source
+            if ((sourceTypeId == DatasouceTypeConstant.MySQL
+                    || sourceTypeId == DatasouceTypeConstant.Oracle
+                    || sourceTypeId == DatasouceTypeConstant.SqlServer
+                    || sourceTypeId == DatasouceTypeConstant.PostgreSQL
+                    || sourceTypeId == DatasouceTypeConstant.Informix
+                    || sourceTypeId == DatasouceTypeConstant.DB2)) {
+                //source
+                getJdbcSource(dmpSyncingDatasource, registerTableName, freeMarkerConfig, publicKey, i);
+            }
+            if (sourceTypeId == DatasouceTypeConstant.Hive) {
+                //source
+                getSourceHive(envModel, freeMarkerConfig, registerTableName, i);
+            }
+            if (sourceTypeId == DatasouceTypeConstant.Kudu) {
+                //source
+            }
+            if (sourceTypeId == DatasouceTypeConstant.SFTP) {
+                //source
+                getSourceSftp(registerTableName, dmpSyncingDatasource, publicKey, sourceObj, freeMarkerConfig, i);
+            }
+            if (sourceTypeId == DatasouceTypeConstant.Elasticsearch) {
+                //source
+                getsourceElasticsearch(registerTableName, dmpSyncingDatasource, freeMarkerConfig, i);
+            }
        }
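The new loop fans the source side out per datasource: each id in `sourceDbId` produces one source block, paired by index with the table at the same position in `registerTableName`. A tiny sketch of that positional contract (values hypothetical); note that if the two comma-separated lists differ in length, `registerTableName[i]` throws `ArrayIndexOutOfBoundsException`:

```java
public class ShardPairingSketch {
    public static void main(String[] args) {
        // Assumption carried by the loop above: sourceDbId and registerTableName
        // arrive as equally long comma-separated lists, paired by position.
        String[] sourceIdArr = "1,2,3".split(",");
        String[] registerTableName = "t_order_0,t_order_1,t_order_2".split(",");
        for (int i = 0; i < sourceIdArr.length; i++) {
            // datasource sourceIdArr[i] yields one source block,
            // registered under registerTableName[i]
            System.out.println(sourceIdArr[i] + " -> " + registerTableName[i]);
        }
    }
}
```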
        //generate Jdbc (MySQL, Oracle, SqlServer, PostgreSQL, Informix, DB2) sink
        if ((targetTypeId == DatasouceTypeConstant.MySQL
                || targetTypeId == DatasouceTypeConstant.Oracle
@@ -132,7 +156,7 @@ public class SyncParameters extends AbstractParameters {
                || targetTypeId == DatasouceTypeConstant.PostgreSQL
                || targetTypeId == DatasouceTypeConstant.Informix
                || targetTypeId == DatasouceTypeConstant.DB2)) {
-            getJdbcSink(targetDatasource, targetObj, freeMarkerConfig, publicKey);
+            getJdbcSink(targetDatasource, targetObj, freeMarkerConfig, publicKey, source_table_name);
        }
        //transform
        if (null != mappingObj && mappingObj.size() > 0) {
@@ -164,47 +188,6 @@ public class SyncParameters extends AbstractParameters {
        }
-        if (sourceTypeId == DatasouceTypeConstant.Elasticsearch) {
-            //source
-            for (String tableName : registerTableName) {
-                List<String> list = new ArrayList<>();
-                String hosts = dmpSyncingDatasource.getHost() + ":" + dmpSyncingDatasource.getPort();
-                list.add(hosts);
-                Map jdbcModel = new HashMap();
-                jdbcModel.put("hosts", list);
-                jdbcModel.put("result_table_name", tableName);
-                //jdbcModel.put("index", "");
-                //jdbcModel.put("name_age", "");
-                source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_ELASTICSEARCH, jdbcModel, freeMarkerConfig);
-            }
-        }
-        if (sourceTypeId == DatasouceTypeConstant.Hive) {
-            //source
-        }
-        if (sourceTypeId == DatasouceTypeConstant.Kudu) {
-            //source
-        }
-        if (sourceTypeId == DatasouceTypeConstant.SFTP) {
-            //source
-            for (String tableName : registerTableName) {
-                Map sftpModel = new HashMap();
-                sftpModel.put("host", dmpSyncingDatasource.getHost()); //host
-                sftpModel.put("user", dmpSyncingDatasource.getUserName()); //user
-                sftpModel.put("password", EncryptionUtils.decode(targetDatasource.getPassword(), publicKey));
-                sftpModel.put("port", dmpSyncingDatasource.getPort()); //port
-                if (null != sourceObj.get("fileType")) {
-                    sftpModel.put("fileType", sourceObj.get("ftpFileType")); //file type
-                }
-                sftpModel.put("delimiter", sourceObj.get("sourceFtpCsvDelimiter")); //delimiter
-                sftpModel.put("head", "true"); //load the first row as header
-                sftpModel.put("multiLine", "true"); //multi-line parsing
-                if (null != sourceObj.get("sourceFtpDir")) {
-                    sftpModel.put("path", sourceObj.get("sourceFtpDir").toString()); //file path
-                }
-                sftpModel.put("result_table_name", tableName); //temporary table name registered in Spark
-                source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, sftpModel, freeMarkerConfig);
-            }
-        }
        //waterdrop script
        Map<String, String> waterdropModel = new HashMap<String, String>();
        waterdropModel.put("env", env);
@@ -214,24 +197,56 @@ public class SyncParameters extends AbstractParameters {
        waterdropScript = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig);
    }
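The `env`, `source`, `transform`, and `sink` fragments built above are presumably stitched into one script by the outer WATERDROP_FTL template. For orientation, a Waterdrop config generally has a four-block shape like the one below; the block contents here are illustrative placeholders, not taken from this commit's templates:

```java
public class WaterdropScriptShape {
    public static void main(String[] args) {
        // Illustrative end result only; the real blocks come from the FTL fragments.
        String script = String.join("\n",
                "env { spark.app.name = \"Waterdrop\" }",
                "source { jdbc { url = \"jdbc:mysql://db0:3306/demo\" table = \"t_order_0\" result_table_name = \"t_order_0\" } }",
                "transform { sql { sql = \"select * from t_order_0\" } }",
                "sink { jdbc { dbtable = \"t_order\" source_table_name = \"t_order_0\" } }");
        System.out.println(script);
    }
}
```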
-    public void getJdbcSource(DmpSyncingDatasource dmpSyncingDatasource, String[] registerTableName, FreeMarkerConfigurer freeMarkerConfig, String publicKey) {
-        for (String tableName : registerTableName) {
-            Map jdbcModel = new HashMap();
-            jdbcModel.put("driver", dmpSyncingDatasource.getDriverClassName());
-            jdbcModel.put("url", dmpSyncingDatasource.getJdbcUrl());
-            jdbcModel.put("table", tableName);
-            jdbcModel.put("result_table_name", tableName);
-            jdbcModel.put("user", dmpSyncingDatasource.getUserName());
-            jdbcModel.put("password", EncryptionUtils.decode(dmpSyncingDatasource.getPassword(), publicKey));
-            jdbcModel.put("partitionColumn", "");
-            jdbcModel.put("numPartitions", "");
-            //jdbcModel.put("lowerBound", "");
-            //jdbcModel.put("upperBound", "");
-            source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig);
-        }
-    }
+    private void getsourceElasticsearch(String[] registerTableName, DmpSyncingDatasource dmpSyncingDatasource, FreeMarkerConfigurer freeMarkerConfig, int i) {
+        //source
+        String tableName = registerTableName[i];
+        //for (String tableName : registerTableName) {
+        List<String> list = new ArrayList<>();
+        String hosts = dmpSyncingDatasource.getHost() + ":" + dmpSyncingDatasource.getPort();
+        list.add(hosts);
+        Map jdbcModel = new HashMap();
+        jdbcModel.put("hosts", list);
+        jdbcModel.put("result_table_name", tableName);
+        //jdbcModel.put("index", "");
+        //jdbcModel.put("name_age", "");
+        source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_ELASTICSEARCH, jdbcModel, freeMarkerConfig);
+        //}
+    }
+    public void getSourceHive(Map<String, String> envModel, FreeMarkerConfigurer freeMarkerConfig, String[] registerTableName, int i) {
+        //env
+        envModel.put("sparkSqlCatalogImplementation", "hive");
+        env = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfig);
+        //source
+        String tableName = registerTableName[i];
+        //for (String tableName : registerTableName) {
+        Map hiveModel = new HashMap();
+        hiveModel.put("pre_sql", " select * from " + tableName);
+        hiveModel.put("result_table_name", tableName);
+        source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, hiveModel, freeMarkerConfig);
+        //}
+    }
+    public void getJdbcSource(DmpSyncingDatasource dmpSyncingDatasource, String[] registerTableName, FreeMarkerConfigurer freeMarkerConfig, String publicKey, int i) {
+        //int tableLength = registerTableName.length - 1;
+        String tableName = registerTableName[i];
+        //for (String tableName : registerTableName) {
+        Map jdbcModel = new HashMap();
+        jdbcModel.put("driver", dmpSyncingDatasource.getDriverClassName());
+        jdbcModel.put("url", dmpSyncingDatasource.getJdbcUrl());
+        jdbcModel.put("table", tableName);
+        jdbcModel.put("result_table_name", tableName);
+        jdbcModel.put("user", dmpSyncingDatasource.getUserName());
+        jdbcModel.put("password", EncryptionUtils.decode(dmpSyncingDatasource.getPassword(), publicKey));
+        //jdbcModel.put("partitionColumn", "");
+        //jdbcModel.put("numPartitions", "");
+        //jdbcModel.put("lowerBound", "");
+        //jdbcModel.put("upperBound", "");
+        source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig);
+        //}
+    }
-    public void getJdbcSink(DmpSyncingDatasource targetDatasource, Map<String, Object> targetObj, FreeMarkerConfigurer freeMarkerConfig, String publicKey) {
+    public void getJdbcSink(DmpSyncingDatasource targetDatasource, Map<String, Object> targetObj, FreeMarkerConfigurer freeMarkerConfig, String publicKey, String source_table_name) {
        String postImportStatement = String.valueOf(targetObj.get("postImportStatement")); //post-import statement
        String preImportStatement = String.valueOf(targetObj.get("preImportStatement")); //pre-import statement
        preStatements = new ArrayList<String>();
@@ -264,10 +279,32 @@ public class SyncParameters extends AbstractParameters {
jdbcSinkModel.put("dbtable", targetObj.get("targetTable")); //目标表 jdbcSinkModel.put("dbtable", targetObj.get("targetTable")); //目标表
jdbcSinkModel.put("user", targetDatasource.getUserName()); jdbcSinkModel.put("user", targetDatasource.getUserName());
jdbcSinkModel.put("password", password); jdbcSinkModel.put("password", password);
jdbcSinkModel.put("source_table_name", targetObj.get("targetTable")); jdbcSinkModel.put("source_table_name", source_table_name);
        sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_JDBC, jdbcSinkModel, freeMarkerConfig);
    }
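The elided middle of `getJdbcSink` reads `preImportStatement`/`postImportStatement` and fills the `preStatements` list initialized above; the splitting logic itself is not visible in this hunk. A plausible sketch, assuming multiple SQL statements arrive in one string separated by semicolons:

```java
import java.util.ArrayList;
import java.util.List;

public class ImportStatementSplitSketch {
    public static void main(String[] args) {
        // Hypothetical input; the real value comes from targetObj.get("preImportStatement").
        String preImportStatement = "truncate table t_order; delete from t_log";
        List<String> preStatements = new ArrayList<>();
        for (String s : preImportStatement.split(";")) {
            if (!s.trim().isEmpty()) {
                preStatements.add(s.trim()); // one executable statement per entry
            }
        }
        System.out.println(preStatements); // [truncate table t_order, delete from t_log]
    }
}
```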
+    public void getSourceSftp(String[] registerTableName, DmpSyncingDatasource dmpSyncingDatasource, String publicKey, Map<String, Object> sourceObj, FreeMarkerConfigurer freeMarkerConfig, int i) {
+        String tableName = registerTableName[i];
+        //for (String tableName : registerTableName) {
+        Map sftpModel = new HashMap();
+        sftpModel.put("host", dmpSyncingDatasource.getHost()); //host
+        sftpModel.put("user", dmpSyncingDatasource.getUserName()); //user
+        sftpModel.put("password", EncryptionUtils.decode(dmpSyncingDatasource.getPassword(), publicKey));
+        sftpModel.put("port", dmpSyncingDatasource.getPort()); //port
+        if (null != sourceObj.get("fileType")) {
+            sftpModel.put("fileType", sourceObj.get("ftpFileType")); //file type
+        }
+        sftpModel.put("delimiter", sourceObj.get("sourceFtpCsvDelimiter")); //delimiter
+        sftpModel.put("head", "true"); //load the first row as header
+        sftpModel.put("multiLine", "true"); //multi-line parsing
+        if (null != sourceObj.get("sourceFtpDir")) {
+            sftpModel.put("path", sourceObj.get("sourceFtpDir").toString()); //file path
+        }
+        sftpModel.put("result_table_name", tableName); //temporary table name registered in Spark
+        source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, sftpModel, freeMarkerConfig);
+        //}
+    }
    @Override
    public boolean checkParameters() {
        return waterdropScript != null && !waterdropScript.isEmpty();
......
@@ -9,4 +9,5 @@
</#if>
<#if sparkexecutormemory??>
spark.executor.memory = "${sparkexecutormemory!}"
</#if>
\ No newline at end of file
+spark {
+<#if catalogImplementation??>
+spark.sql.catalogImplementation = "${catalogImplementation!}"
+</#if>
+}
hive {
<#if pre_sql??>
......
@@ -38,4 +38,5 @@ jdbc {
<#if upperBound??>
jdbc.upperBound = ${upperBound!}
</#if>
}
\ No newline at end of file
@@ -10,4 +10,5 @@ sql {
# table name, configurable
table_name = "${table_name!}"
</#if>
}
\ No newline at end of file
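The template changes above lean on two FreeMarker idioms: `<#if x??>` emits its block only when `x` exists in the model, and `${x!}` falls back to an empty string when the value is missing. A self-contained demo (the template is inlined via `StringTemplateLoader` for runnability; it is not part of this commit):

```java
import freemarker.cache.StringTemplateLoader;
import freemarker.template.Configuration;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;

public class FtlGuardDemo {
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration(Configuration.VERSION_2_3_31);
        StringTemplateLoader loader = new StringTemplateLoader();
        loader.putTemplate("env.ftl",
                "spark {\n<#if catalogImplementation??>spark.sql.catalogImplementation = \"${catalogImplementation!}\"\n</#if>}");
        cfg.setTemplateLoader(loader);
        Map<String, Object> model = new HashMap<>();
        model.put("catalogImplementation", "hive"); // remove this line and the guarded block disappears
        StringWriter out = new StringWriter();
        cfg.getTemplate("env.ftl").process(model, out);
        System.out.println(out); // spark { spark.sql.catalogImplementation = "hive" }
    }
}
```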