Commit 6237d0f6 authored by mcb

commit

parent 19e75675
@@ -129,15 +129,15 @@ public class SyncParameters extends AbstractParameters {
|| sourceTypeId == DatasouceTypeConstant.Informix
|| sourceTypeId == DatasouceTypeConstant.DB2)) {
//source
getJdbcSource(dmpSyncingDatasource, registerTableName, freeMarkerConfig, publicKey, i);
getJdbcSource(dmpSyncingDatasource, registerTableName, freeMarkerConfig, publicKey, i, sourceObj);
}
if (sourceTypeId == DatasouceTypeConstant.Hive) {
//source
getSourceHive(envModel, freeMarkerConfig, registerTableName, i);
}
if (sourceTypeId == DatasouceTypeConstant.Kudu) {
//source
getSourceKudu(dmpSyncingDatasource, registerTableName, freeMarkerConfig, i);
}
if (sourceTypeId == DatasouceTypeConstant.SFTP) {
//source
@@ -148,7 +148,7 @@ public class SyncParameters extends AbstractParameters {
getsourceElasticsearch(registerTableName, dmpSyncingDatasource, freeMarkerConfig, i);
}
}
//***** sink ******
// generate the Jdbc (MySQL, Oracle, SqlServer, PostgreSQL, Informix, DB2) sink
if ((targetTypeId == DatasouceTypeConstant.MySQL
|| targetTypeId == DatasouceTypeConstant.Oracle
@@ -158,34 +158,15 @@ public class SyncParameters extends AbstractParameters {
|| targetTypeId == DatasouceTypeConstant.DB2)) {
getJdbcSink(targetDatasource, targetObj, freeMarkerConfig, publicKey, source_table_name);
}
if (targetTypeId == DatasouceTypeConstant.SFTP) {
getSinkSftp(targetTable, targetDatasource, publicKey, targetObj, freeMarkerConfig);
}
if (targetTypeId == DatasouceTypeConstant.Kudu) {
getSinkKudu(targetDatasource, targetTable, freeMarkerConfig);
}
//transform
if (null != mappingObj && mappingObj.size() > 0) {
String sourceField = "";
String targetField = "";
for (Map<String, Object> item : mappingObj) {
sourceField += "," + item.get("sourceField");
targetField += "," + item.get("sourceField") + " as " + item.get("targetField");
}
Map sqlModel = new HashMap();
StringBuilder sql = new StringBuilder()
.append(" select ")
.append(" " + sourceField.substring(1) + " ")
.append(" from ")
.append(" $t ");
sqlModel.put("sql", sql);
sqlModel.put("source_table_name", source_table_name);
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, sqlModel, freeMarkerConfig);
Map targetModel = new HashMap();
StringBuilder targetSql = new StringBuilder()
.append(" select ")
.append(" " + targetField.substring(1) + " ")
.append(" from ")
.append(" $t ");
targetModel.put("sql", targetSql);
targetModel.put("source_table_name", source_table_name);
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, targetModel, freeMarkerConfig);
getSyncTransform(mappingObj, freeMarkerConfig, source_table_name);
}
//waterdrop script
@@ -197,10 +178,68 @@ public class SyncParameters extends AbstractParameters {
waterdropScript = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig);
}
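As a reading aid: the assembly above plausibly populates waterdropModel with the fragments accumulated earlier. A minimal sketch, assuming the model keys env, source, transform and sink (the key names are not confirmed by this diff):

    Map<String, String> waterdropModel = new HashMap<>();
    waterdropModel.put("env", env);             // rendered env block
    waterdropModel.put("source", source);       // concatenated source plugin blocks
    waterdropModel.put("transform", transform); // concatenated transform plugin blocks
    waterdropModel.put("sink", sink);           // concatenated sink plugin blocks, if the class keeps one
    waterdropScript = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig);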
private void getSinkKudu(DmpSyncingDatasource targetDatasource, String targetTable, FreeMarkerConfigurer freeMarkerConfig) {
Map kuduModel = new HashMap();
kuduModel.put("kuduMaster", targetDatasource.getHost() + ":" + targetDatasource.getPort()); //主机名
kuduModel.put("result_table_name", targetTable); //spark生成的临时表名
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, kuduModel, freeMarkerConfig);
}
private void getSourceKudu(DmpSyncingDatasource dmpSyncingDatasource, String[] registerTableName, FreeMarkerConfigurer freeMarkerConfig, int i) {
String tableName = registerTableName[i];
Map kuduModel = new HashMap();
kuduModel.put("kuduMaster", dmpSyncingDatasource.getHost() + ":" + dmpSyncingDatasource.getPort()); //主机名
kuduModel.put("result_table_name", tableName); //spark生成的临时表名
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, kuduModel, freeMarkerConfig);
}
private void getSinkSftp(String targetTable, DmpSyncingDatasource targetDatasource, String publicKey, Map<String, Object> targetObj, FreeMarkerConfigurer freeMarkerConfig) {
Map sftpModel = new HashMap();
sftpModel.put("host", targetDatasource.getHost()); //主机名
sftpModel.put("user", targetDatasource.getUserName()); //用户
sftpModel.put("password", EncryptionUtils.decode(targetDatasource.getPassword(), publicKey));
sftpModel.put("port", targetDatasource.getPort()); //端口
sftpModel.put("fileType", targetObj.get("ftpFileType")); //文件类型
sftpModel.put("delimiter", targetObj.get("sourceFtpCsvDelimiter")); //分隔符
sftpModel.put("multiLine", "true"); //多行解析
if (null != targetObj.get("sourceFtpDir")) {
sftpModel.put("path", targetObj.get("sourceFtpDir")); //文件路径
}
sftpModel.put("result_table_name", targetTable); //spark生成的临时表名
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, sftpModel, freeMarkerConfig);
}
private void getSyncTransform(List<Map<String, Object>> mappingObj, FreeMarkerConfigurer freeMarkerConfig, String source_table_name) {
String sourceField = "";
String targetField = "";
for (Map<String, Object> item : mappingObj) {
sourceField += "," + item.get("sourceField");
targetField += "," + item.get("sourceField") + " as " + item.get("targetField");
}
Map sqlModel = new HashMap();
StringBuilder sql = new StringBuilder()
.append(" select ")
.append(" " + sourceField.substring(1) + " ")
.append(" from ")
.append(" $t ");
sqlModel.put("sql", sql);
sqlModel.put("source_table_name", source_table_name);
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, sqlModel, freeMarkerConfig);
Map targetModel = new HashMap();
StringBuilder targetSql = new StringBuilder()
.append(" select ")
.append(" " + targetField.substring(1) + " ")
.append(" from ")
.append(" $t ");
targetModel.put("sql", targetSql);
targetModel.put("source_table_name", source_table_name);
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, targetModel, freeMarkerConfig);
}
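To make the transform concrete, here is a self-contained sketch (the mapping values are invented) of the two SQL statements this method builds, one selecting the source columns and one aliasing them to the target columns:

    import java.util.*;

    public class TransformSqlDemo {
        public static void main(String[] args) {
            // hypothetical mapping rows, mirroring the structure of mappingObj
            List<Map<String, Object>> mappingObj = new ArrayList<>();
            mappingObj.add(Map.of("sourceField", "id", "targetField", "uid"));
            mappingObj.add(Map.of("sourceField", "name", "targetField", "user_name"));
            String sourceField = "";
            String targetField = "";
            for (Map<String, Object> item : mappingObj) {
                sourceField += "," + item.get("sourceField");
                targetField += "," + item.get("sourceField") + " as " + item.get("targetField");
            }
            // prints: select id,name from $t
            System.out.println("select " + sourceField.substring(1) + " from $t");
            // prints: select id as uid,name as user_name from $t
            System.out.println("select " + targetField.substring(1) + " from $t");
        }
    }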
private void getsourceElasticsearch(String[] registerTableName, DmpSyncingDatasource dmpSyncingDatasource, FreeMarkerConfigurer freeMarkerConfig, int i) {
//source
String tableName = registerTableName[i];
//for (String tableName : registerTableName) {
List<String> list = new ArrayList<>();
String hosts = dmpSyncingDatasource.getHost() + ":" + dmpSyncingDatasource.getPort();
list.add(hosts);
@@ -210,25 +249,18 @@ public class SyncParameters extends AbstractParameters {
//jdbcModel.put("index", "");
//jdbcModel.put("name_age", "");
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_ELASTICSEARCH, jdbcModel, freeMarkerConfig);
//}
}
public void getSourceHive(Map<String, String> envModel, FreeMarkerConfigurer freeMarkerConfig, String[] registerTableName, int i) {
//env
envModel.put("sparkSqlCatalogImplementation", "hive");
env = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfig);
//source
String tableName = registerTableName[i];
//for (String tableName : registerTableName) {
Map hiveModel = new HashMap();
hiveModel.put("pre_sql", " select * from " + tableName);
hiveModel.put("result_table_name", tableName);
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, hiveModel, freeMarkerConfig); // note: the Hive source reuses the JDBC source template; the model shape (pre_sql/result_table_name) matches it
//}
}
public void getJdbcSource(DmpSyncingDatasource dmpSyncingDatasource, String[] registerTableName, FreeMarkerConfigurer freeMarkerConfig, String publicKey, int i) {
//int tableLength =registerTableName.length-1;
public void getJdbcSource(DmpSyncingDatasource dmpSyncingDatasource, String[] registerTableName, FreeMarkerConfigurer freeMarkerConfig, String publicKey, int i, Map<String, Object> sourceObj) {
String tableName = registerTableName[i];
//for (String tableName : registerTableName) {
Map jdbcModel = new HashMap();
@@ -238,6 +270,7 @@ public class SyncParameters extends AbstractParameters {
jdbcModel.put("result_table_name", tableName);
jdbcModel.put("user", dmpSyncingDatasource.getUserName());
jdbcModel.put("password", EncryptionUtils.decode(dmpSyncingDatasource.getPassword(), publicKey));
//sourceObj.get("extractExpression"); // data filtering
//jdbcModel.put("partitionColumn", "");
//jdbcModel.put("numPartitions", "");
//jdbcModel.put("lowerBound", "");
@@ -291,13 +324,16 @@ public class SyncParameters extends AbstractParameters {
sftpModel.put("user", dmpSyncingDatasource.getUserName()); //用户
sftpModel.put("password", EncryptionUtils.decode(dmpSyncingDatasource.getPassword(), publicKey));
sftpModel.put("port", dmpSyncingDatasource.getPort()); //端口
if (null != sourceObj.get("fileType")) {
sftpModel.put("fileType", sourceObj.get("ftpFileType")); //文件类型
}
sftpModel.put("fileType", sourceObj.get("ftpFileType")); //文件类型
sftpModel.put("delimiter", sourceObj.get("sourceFtpCsvDelimiter")); //分隔符
sftpModel.put("head", "true"); //是否加载第一行
if ("YES".equals(sourceObj.get("sourceFtpCsvDelimiter"))) { //是否包含表头
sftpModel.put("head", "true"); //是否加载第一行
} else {
sftpModel.put("head", "false");
}
sftpModel.put("multiLine", "true"); //多行解析
if (null != sourceObj.get("sourceFtpDir")) {
//支持多个路径,已数组传值
sftpModel.put("path", sourceObj.get("sourceFtpDir").toString()); //文件路径
}
sftpModel.put("result_table_name", tableName); //spark生成的临时表名
......
kudu {
<#if kuduMaster??>
# kudu cluster info
kuduMaster = "${kuduMaster!}"
</#if>
<#if table_name??>
table = "${table_name!}"
</#if>
}
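For reference, this template renders with standard FreeMarker as in the sketch below; the template directory and file name are assumptions for illustration:

    import freemarker.template.Configuration;
    import freemarker.template.Template;
    import java.io.File;
    import java.io.StringWriter;
    import java.util.HashMap;
    import java.util.Map;

    public class KuduTemplateDemo {
        public static void main(String[] args) throws Exception {
            Configuration cfg = new Configuration(Configuration.VERSION_2_3_31);
            cfg.setDirectoryForTemplateLoading(new File("templates")); // assumed location
            Template tpl = cfg.getTemplate("waterdrop_source_kudu.ftl"); // assumed name
            Map<String, Object> model = new HashMap<>();
            model.put("kuduMaster", "kudu-master-1:7051"); // sample cluster address
            model.put("table_name", "ods_user");           // sample table
            StringWriter out = new StringWriter();
            tpl.process(model, out);
            System.out.println(out);
            // prints roughly:
            // kudu {
            //   # kudu cluster info
            //   kuduMaster = "kudu-master-1:7051"
            //   table = "ods_user"
            // }
        }
    }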