Commit 4e1937ae authored by mcb's avatar mcb

commit

parent ef06f173
...@@ -9,6 +9,7 @@ import com.jz.dmp.cmdexectool.entity.DmpSyncingDatasource; ...@@ -9,6 +9,7 @@ import com.jz.dmp.cmdexectool.entity.DmpSyncingDatasource;
import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao; import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo; import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MyBaseDataSource;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer; import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -62,6 +63,26 @@ public class SyncParameters extends AbstractParameters { ...@@ -62,6 +63,26 @@ public class SyncParameters extends AbstractParameters {
*/ */
private List<ResourceInfo> resourceList; private List<ResourceInfo> resourceList;
/**
* 前导语句
*/
private List<String> preStatements;
/**
* 后导语句
*/
private List<String> posStatements;
/**
* 源数据源
*/
private MyBaseDataSource sourceBaseDataSource;
/**
* 目标数据源
*/
private MyBaseDataSource targetBaseDataSource;
public SyncParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurer freeMarkerConfig) throws Exception { public SyncParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurer freeMarkerConfig) throws Exception {
source = ""; source = "";
env = ""; env = "";
...@@ -84,53 +105,39 @@ public class SyncParameters extends AbstractParameters { ...@@ -84,53 +105,39 @@ public class SyncParameters extends AbstractParameters {
Integer sourceId = Integer.valueOf((String) sourceObj.get("sourceDbId")); Integer sourceId = Integer.valueOf((String) sourceObj.get("sourceDbId"));
DmpSyncingDatasource dmpSyncingDatasource = dmpSyncingDatasourceDao.queryById(sourceId); DmpSyncingDatasource dmpSyncingDatasource = dmpSyncingDatasourceDao.queryById(sourceId);
Integer sourceTypeId = dmpSyncingDatasource.getDatasourceType(); Integer sourceTypeId = dmpSyncingDatasource.getDatasourceType();
String source_table_name = String.valueOf(sourceObj.get("registerTableName"));
String[] registerTableName = ((String) sourceObj.get("registerTableName")).split(","); String[] registerTableName = ((String) sourceObj.get("registerTableName")).split(",");
//target information //target information
Integer targetDbId = Integer.valueOf((String) targetObj.get("targetDbId")); Integer targetDbId = Integer.valueOf((String) targetObj.get("targetDbId"));
DmpSyncingDatasource targetDatasource = dmpSyncingDatasourceDao.queryById(targetDbId); DmpSyncingDatasource targetDatasource = dmpSyncingDatasourceDao.queryById(targetDbId);
Integer targetTypeId = dmpSyncingDatasource.getDatasourceType(); Integer targetTypeId = dmpSyncingDatasource.getDatasourceType();
String targetTable = String.valueOf(targetObj.get("targetTable"));//目标表
//Jdbc(MySQL、Oracle、SqlServer、PostgreSQL、Informix、DB2) String postImportStatement = String.valueOf(targetObj.get("postImportStatement")); //导入后语句
if ((sourceTypeId == DatasouceTypeConstant.MySQL || sourceTypeId == DatasouceTypeConstant.Oracle || sourceTypeId == DatasouceTypeConstant.SqlServer || sourceTypeId == DatasouceTypeConstant.PostgreSQL String preImportStatement = String.valueOf(targetObj.get("preImportStatement")); //导入前语句
|| sourceTypeId == DatasouceTypeConstant.Informix || sourceTypeId == DatasouceTypeConstant.DB2)) { preStatements = new ArrayList<String>();
preStatements.add(preImportStatement);
posStatements = new ArrayList<String>();
posStatements.add(postImportStatement);
//产生 Jdbc(MySQL、Oracle、SqlServer、PostgreSQL、Informix、DB2) source
if ((sourceTypeId == DatasouceTypeConstant.MySQL
|| sourceTypeId == DatasouceTypeConstant.Oracle
|| sourceTypeId == DatasouceTypeConstant.SqlServer
|| sourceTypeId == DatasouceTypeConstant.PostgreSQL
|| sourceTypeId == DatasouceTypeConstant.Informix
|| sourceTypeId == DatasouceTypeConstant.DB2)) {
//source //source
for (String tableName : registerTableName) { getJdbcSource(dmpSyncingDatasource, registerTableName, freeMarkerConfig);
Map jdbcModel = new HashMap();
jdbcModel.put("driver", dmpSyncingDatasource.getDriverClassName());
jdbcModel.put("url", dmpSyncingDatasource.getJdbcUrl());
jdbcModel.put("table", tableName);
jdbcModel.put("result_table_name", tableName);
jdbcModel.put("user", dmpSyncingDatasource.getUserName());
jdbcModel.put("password", dmpSyncingDatasource.getPassword());
jdbcModel.put("partitionColumn", "");
jdbcModel.put("numPartitions", "");
jdbcModel.put("lowerBound", "");
jdbcModel.put("upperBound", "");
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig);
}
} }
//Jdbc(MySQL、Oracle、SqlServer、PostgreSQL、Informix、DB2) //产生 Jdbc(MySQL、Oracle、SqlServer、PostgreSQL、Informix、DB2) sink
if ((targetTypeId == DatasouceTypeConstant.MySQL || targetTypeId == DatasouceTypeConstant.Oracle || targetTypeId == DatasouceTypeConstant.SqlServer || targetTypeId == DatasouceTypeConstant.PostgreSQL if ((targetTypeId == DatasouceTypeConstant.MySQL
|| targetTypeId == DatasouceTypeConstant.Informix || targetTypeId == DatasouceTypeConstant.DB2)) { || targetTypeId == DatasouceTypeConstant.Oracle
//sink || targetTypeId == DatasouceTypeConstant.SqlServer
Map jdbcSinkModel = new HashMap(); || targetTypeId == DatasouceTypeConstant.PostgreSQL
jdbcSinkModel.put("driver", targetDatasource.getDriverClassName()); || targetTypeId == DatasouceTypeConstant.Informix
jdbcSinkModel.put("url", targetDatasource.getJdbcUrl()); || targetTypeId == DatasouceTypeConstant.DB2)) {
//# 存储模式,支持overwrite、append、update、ignore、error getJdbcSink(targetDatasource,targetObj,freeMarkerConfig);
jdbcSinkModel.put("save_mode", targetObj.get("targetInsertMergeOverwrite"));
//当存储模式是 overwrite时,仅清除表中数据
if (null != targetObj.get("targetInsertMergeOverwrite")) {
if ("overwrite".equals(targetObj.get("targetInsertMergeOverwrite"))) {
jdbcSinkModel.put("truncate", "true");
} else {
jdbcSinkModel.put("truncate", "false");
}
}
jdbcSinkModel.put("dbtable", targetObj.get("targetTable")); //目标表
jdbcSinkModel.put("user", targetDatasource.getUserName());
jdbcSinkModel.put("password", targetDatasource.getPassword());
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_JDBC, jdbcSinkModel, freeMarkerConfig);
} }
//transform //transform
if (mappingObj.size() > 0 && null != mappingObj) { if (mappingObj.size() > 0 && null != mappingObj) {
...@@ -140,29 +147,28 @@ public class SyncParameters extends AbstractParameters { ...@@ -140,29 +147,28 @@ public class SyncParameters extends AbstractParameters {
sourceField += "," + item.get("sourceField"); sourceField += "," + item.get("sourceField");
targetField += "," + item.get("sourceField") + " as " + item.get("targetField"); targetField += "," + item.get("sourceField") + " as " + item.get("targetField");
} }
//for (String tableName : registerTableName) { Map sqlModel = new HashMap();
Map sqlModel = new HashMap(); StringBuilder sql = new StringBuilder()
StringBuilder sql = new StringBuilder() .append(" select ")
.append(" select ") .append(" " + sourceField.substring(1) + " ")
.append(" " + sourceField.substring(1) + " ") .append(" from ")
.append(" from ") .append(" $t ");
.append(" $t "); sqlModel.put("sql", sql);
//.append(" " + registerTableName[0]); sqlModel.put("source_table_name", source_table_name);
sqlModel.put("sql", sql); //String table_name = registerTableName[0] + "_view";
//String table_name = registerTableName[0] + "_view"; //sqlModel.put("table_name", table_name);
//sqlModel.put("table_name", table_name); transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, sqlModel, freeMarkerConfig);
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, sqlModel, freeMarkerConfig);
Map targetModel = new HashMap();
Map targetModel = new HashMap(); StringBuilder targetSql = new StringBuilder()
StringBuilder targetSql = new StringBuilder() .append(" select ")
.append(" select ") .append(" " + targetField.substring(1) + " ")
.append(" " + targetField.substring(1) + " ") .append(" from ")
.append(" from ") .append(" $t ");
.append(" $t "); targetModel.put("sql", targetSql);
//.append(" " + table_name); targetModel.put("source_table_name", targetTable);
targetModel.put("sql", targetSql); transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, targetModel, freeMarkerConfig);
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, targetModel, freeMarkerConfig);
//}
} }
/*if (mappingObj.size() > 0 && null != mappingObj) { /*if (mappingObj.size() > 0 && null != mappingObj) {
for (Map<String, Object> item : mappingObj) { for (Map<String, Object> item : mappingObj) {
...@@ -227,6 +233,44 @@ public class SyncParameters extends AbstractParameters { ...@@ -227,6 +233,44 @@ public class SyncParameters extends AbstractParameters {
waterdropScript = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig); waterdropScript = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig);
} }
public void getJdbcSource(DmpSyncingDatasource dmpSyncingDatasource, String[] registerTableName, FreeMarkerConfigurer freeMarkerConfig) {
for (String tableName : registerTableName) {
Map jdbcModel = new HashMap();
jdbcModel.put("driver", dmpSyncingDatasource.getDriverClassName());
jdbcModel.put("url", dmpSyncingDatasource.getJdbcUrl());
jdbcModel.put("table", tableName);
jdbcModel.put("result_table_name", tableName);
jdbcModel.put("user", dmpSyncingDatasource.getUserName());
jdbcModel.put("password", dmpSyncingDatasource.getPassword());
jdbcModel.put("partitionColumn", "");
jdbcModel.put("numPartitions", "");
jdbcModel.put("lowerBound", "");
jdbcModel.put("upperBound", "");
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig);
}
}
public void getJdbcSink(DmpSyncingDatasource targetDatasource,Map<String, Object> targetObj,FreeMarkerConfigurer freeMarkerConfig){
//sink
Map jdbcSinkModel = new HashMap();
jdbcSinkModel.put("driver", targetDatasource.getDriverClassName());
jdbcSinkModel.put("url", targetDatasource.getJdbcUrl());
//# 存储模式,支持overwrite、append、update、ignore、error
jdbcSinkModel.put("save_mode", targetObj.get("targetInsertMergeOverwrite"));
//当存储模式是 overwrite时,仅清除表中数据
if (null != targetObj.get("targetInsertMergeOverwrite")) {
if ("overwrite".equals(targetObj.get("targetInsertMergeOverwrite"))) {
jdbcSinkModel.put("truncate", "true");
} else {
jdbcSinkModel.put("truncate", "false");
}
}
jdbcSinkModel.put("dbtable", targetObj.get("targetTable")); //目标表
jdbcSinkModel.put("user", targetDatasource.getUserName());
jdbcSinkModel.put("password", targetDatasource.getPassword());
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_JDBC, jdbcSinkModel, freeMarkerConfig);
}
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
return waterdropScript != null && !waterdropScript.isEmpty(); return waterdropScript != null && !waterdropScript.isEmpty();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment