Commit 810fd04c authored by mcb's avatar mcb

COMMIT

parent e6038085
...@@ -297,7 +297,7 @@ public class SyncParameters extends AbstractParameters { ...@@ -297,7 +297,7 @@ public class SyncParameters extends AbstractParameters {
//jdbcModel.put("upperBound", ""); //jdbcModel.put("upperBound", "");
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig); source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig);
} }
//sink
public void getJdbcSink(DmpSyncingDatasource targetDatasource, Map<String, Object> targetObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String publicKey, String source_table_name) { public void getJdbcSink(DmpSyncingDatasource targetDatasource, Map<String, Object> targetObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String publicKey, String source_table_name) {
String postImportStatement = String.valueOf(targetObj.get("postImportStatement")); //导入后语句 String postImportStatement = String.valueOf(targetObj.get("postImportStatement")); //导入后语句
String preImportStatement = String.valueOf(targetObj.get("preImportStatement")); //导入前语句 String preImportStatement = String.valueOf(targetObj.get("preImportStatement")); //导入前语句
...@@ -314,12 +314,12 @@ public class SyncParameters extends AbstractParameters { ...@@ -314,12 +314,12 @@ public class SyncParameters extends AbstractParameters {
targetBaseDataSource.setPassword(password); targetBaseDataSource.setPassword(password);
targetBaseDataSource.setMyDbType(myDbType); targetBaseDataSource.setMyDbType(myDbType);
//sink
Map jdbcSinkModel = new HashMap(); Map jdbcSinkModel = new HashMap();
jdbcSinkModel.put("driver", targetDatasource.getDriverClassName()); jdbcSinkModel.put("driver", targetDatasource.getDriverClassName());
jdbcSinkModel.put("url", targetDatasource.getJdbcUrl()); jdbcSinkModel.put("url", targetDatasource.getJdbcUrl());
//# 存储模式,支持overwrite、append、update、ignore、error //存储模式,支持overwrite、append、update、ignore、error
jdbcSinkModel.put("save_mode", targetObj.get("targetInsertMergeOverwrite")); //在save_mode指定为update时配置,用于指定键冲突的更新语句模板
jdbcSinkModel.put("save_mode", targetObj.get("primaryKeyConflict"));
//当存储模式是 overwrite时,仅清除表中数据 //当存储模式是 overwrite时,仅清除表中数据
if (null != targetObj.get("primaryKeyConflict")) { //主键冲突 if (null != targetObj.get("primaryKeyConflict")) { //主键冲突
if ("overwrite".equals(targetObj.get("primaryKeyConflict"))) { if ("overwrite".equals(targetObj.get("primaryKeyConflict"))) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment