Commit c216a69d authored by mcb's avatar mcb

commit

parent f289f914
...@@ -53,7 +53,8 @@ public enum TaskType { ...@@ -53,7 +53,8 @@ public enum TaskType {
FTP(14, "ftp"), FTP(14, "ftp"),
UNZIPFILE(15, "unzipfile"), UNZIPFILE(15, "unzipfile"),
DOCTRANS(16, "docTrans"), DOCTRANS(16, "docTrans"),
HDFS(17, "hdfs"); HDFS(17, "hdfs"),
SYNC(18, "sync");
TaskType(int code, String descp){ TaskType(int code, String descp){
this.code = code; this.code = code;
......
...@@ -9,7 +9,6 @@ import com.jz.dmp.cmdexectool.entity.DmpSyncingDatasource; ...@@ -9,7 +9,6 @@ import com.jz.dmp.cmdexectool.entity.DmpSyncingDatasource;
import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao; import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo; import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer; import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -63,7 +62,7 @@ public class SyncParameters extends AbstractParameters { ...@@ -63,7 +62,7 @@ public class SyncParameters extends AbstractParameters {
*/ */
private List<ResourceInfo> resourceList; private List<ResourceInfo> resourceList;
public SyncParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurer freeMarkerConfig) { public SyncParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurer freeMarkerConfig) throws Exception {
source = ""; source = "";
env = ""; env = "";
sink = ""; sink = "";
...@@ -72,8 +71,7 @@ public class SyncParameters extends AbstractParameters { ...@@ -72,8 +71,7 @@ public class SyncParameters extends AbstractParameters {
this.script = script; this.script = script;
JSONObject scriptObj = JSONObject.parseObject(script); JSONObject scriptObj = JSONObject.parseObject(script);
Map<String, Object> scriptJson = (Map<String, Object>) scriptObj.get("params"); Map<String, Object> scripts = (Map<String, Object>) scriptObj.get("scripts");
Map<String, Object> scripts = (Map<String, Object>) scriptJson.get("scripts");
Map<String, Object> sourceObj = (Map<String, Object>) scripts.get("reader"); //来源数据 Map<String, Object> sourceObj = (Map<String, Object>) scripts.get("reader"); //来源数据
Map<String, Object> targetObj = (Map<String, Object>) scripts.get("writer"); //目标数据 Map<String, Object> targetObj = (Map<String, Object>) scripts.get("writer"); //目标数据
List<Map<String, Object>> mappingObj = (List<Map<String, Object>>) scripts.get("mappingRelation"); //字段映射关系 List<Map<String, Object>> mappingObj = (List<Map<String, Object>>) scripts.get("mappingRelation"); //字段映射关系
...@@ -105,10 +103,10 @@ public class SyncParameters extends AbstractParameters { ...@@ -105,10 +103,10 @@ public class SyncParameters extends AbstractParameters {
jdbcModel.put("result_table_name", tableName); jdbcModel.put("result_table_name", tableName);
jdbcModel.put("user", dmpSyncingDatasource.getUserName()); jdbcModel.put("user", dmpSyncingDatasource.getUserName());
jdbcModel.put("password", dmpSyncingDatasource.getPassword()); jdbcModel.put("password", dmpSyncingDatasource.getPassword());
//jdbcModel.put("partitionColumn", ""); jdbcModel.put("partitionColumn", "");
//jdbcModel.put("numPartitions", ""); jdbcModel.put("numPartitions", "");
//jdbcModel.put("lowerBound", dmpSyncingDatasource.getPassword()); jdbcModel.put("lowerBound", "");
//jdbcModel.put("upperBound", dmpSyncingDatasource.getPassword()); jdbcModel.put("upperBound", "");
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig); source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig);
} }
} }
...@@ -119,12 +117,14 @@ public class SyncParameters extends AbstractParameters { ...@@ -119,12 +117,14 @@ public class SyncParameters extends AbstractParameters {
Map jdbcSinkModel = new HashMap(); Map jdbcSinkModel = new HashMap();
jdbcSinkModel.put("driver", targetDatasource.getDriverClassName()); jdbcSinkModel.put("driver", targetDatasource.getDriverClassName());
jdbcSinkModel.put("url", targetDatasource.getJdbcUrl()); jdbcSinkModel.put("url", targetDatasource.getJdbcUrl());
jdbcSinkModel.put("save_mode", targetObj.get("targetInsertMergeOverwrite")); //# 存储模式,支持overwrite、append、update、ignore、error //# 存储模式,支持overwrite、append、update、ignore、error
if (null != targetObj.get("targetInsertMergeOverwrite")) { //当存储模式是 overwrite时,仅清除表中数据 jdbcSinkModel.put("save_mode", targetObj.get("targetInsertMergeOverwrite"));
//当存储模式是 overwrite时,仅清除表中数据
if (null != targetObj.get("targetInsertMergeOverwrite")) {
if ("overwrite".equals(targetObj.get("targetInsertMergeOverwrite"))) { if ("overwrite".equals(targetObj.get("targetInsertMergeOverwrite"))) {
jdbcSinkModel.put("truncate", true); jdbcSinkModel.put("truncate", "true");
} else { } else {
jdbcSinkModel.put("truncate", false); jdbcSinkModel.put("truncate", "false");
} }
} }
jdbcSinkModel.put("dbtable", targetObj.get("targetTable")); //目标表 jdbcSinkModel.put("dbtable", targetObj.get("targetTable")); //目标表
...@@ -134,6 +134,35 @@ public class SyncParameters extends AbstractParameters { ...@@ -134,6 +134,35 @@ public class SyncParameters extends AbstractParameters {
} }
//transform //transform
if (mappingObj.size() > 0 && null != mappingObj) { if (mappingObj.size() > 0 && null != mappingObj) {
String sourceField = "";
String targetField = "";
for (Map<String, Object> item : mappingObj) {
sourceField += "," + item.get("sourceField");
targetField += "," + item.get("sourceField") + " as " + item.get("targetField");
}
for (String tableName : registerTableName) {
Map sqlModel = new HashMap();
StringBuilder sql = new StringBuilder()
.append(" select ")
.append(" " + sourceField.substring(1) + " ")
.append(" from ")
.append(" " + tableName);
sqlModel.put("sql", sql);
String table_name = tableName + "_view";
sqlModel.put("table_name", table_name);
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, sqlModel, freeMarkerConfig);
Map targetModel = new HashMap();
StringBuilder sql1 = new StringBuilder()
.append(" select ")
.append(" " + targetField.substring(1) + " ")
.append(" from ")
.append(" " + table_name);
targetModel.put("sql",sql1);
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, targetModel, freeMarkerConfig);
}
}
/*if (mappingObj.size() > 0 && null != mappingObj) {
for (Map<String, Object> item : mappingObj) { for (Map<String, Object> item : mappingObj) {
Map<String, String> transformJson2Model = new HashMap<String, String>(); Map<String, String> transformJson2Model = new HashMap<String, String>();
if (null != item.get("sourceField")) { if (null != item.get("sourceField")) {
...@@ -144,7 +173,7 @@ public class SyncParameters extends AbstractParameters { ...@@ -144,7 +173,7 @@ public class SyncParameters extends AbstractParameters {
} }
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_JSON2, transformJson2Model, freeMarkerConfig); transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_JSON2, transformJson2Model, freeMarkerConfig);
} }
} }*/
if (sourceTypeId == DatasouceTypeConstant.Elasticsearch) { if (sourceTypeId == DatasouceTypeConstant.Elasticsearch) {
//source //source
...@@ -175,8 +204,8 @@ public class SyncParameters extends AbstractParameters { ...@@ -175,8 +204,8 @@ public class SyncParameters extends AbstractParameters {
sftpModel.put("fileType", sourceObj.get("fileType").toString()); //文件类型 sftpModel.put("fileType", sourceObj.get("fileType").toString()); //文件类型
} }
sftpModel.put("delimiter", sourceObj.get("sourceCsvDelimiter")); //分隔符 sftpModel.put("delimiter", sourceObj.get("sourceCsvDelimiter")); //分隔符
sftpModel.put("head", true); //是否加载第一行 sftpModel.put("head", "true"); //是否加载第一行
sftpModel.put("multiLine", true); //多行解析 sftpModel.put("multiLine", "true"); //多行解析
if (null != sourceObj.get("sourceFtpDir")) { if (null != sourceObj.get("sourceFtpDir")) {
sftpModel.put("path", sourceObj.get("sourceFtpDir").toString()); //文件路径 sftpModel.put("path", sourceObj.get("sourceFtpDir").toString()); //文件路径
} }
...@@ -185,14 +214,15 @@ public class SyncParameters extends AbstractParameters { ...@@ -185,14 +214,15 @@ public class SyncParameters extends AbstractParameters {
} }
//sink //sink
//transform
}
} }
public void getTransForm() { //waterdrop script
Map<String, String> waterdropModel = new HashMap<String, String>();
waterdropModel.put("env", env);
waterdropModel.put("source", source);
waterdropModel.put("transform", transform);
waterdropModel.put("sink", sink);
waterdropScript = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig);
} }
@Override @Override
......
...@@ -70,6 +70,8 @@ public class EnumUtils { ...@@ -70,6 +70,8 @@ public class EnumUtils {
return TaskType.DOCTRANS; return TaskType.DOCTRANS;
}else if (taskType.equals(CommConstant.WORK_TYPE_HDFS)) { }else if (taskType.equals(CommConstant.WORK_TYPE_HDFS)) {
return TaskType.HDFS; return TaskType.HDFS;
}else if (taskType.equals(CommConstant.WORK_TYPE_SYNC)) {
return TaskType.SYNC;
} }
return null; return null;
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package com.jz.dmp.cmdexectool.scheduler.server.worker.task; package com.jz.dmp.cmdexectool.scheduler.server.worker.task;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.sync.SyncTask;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -65,7 +66,8 @@ public class TaskManager { ...@@ -65,7 +66,8 @@ public class TaskManager {
return new DoctransTask(taskExecutionContext, logger); return new DoctransTask(taskExecutionContext, logger);
case HDFS: case HDFS:
return new HdfsTask(taskExecutionContext, logger); return new HdfsTask(taskExecutionContext, logger);
case SYNC:
return new SyncTask(taskExecutionContext, logger);
default: default:
logger.error("not support task type: {}", taskExecutionContext.getTaskType()); logger.error("not support task type: {}", taskExecutionContext.getTaskType());
throw new IllegalArgumentException("not support task type"); throw new IllegalArgumentException("not support task type");
......
...@@ -41,7 +41,7 @@ import java.util.Set; ...@@ -41,7 +41,7 @@ import java.util.Set;
public class SyncTask extends AbstractTask { public class SyncTask extends AbstractTask {
/** /**
* sql parameters * sync parameters
*/ */
private SyncParameters syncParameters; private SyncParameters syncParameters;
...@@ -76,7 +76,7 @@ public class SyncTask extends AbstractTask { ...@@ -76,7 +76,7 @@ public class SyncTask extends AbstractTask {
syncParameters = (SyncParameters) taskExecutionContext.getParameters(); syncParameters = (SyncParameters) taskExecutionContext.getParameters();
if (!syncParameters.checkParameters()) { if (!syncParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid"); throw new RuntimeException("sync task params is not valid");
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment