Commit 8b29c23c authored by mcb's avatar mcb

commit

parent 2d0ae231
...@@ -76,7 +76,7 @@ public class SyncParameters extends AbstractParameters { ...@@ -76,7 +76,7 @@ public class SyncParameters extends AbstractParameters {
Map<String, Object> scripts = (Map<String, Object>) scriptJson.get("scripts"); Map<String, Object> scripts = (Map<String, Object>) scriptJson.get("scripts");
Map<String, Object> sourceObj = (Map<String, Object>) scripts.get("reader"); //来源数据 Map<String, Object> sourceObj = (Map<String, Object>) scripts.get("reader"); //来源数据
Map<String, Object> targetObj = (Map<String, Object>) scripts.get("writer"); //目标数据 Map<String, Object> targetObj = (Map<String, Object>) scripts.get("writer"); //目标数据
Map<String, Object> mappingObj = (Map<String, Object>) scripts.get("mappingRelation"); //字段映射关系 List<Map<String, Object>> mappingObj = (List<Map<String, Object>>) scripts.get("mappingRelation"); //字段映射关系
//evn //evn
Map<String, String> envModel = new HashMap<String, String>(); Map<String, String> envModel = new HashMap<String, String>();
envModel.put("sparkappname", "Waterdrop"); envModel.put("sparkappname", "Waterdrop");
...@@ -94,9 +94,9 @@ public class SyncParameters extends AbstractParameters { ...@@ -94,9 +94,9 @@ public class SyncParameters extends AbstractParameters {
Integer targetTypeId = dmpSyncingDatasource.getDatasourceType(); Integer targetTypeId = dmpSyncingDatasource.getDatasourceType();
//Jdbc(MySQL、Oracle、SqlServer、PostgreSQL、Informix、DB2) //Jdbc(MySQL、Oracle、SqlServer、PostgreSQL、Informix、DB2)
//source if ((sourceTypeId == DatasouceTypeConstant.MySQL || sourceTypeId == DatasouceTypeConstant.Oracle || sourceTypeId == DatasouceTypeConstant.SqlServer || sourceTypeId == DatasouceTypeConstant.PostgreSQL
if (sourceTypeId == DatasouceTypeConstant.MySQL || sourceTypeId == DatasouceTypeConstant.Oracle || sourceTypeId == DatasouceTypeConstant.SqlServer || sourceTypeId == DatasouceTypeConstant.PostgreSQL || sourceTypeId == DatasouceTypeConstant.Informix || sourceTypeId == DatasouceTypeConstant.DB2)) {
|| sourceTypeId == DatasouceTypeConstant.Informix || sourceTypeId == DatasouceTypeConstant.DB2) { //source
for (String tableName : registerTableName) { for (String tableName : registerTableName) {
Map jdbcModel = new HashMap(); Map jdbcModel = new HashMap();
jdbcModel.put("driver", dmpSyncingDatasource.getDriverClassName()); jdbcModel.put("driver", dmpSyncingDatasource.getDriverClassName());
...@@ -112,9 +112,10 @@ public class SyncParameters extends AbstractParameters { ...@@ -112,9 +112,10 @@ public class SyncParameters extends AbstractParameters {
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig); source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig);
} }
} }
//sink //Jdbc(MySQL、Oracle、SqlServer、PostgreSQL、Informix、DB2)
if (targetTypeId == DatasouceTypeConstant.MySQL || targetTypeId == DatasouceTypeConstant.Oracle || targetTypeId == DatasouceTypeConstant.SqlServer || targetTypeId == DatasouceTypeConstant.PostgreSQL if ((targetTypeId == DatasouceTypeConstant.MySQL || targetTypeId == DatasouceTypeConstant.Oracle || targetTypeId == DatasouceTypeConstant.SqlServer || targetTypeId == DatasouceTypeConstant.PostgreSQL
|| targetTypeId == DatasouceTypeConstant.Informix || targetTypeId == DatasouceTypeConstant.DB2) { || targetTypeId == DatasouceTypeConstant.Informix || targetTypeId == DatasouceTypeConstant.DB2)) {
//sink
Map jdbcSinkModel = new HashMap(); Map jdbcSinkModel = new HashMap();
jdbcSinkModel.put("driver", targetDatasource.getDriverClassName()); jdbcSinkModel.put("driver", targetDatasource.getDriverClassName());
jdbcSinkModel.put("url", targetDatasource.getJdbcUrl()); jdbcSinkModel.put("url", targetDatasource.getJdbcUrl());
...@@ -131,6 +132,19 @@ public class SyncParameters extends AbstractParameters { ...@@ -131,6 +132,19 @@ public class SyncParameters extends AbstractParameters {
jdbcSinkModel.put("password", targetDatasource.getPassword()); jdbcSinkModel.put("password", targetDatasource.getPassword());
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_JDBC, jdbcSinkModel, freeMarkerConfig); sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_JDBC, jdbcSinkModel, freeMarkerConfig);
} }
//transform
if (mappingObj.size() > 0 && null != mappingObj) {
for (Map<String, Object> item : mappingObj) {
Map<String, String> transformJson2Model = new HashMap<String, String>();
if (null != item.get("sourceField")) {
transformJson2Model.put("source_field", (String) item.get("sourceField"));
}
if (null != item.get("targetField")) {
transformJson2Model.put("target_field", (String) item.get("targetField"));
}
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_JSON2, transformJson2Model, freeMarkerConfig);
}
}
if (sourceTypeId == DatasouceTypeConstant.Elasticsearch) { if (sourceTypeId == DatasouceTypeConstant.Elasticsearch) {
//source //source
...@@ -169,11 +183,18 @@ public class SyncParameters extends AbstractParameters { ...@@ -169,11 +183,18 @@ public class SyncParameters extends AbstractParameters {
sftpModel.put("result_table_name", tableName); //spark生成的临时表名 sftpModel.put("result_table_name", tableName); //spark生成的临时表名
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, sftpModel, freeMarkerConfig); source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, sftpModel, freeMarkerConfig);
} }
//sink
//transform
} }
} }
public void getTransForm() {
}
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
return waterdropScript != null && !waterdropScript.isEmpty(); return waterdropScript != null && !waterdropScript.isEmpty();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment