Commit f3ac310c authored by sml's avatar sml
parents 87b43ea9 e6038085
package com.jz.dmp.cmdexectool.scheduler.common.task.sync;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean;
import com.alibaba.fastjson.JSONObject;
import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.common.constant.DatasouceTypeConstant;
......@@ -18,7 +11,15 @@ import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao;
import com.jz.dmp.cmdexectool.scheduler.common.enums.MyDbType;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.utils.StringUtils;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MyBaseDataSource;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @ClassName: SyncParameters
......@@ -86,7 +87,7 @@ public class SyncParameters extends AbstractParameters {
*/
private MyBaseDataSource targetBaseDataSource;
public SyncParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean, String publicKey) throws Exception {
public SyncParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String publicKey) throws Exception {
source = "";
env = "";
sink = "";
......@@ -100,7 +101,7 @@ public class SyncParameters extends AbstractParameters {
//evn
Map<String, String> envModel = new HashMap<String, String>();
envModel.put("sparkappname", "Waterdrop");
env = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfigurationFactoryBean);
env = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfig);
//target information
Integer targetDbId = Integer.valueOf((String) targetObj.get("targetDbId"));
......@@ -130,26 +131,26 @@ public class SyncParameters extends AbstractParameters {
|| sourceTypeId == DatasouceTypeConstant.Informix
|| sourceTypeId == DatasouceTypeConstant.DB2)) {
//source
getJdbcSource(dmpSyncingDatasource, registerTableName, freeMarkerConfigurationFactoryBean, publicKey, i);
}
if (sourceTypeId == DatasouceTypeConstant.Hive) {
//source
getSourceHive(envModel, freeMarkerConfigurationFactoryBean, registerTableName, i);
getJdbcSource(dmpSyncingDatasource, registerTableName, freeMarkerConfig, publicKey, i, sourceObj);
}
if (sourceTypeId == DatasouceTypeConstant.Kudu) {
//source
getSourceKudu(dmpSyncingDatasource, registerTableName, freeMarkerConfig, i);
}
if (sourceTypeId == DatasouceTypeConstant.SFTP) {
//source
getSourceSftp(registerTableName, dmpSyncingDatasource, publicKey, sourceObj, freeMarkerConfigurationFactoryBean, i);
getSourceSftp(registerTableName, dmpSyncingDatasource, publicKey, sourceObj, freeMarkerConfig, i);
}
if (sourceTypeId == DatasouceTypeConstant.Elasticsearch) {
//source
getsourceElasticsearch(registerTableName, dmpSyncingDatasource, freeMarkerConfigurationFactoryBean, i);
getsourceElasticsearch(registerTableName, dmpSyncingDatasource, freeMarkerConfig, i);
}
if (sourceTypeId == DatasouceTypeConstant.Hive) {
//source
getSourceHive(freeMarkerConfig, source_table_name,sourceObj);
}
}
//***** sink ******
//产生 Jdbc(MySQL、Oracle、SqlServer、PostgreSQL、Informix、DB2) sink
if ((targetTypeId == DatasouceTypeConstant.MySQL
|| targetTypeId == DatasouceTypeConstant.Oracle
......@@ -157,36 +158,17 @@ public class SyncParameters extends AbstractParameters {
|| targetTypeId == DatasouceTypeConstant.PostgreSQL
|| targetTypeId == DatasouceTypeConstant.Informix
|| targetTypeId == DatasouceTypeConstant.DB2)) {
getJdbcSink(targetDatasource, targetObj, freeMarkerConfigurationFactoryBean, publicKey, source_table_name);
getJdbcSink(targetDatasource, targetObj, freeMarkerConfig, publicKey, source_table_name);
}
if (targetTypeId == DatasouceTypeConstant.SFTP) {
getSinkSftp(targetTable, targetDatasource, publicKey, targetObj, freeMarkerConfig);
}
if (targetTypeId == DatasouceTypeConstant.Kudu) {
getSinkKudu(targetDatasource, targetTable, freeMarkerConfig);
}
//transform
if (mappingObj.size() > 0 && null != mappingObj) {
String sourceField = "";
String targetField = "";
for (Map<String, Object> item : mappingObj) {
sourceField += "," + item.get("sourceField");
targetField += "," + item.get("sourceField") + " as " + item.get("targetField");
}
Map sqlModel = new HashMap();
StringBuilder sql = new StringBuilder()
.append(" select ")
.append(" " + sourceField.substring(1) + " ")
.append(" from ")
.append(" $t ");
sqlModel.put("sql", sql);
sqlModel.put("source_table_name", source_table_name);
transform = transform + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, sqlModel, freeMarkerConfigurationFactoryBean);
Map targetModel = new HashMap();
StringBuilder targetSql = new StringBuilder()
.append(" select ")
.append(" " + targetField.substring(1) + " ")
.append(" from ")
.append(" $t ");
targetModel.put("sql", targetSql);
targetModel.put("source_table_name", source_table_name);
transform = transform + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, targetModel, freeMarkerConfigurationFactoryBean);
getSyncTransform(mappingObj, freeMarkerConfig, source_table_name);
}
//waterdrop script
......@@ -195,13 +177,71 @@ public class SyncParameters extends AbstractParameters {
waterdropModel.put("source", source);
waterdropModel.put("transform", transform);
waterdropModel.put("sink", sink);
waterdropScript = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfigurationFactoryBean);
waterdropScript = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig);
}
private void getSinkKudu(DmpSyncingDatasource targetDatasource, String targetTable, FreeMarkerConfigurationFactoryBean freeMarkerConfig) {
Map kuduModel = new HashMap();
kuduModel.put("kuduMaster", targetDatasource.getHost() + ":" + targetDatasource.getPort()); //主机名
kuduModel.put("result_table_name", targetTable); //spark生成的临时表名
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_KUDU, kuduModel, freeMarkerConfig);
}
private void getSourceKudu(DmpSyncingDatasource dmpSyncingDatasource, String[] registerTableName, FreeMarkerConfigurationFactoryBean freeMarkerConfig, int i) {
String tableName = registerTableName[i];
Map kuduModel = new HashMap();
kuduModel.put("kuduMaster", dmpSyncingDatasource.getHost() + ":" + dmpSyncingDatasource.getPort()); //主机名
kuduModel.put("result_table_name", tableName); //spark生成的临时表名
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_KUDU, kuduModel, freeMarkerConfig);
}
private void getsourceElasticsearch(String[] registerTableName, DmpSyncingDatasource dmpSyncingDatasource, FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean, int i) {
private void getSinkSftp(String targetTable, DmpSyncingDatasource targetDatasource, String publicKey, Map<String, Object> targetObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig) {
Map sftpModel = new HashMap();
sftpModel.put("host", targetDatasource.getHost()); //主机名
sftpModel.put("user", targetDatasource.getUserName()); //用户
sftpModel.put("password", EncryptionUtils.decode(targetDatasource.getPassword(), publicKey));
sftpModel.put("port", targetDatasource.getPort()); //端口
sftpModel.put("fileType", targetObj.get("ftpFileType")); //文件类型
sftpModel.put("delimiter", targetObj.get("sourceFtpCsvDelimiter")); //分隔符
sftpModel.put("multiLine", "true"); //多行解析
if (null != targetObj.get("sourceFtpDir")) {
sftpModel.put("path", targetObj.get("sourceFtpDir")); //文件路径
}
sftpModel.put("result_table_name", targetTable); //spark生成的临时表名
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, sftpModel, freeMarkerConfig);
}
private void getSyncTransform(List<Map<String, Object>> mappingObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String source_table_name) {
String sourceField = "";
String targetField = "";
for (Map<String, Object> item : mappingObj) {
sourceField += "," + item.get("sourceField");
targetField += "," + item.get("sourceField") + " as " + item.get("targetField");
}
Map sqlModel = new HashMap();
StringBuilder sql = new StringBuilder()
.append(" select ")
.append(" " + sourceField.substring(1) + " ")
.append(" from ")
.append(" $t ");
sqlModel.put("sql", sql);
sqlModel.put("source_table_name", source_table_name);
transform = transform + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, sqlModel, freeMarkerConfig);
Map targetModel = new HashMap();
StringBuilder targetSql = new StringBuilder()
.append(" select ")
.append(" " + targetField.substring(1) + " ")
.append(" from ")
.append(" $t ");
targetModel.put("sql", targetSql);
targetModel.put("source_table_name", source_table_name);
transform = transform + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, targetModel, freeMarkerConfig);
}
private void getsourceElasticsearch(String[] registerTableName, DmpSyncingDatasource dmpSyncingDatasource, FreeMarkerConfigurationFactoryBean freeMarkerConfig, int i) {
//source
String tableName = registerTableName[i];
//for (String tableName : registerTableName) {
List<String> list = new ArrayList<>();
String hosts = dmpSyncingDatasource.getHost() + ":" + dmpSyncingDatasource.getPort();
list.add(hosts);
......@@ -210,28 +250,30 @@ public class SyncParameters extends AbstractParameters {
jdbcModel.put("result_table_name", tableName);
//jdbcModel.put("index", "");
//jdbcModel.put("name_age", "");
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_ELASTICSEARCH, jdbcModel, freeMarkerConfigurationFactoryBean);
//}
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_ELASTICSEARCH, jdbcModel, freeMarkerConfig);
}
public void getSourceHive(Map<String, String> envModel, FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean, String[] registerTableName, int i) {
//evn
envModel.put("sparkSqlCatalogImplementation", "hive");
env = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfigurationFactoryBean);
public void getSourceHive(FreeMarkerConfigurationFactoryBean freeMarkerConfig, String registerTableName, Map<String, Object> sourceObj) {
//source
String tableName = registerTableName[i];
//for (String tableName : registerTableName) {
String tableName = registerTableName;
Map hiveModel = new HashMap();
hiveModel.put("pre_sql", " select * from " + tableName);
hiveModel.put("result_table_name", tableName);
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, hiveModel, freeMarkerConfigurationFactoryBean);
//}
String extractExpression = String.valueOf(sourceObj.get("extractExpression")); //数据过滤
if (StringUtils.isNotEmpty(extractExpression)) {
StringBuilder sql = new StringBuilder()
.append(" (select * ")
.append(" from ")
.append(tableName)
.append(" where ")
.append(extractExpression + " ) t");
hiveModel.put("pre_sql", sql);
}
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, hiveModel, freeMarkerConfig);
}
public void getJdbcSource(DmpSyncingDatasource dmpSyncingDatasource, String[] registerTableName, FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean, String publicKey, int i) {
//int tableLength =registerTableName.length-1;
public void getJdbcSource(DmpSyncingDatasource dmpSyncingDatasource, String[] registerTableName, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String publicKey, int i, Map<String, Object> sourceObj) {
String tableName = registerTableName[i];
//for (String tableName : registerTableName) {
Map jdbcModel = new HashMap();
jdbcModel.put("driver", dmpSyncingDatasource.getDriverClassName());
jdbcModel.put("url", dmpSyncingDatasource.getJdbcUrl());
......@@ -239,15 +281,24 @@ public class SyncParameters extends AbstractParameters {
jdbcModel.put("result_table_name", tableName);
jdbcModel.put("user", dmpSyncingDatasource.getUserName());
jdbcModel.put("password", EncryptionUtils.decode(dmpSyncingDatasource.getPassword(), publicKey));
String extractExpression = String.valueOf(sourceObj.get("extractExpression")); //数据过滤
if (StringUtils.isNotEmpty(extractExpression)) {
StringBuilder sql = new StringBuilder()
.append(" (select * ")
.append(" from ")
.append(tableName)
.append(" where ")
.append(extractExpression + " ) t");
jdbcModel.put("table", sql);
}
//jdbcModel.put("partitionColumn", "");
//jdbcModel.put("numPartitions", "");
//jdbcModel.put("lowerBound", "");
//jdbcModel.put("upperBound", "");
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfigurationFactoryBean);
//}
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig);
}
public void getJdbcSink(DmpSyncingDatasource targetDatasource, Map<String, Object> targetObj, FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean, String publicKey, String source_table_name) {
public void getJdbcSink(DmpSyncingDatasource targetDatasource, Map<String, Object> targetObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String publicKey, String source_table_name) {
String postImportStatement = String.valueOf(targetObj.get("postImportStatement")); //导入后语句
String preImportStatement = String.valueOf(targetObj.get("preImportStatement")); //导入前语句
preStatements = new ArrayList<String>();
......@@ -270,8 +321,8 @@ public class SyncParameters extends AbstractParameters {
//# 存储模式,支持overwrite、append、update、ignore、error
jdbcSinkModel.put("save_mode", targetObj.get("targetInsertMergeOverwrite"));
//当存储模式是 overwrite时,仅清除表中数据
if (null != targetObj.get("targetInsertMergeOverwrite")) {
if ("overwrite".equals(targetObj.get("targetInsertMergeOverwrite"))) {
if (null != targetObj.get("primaryKeyConflict")) { //主键冲突
if ("overwrite".equals(targetObj.get("primaryKeyConflict"))) {
jdbcSinkModel.put("truncate", "true");
} else {
jdbcSinkModel.put("truncate", "false");
......@@ -281,29 +332,30 @@ public class SyncParameters extends AbstractParameters {
jdbcSinkModel.put("user", targetDatasource.getUserName());
jdbcSinkModel.put("password", password);
jdbcSinkModel.put("source_table_name", source_table_name);
sink = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_JDBC, jdbcSinkModel, freeMarkerConfigurationFactoryBean);
sink = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_JDBC, jdbcSinkModel, freeMarkerConfig);
}
public void getSourceSftp(String[] registerTableName, DmpSyncingDatasource dmpSyncingDatasource, String publicKey, Map<String, Object> sourceObj, FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean, int i) {
public void getSourceSftp(String[] registerTableName, DmpSyncingDatasource dmpSyncingDatasource, String publicKey, Map<String, Object> sourceObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig, int i) {
String tableName = registerTableName[i];
//for (String tableName : registerTableName) {
Map sftpModel = new HashMap();
sftpModel.put("host", dmpSyncingDatasource.getHost()); //主机名
sftpModel.put("user", dmpSyncingDatasource.getUserName()); //用户
sftpModel.put("password", EncryptionUtils.decode(dmpSyncingDatasource.getPassword(), publicKey));
sftpModel.put("port", dmpSyncingDatasource.getPort()); //端口
if (null != sourceObj.get("fileType")) {
sftpModel.put("fileType", sourceObj.get("ftpFileType")); //文件类型
}
sftpModel.put("fileType", sourceObj.get("ftpFileType")); //文件类型
sftpModel.put("delimiter", sourceObj.get("sourceFtpCsvDelimiter")); //分隔符
sftpModel.put("head", "true"); //是否加载第一行
if ("YES".equals(sourceObj.get("sourceFtpCsvDelimiter"))) { //是否包含表头
sftpModel.put("head", "true"); //是否加载第一行
} else {
sftpModel.put("head", "false");
}
sftpModel.put("multiLine", "true"); //多行解析
if (null != sourceObj.get("sourceFtpDir")) {
//支持多个路径,已数组传值
sftpModel.put("path", sourceObj.get("sourceFtpDir").toString()); //文件路径
}
sftpModel.put("result_table_name", tableName); //spark生成的临时表名
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, sftpModel, freeMarkerConfigurationFactoryBean);
//}
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, sftpModel, freeMarkerConfig);
}
@Override
......
kudu {
<#if kuduMaster??>
# kudu集群信息
kuduMaster = "${kuduMaster!}"
</#if>
<#if table_name??>
table = "${table_name!}"
</#if>
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment