Commit eda389b1 authored by mcb's avatar mcb

COMMIT

parent 3e16f307
...@@ -11,6 +11,7 @@ import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao; ...@@ -11,6 +11,7 @@ import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao;
import com.jz.dmp.cmdexectool.scheduler.common.enums.MyDbType; import com.jz.dmp.cmdexectool.scheduler.common.enums.MyDbType;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo; import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.StringUtils; import com.jz.dmp.cmdexectool.scheduler.common.utils.StringUtils;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MyBaseDataSource; import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MyBaseDataSource;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean; import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean;
...@@ -158,7 +159,7 @@ public class SyncParameters extends AbstractParameters { ...@@ -158,7 +159,7 @@ public class SyncParameters extends AbstractParameters {
|| targetTypeId == DatasouceTypeConstant.PostgreSQL || targetTypeId == DatasouceTypeConstant.PostgreSQL
|| targetTypeId == DatasouceTypeConstant.Informix || targetTypeId == DatasouceTypeConstant.Informix
|| targetTypeId == DatasouceTypeConstant.DB2)) { || targetTypeId == DatasouceTypeConstant.DB2)) {
getJdbcSink(targetDatasource, targetObj, freeMarkerConfig, publicKey, source_table_name); getJdbcSink(targetDatasource, targetObj, freeMarkerConfig, publicKey, source_table_name, mappingObj);
} }
if (targetTypeId == DatasouceTypeConstant.SFTP) { if (targetTypeId == DatasouceTypeConstant.SFTP) {
getSinkSftp(targetDatasource, publicKey, targetObj, freeMarkerConfig, source_table_name); getSinkSftp(targetDatasource, publicKey, targetObj, freeMarkerConfig, source_table_name);
...@@ -167,8 +168,8 @@ public class SyncParameters extends AbstractParameters { ...@@ -167,8 +168,8 @@ public class SyncParameters extends AbstractParameters {
getSinkKudu(targetDatasource, freeMarkerConfig, source_table_name); getSinkKudu(targetDatasource, freeMarkerConfig, source_table_name);
} }
if (targetTypeId == DatasouceTypeConstant.Hive) { if (targetTypeId == DatasouceTypeConstant.Hive) {
//source //sink
getSinkHive(freeMarkerConfig,sourceObj, source_table_name,envModel ); getSinkHive(freeMarkerConfig, sourceObj, source_table_name, envModel);
} }
//transform //transform
if (mappingObj.size() > 0 && null != mappingObj) { if (mappingObj.size() > 0 && null != mappingObj) {
...@@ -184,7 +185,7 @@ public class SyncParameters extends AbstractParameters { ...@@ -184,7 +185,7 @@ public class SyncParameters extends AbstractParameters {
waterdropScript = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig); waterdropScript = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig);
} }
private void getSinkHive(FreeMarkerConfigurationFactoryBean freeMarkerConfig, Map<String, Object> sourceObj, String source_table_name,Map<String, String> envModel ) { private void getSinkHive(FreeMarkerConfigurationFactoryBean freeMarkerConfig, Map<String, Object> sourceObj, String source_table_name, Map<String, String> envModel) {
Map hiveModel = new HashMap(); Map hiveModel = new HashMap();
hiveModel.put("pre_sql", " select * from " + source_table_name); hiveModel.put("pre_sql", " select * from " + source_table_name);
hiveModel.put("result_table_name", source_table_name); hiveModel.put("result_table_name", source_table_name);
...@@ -205,7 +206,7 @@ public class SyncParameters extends AbstractParameters { ...@@ -205,7 +206,7 @@ public class SyncParameters extends AbstractParameters {
env = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfig); env = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfig);
} }
private void getSinkKudu(DmpSyncingDatasource targetDatasource,FreeMarkerConfigurationFactoryBean freeMarkerConfig, String source_table_name) { private void getSinkKudu(DmpSyncingDatasource targetDatasource, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String source_table_name) {
Map kuduModel = new HashMap(); Map kuduModel = new HashMap();
kuduModel.put("kuduMaster", targetDatasource.getHost() + ":" + targetDatasource.getPort()); //主机名 kuduModel.put("kuduMaster", targetDatasource.getHost() + ":" + targetDatasource.getPort()); //主机名
kuduModel.put("result_table_name", source_table_name); //spark生成的临时表名 kuduModel.put("result_table_name", source_table_name); //spark生成的临时表名
...@@ -220,7 +221,7 @@ public class SyncParameters extends AbstractParameters { ...@@ -220,7 +221,7 @@ public class SyncParameters extends AbstractParameters {
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_KUDU, kuduModel, freeMarkerConfig); source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_KUDU, kuduModel, freeMarkerConfig);
} }
private void getSinkSftp(DmpSyncingDatasource targetDatasource, String publicKey, Map<String, Object> targetObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig,String source_table_name) { private void getSinkSftp(DmpSyncingDatasource targetDatasource, String publicKey, Map<String, Object> targetObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String source_table_name) {
Map sftpModel = new HashMap(); Map sftpModel = new HashMap();
sftpModel.put("host", targetDatasource.getHost()); //主机名 sftpModel.put("host", targetDatasource.getHost()); //主机名
sftpModel.put("user", targetDatasource.getUserName()); //用户 sftpModel.put("user", targetDatasource.getUserName()); //用户
...@@ -324,9 +325,10 @@ public class SyncParameters extends AbstractParameters { ...@@ -324,9 +325,10 @@ public class SyncParameters extends AbstractParameters {
} }
//sink //sink
public void getJdbcSink(DmpSyncingDatasource targetDatasource, Map<String, Object> targetObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String publicKey, String source_table_name) { public void getJdbcSink(DmpSyncingDatasource targetDatasource, Map<String, Object> targetObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String publicKey, String source_table_name, List<Map<String, Object>> mappingObj) {
String postImportStatement = String.valueOf(targetObj.get("postImportStatement")); //导入后语句 String postImportStatement = String.valueOf(targetObj.get("postImportStatement")); //导入后语句
String preImportStatement = String.valueOf(targetObj.get("preImportStatement")); //导入前语句 String preImportStatement = String.valueOf(targetObj.get("preImportStatement")); //导入前语句
String targetTable = String.valueOf(targetObj.get("targetTable"));//目标表
preStatements = new ArrayList<String>(); preStatements = new ArrayList<String>();
preStatements.add(preImportStatement); preStatements.add(preImportStatement);
posStatements = new ArrayList<String>(); posStatements = new ArrayList<String>();
...@@ -340,20 +342,43 @@ public class SyncParameters extends AbstractParameters { ...@@ -340,20 +342,43 @@ public class SyncParameters extends AbstractParameters {
targetBaseDataSource.setPassword(password); targetBaseDataSource.setPassword(password);
targetBaseDataSource.setMyDbType(myDbType); targetBaseDataSource.setMyDbType(myDbType);
String saveMode = "append";
String primaryKeyConflict = "";
if (StringUtils.isNotEmpty(String.valueOf(targetObj.get("primaryKeyConflict")))) {
primaryKeyConflict = String.valueOf(targetObj.get("primaryKeyConflict"));
}
if (myDbType.PostgreSQL == myDbType){
String pgSqlWriteMode = String.valueOf(targetObj.get("pgSqlWriteMode")); //pgsql 导入模式
if (StringUtils.isNotEmpty(pgSqlWriteMode)) {
saveMode = pgSqlWriteMode;
}
}
if (CommConstant.PRIMARY_KEY_CONFLICT_REPLACE.equals(primaryKeyConflict)
|| CommConstant.PRIMARY_KEY_CONFLICT_UPDATE.equals(primaryKeyConflict)) {
saveMode = "update";
}
Map jdbcSinkModel = new HashMap(); Map jdbcSinkModel = new HashMap();
jdbcSinkModel.put("driver", targetDatasource.getDriverClassName());
jdbcSinkModel.put("url", targetDatasource.getJdbcUrl());
//存储模式,支持overwrite、append、update、ignore、error //存储模式,支持overwrite、append、update、ignore、error
//在save_mode指定为update时配置,用于指定键冲突的更新语句模板 //在save_mode指定为update时配置,用于指定键冲突的更新语句模板
jdbcSinkModel.put("save_mode", targetObj.get("primaryKeyConflict")); jdbcSinkModel.put("save_mode", saveMode);
//当存储模式是 overwrite时,仅清除表中数据 //当存储模式是 overwrite时,仅清除表中数据
if (null != targetObj.get("primaryKeyConflict")) { //主键冲突 if (StringUtils.isNotEmpty(primaryKeyConflict)) { //主键冲突
if ("overwrite".equals(targetObj.get("primaryKeyConflict"))) { if ("overwrite".equals(primaryKeyConflict)) {
jdbcSinkModel.put("truncate", "true"); jdbcSinkModel.put("truncate", "true");
} else { } else {
jdbcSinkModel.put("truncate", "false"); jdbcSinkModel.put("truncate", "false");
} }
} }
if ("update".equals(saveMode)) {
String customUpdateStmt = ParameterUtils.syncColumnMappingHandlerConflict(targetTable, mappingObj, myDbType, targetObj);
if (org.apache.commons.lang3.StringUtils.isNotEmpty(customUpdateStmt)) {
jdbcSinkModel.put("customUpdateStmt", customUpdateStmt);
}
}
jdbcSinkModel.put("driver", targetDatasource.getDriverClassName());
jdbcSinkModel.put("url", targetDatasource.getJdbcUrl());
jdbcSinkModel.put("dbtable", targetObj.get("targetTable")); //目标表 jdbcSinkModel.put("dbtable", targetObj.get("targetTable")); //目标表
jdbcSinkModel.put("user", targetDatasource.getUserName()); jdbcSinkModel.put("user", targetDatasource.getUserName());
jdbcSinkModel.put("password", password); jdbcSinkModel.put("password", password);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment