Commit 86a790bf authored by mcb's avatar mcb
parents 810fd04c b5170a1f
......@@ -17,7 +17,7 @@
<java.version>1.8</java.version>
<shiro.version>1.2.3</shiro.version>
<mybatis-pagehelper.version>4.2.0</mybatis-pagehelper.version>
<start-class>com.jz.dmp.cmdexectool.ApiApplication</start-class>
<start-class>com.jz.dmp.cmdexectool.CmdExecToolApplication</start-class>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
......@@ -586,7 +586,7 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.jz.dmp.cmdexectool.ApiApplication</mainClass>
<mainClass>com.jz.dmp.cmdexectool.CmdExecToolApplication</mainClass>
</configuration>
<!--<dependencies>
<dependency>
......
......@@ -20,14 +20,14 @@ import com.jz.dmp.cmdexectool.scheduler.service.process.ProcessService;
@EnableTransactionManagement
@SpringBootApplication
@EnableCaching
public class ApiApplication implements HealthIndicator {
public class CmdExecToolApplication implements HealthIndicator {
private static Logger logger = LoggerFactory.getLogger(ApiApplication.class);
private static Logger logger = LoggerFactory.getLogger(CmdExecToolApplication.class);
public static void main(String[] args) {
long start = System.currentTimeMillis();
ConfigurableApplicationContext context = new SpringApplicationBuilder(ApiApplication.class).web(WebApplicationType.NONE).bannerMode(Banner.Mode.OFF).run(args);
ConfigurableApplicationContext context = new SpringApplicationBuilder(CmdExecToolApplication.class).web(WebApplicationType.NONE).bannerMode(Banner.Mode.OFF).run(args);
Integer taskId = Integer.parseInt(args[0]);
if (taskId==null) {
......
......@@ -74,4 +74,10 @@ public class CommConstant {
public static final String EXECUTION_ENGINE_JDBC = "jdbc";//jdbc
public static final String EXECUTION_ENGINE_SPARK = "spark";//spark
/***************************************************/
//主键冲突处理方式
public static final String PRIMARY_KEY_CONFLICT_INSERT = "insert";
public static final String PRIMARY_KEY_CONFLICT_UPDATE = "update";
public static final String PRIMARY_KEY_CONFLICT_REPLACE = "replace";
}
......@@ -25,7 +25,6 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import com.alibaba.fastjson.JSONObject;
import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.common.utils.EncryptionUtils;
......@@ -319,17 +318,30 @@ public class SqlParameters extends AbstractParameters {
|| this.targetBaseDataSource.getMyDbType() == MyDbType.DB2
|| this.targetBaseDataSource.getMyDbType() == MyDbType.INFORMIX) {
String saveMode = "append";
String targetTableName = tableObj.getString("targetTableName");
String primaryKeyConflict = tableObj.getString("primaryKeyConflict");
if (CommConstant.PRIMARY_KEY_CONFLICT_REPLACE.equals(primaryKeyConflict)
|| CommConstant.PRIMARY_KEY_CONFLICT_UPDATE.equals(primaryKeyConflict)) {
saveMode = "update";
}
Map<String, String> sinkJdbcModel = new HashMap<String, String>();
sinkJdbcModel.put("source_table_name", "t");
sinkJdbcModel.put("save_mode", "overwrite");
sinkJdbcModel.put("save_mode", saveMode);
sinkJdbcModel.put("truncate", "true");
sinkJdbcModel.put("url", jdbcUrl);
sinkJdbcModel.put("driver", targetSource.getDriverClassName());
sinkJdbcModel.put("user", user);
sinkJdbcModel.put("password", password);
sinkJdbcModel.put("dbtable", targetTableName);
if ("update".equals(saveMode)) {
String customUpdateStmt = ParameterUtils.columnMappingHandlerConflict(tableFieldsObj, targetTableName, this.targetBaseDataSource.getMyDbType());
if (StringUtils.isNotEmpty(customUpdateStmt)) {
sinkJdbcModel.put("customUpdateStmt", customUpdateStmt);
}
}
sink = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_JDBC, sinkJdbcModel, freeMarkerConfigurationFactoryBean);
}
}
......
......@@ -33,9 +33,11 @@ import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.CommandType;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DataType;
import com.jz.dmp.cmdexectool.scheduler.common.enums.MyDbType;
import com.jz.dmp.cmdexectool.scheduler.common.process.Property;
import com.jz.dmp.cmdexectool.scheduler.common.utils.placeholder.BusinessTimeUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.placeholder.PlaceholderUtils;
......@@ -306,5 +308,114 @@ public class ParameterUtils {
return sb.toString();
}
/**
* @Title: columnMappingHandler
* @Description: TODO(开发任务SQL:生成主键冲突语句)
* @param @param jsonStr
* @param @return 参数
* @return List<Map<String,String>> 返回类型
* @throws
*/
public static String columnMappingHandlerConflict(String jsonStr, String tableName, MyDbType myDbType) {
JSONObject jsonObject = JSONObject.parseObject(jsonStr);
JSONArray sourceArray = jsonObject.getJSONArray("sourceFields");
Map<String, JSONObject> sourceMap = new HashMap<String, JSONObject>();
for (int index=0; index<sourceArray.size(); index++) {
JSONObject sourceObj = sourceArray.getJSONObject(index);
sourceMap.put(sourceObj.getString("customSoruceFieldId"), sourceObj);
}
JSONArray targetArray = jsonObject.getJSONArray("targetFields");
Map<String, JSONObject> targetMap = new HashMap<String, JSONObject>();
for (int index=0; index<targetArray.size(); index++) {
JSONObject targetObj = targetArray.getJSONObject(index);
targetMap.put(targetObj.getString("customTargetFieldId"), targetObj);
}
StringBuilder valuesSqlSb = new StringBuilder();
StringBuilder primaryKeySqlSb = new StringBuilder();
StringBuilder updateSqlSb = new StringBuilder();
JSONArray mappingArray = jsonObject.getJSONArray("columnMapping");
Integer size = mappingArray.size();
for (int index=0; index<size; index++) {
JSONObject mappingObj = mappingArray.getJSONObject(index);
JSONObject sourceObj = sourceMap.get(mappingObj.getString("customSoruceFieldId"));
JSONObject targetObj = targetMap.get(mappingObj.getString("customTargetFieldId"));
//String customSoruceFiledId = sourceObj.getString("customSoruceFiledId");
String sourceFieldName = sourceObj.getString("sourceFieldName");
//String sourceFieldType = sourceObj.getString("sourceFieldType");
//String customTargetFieldId = targetObj.getString("customTargetFieldId");
String targetFieldName = targetObj.getString("targetFieldName");
//String targetFieldType = targetObj.getString("targetFieldType");
String isPrimaryKey = targetObj.getString("isPrimaryKey");
//设置values语句
valuesSqlSb.append("?");
if (index!=size-1) {
valuesSqlSb.append(",");
}
if ("0".equals(isPrimaryKey) && myDbType.MySQL==myDbType) {
//设置update语句
updateSqlSb.append(targetFieldName);
updateSqlSb.append("=VALUES(");
updateSqlSb.append(targetFieldName);
updateSqlSb.append(")");
updateSqlSb.append(",");
} else if (myDbType.PostgreSQL==myDbType) {
if ("1".equals(isPrimaryKey)) {
primaryKeySqlSb.append("targetFieldName");
primaryKeySqlSb.append(",");
}else {
//设置update语句
updateSqlSb.append(targetFieldName);
updateSqlSb.append("=EXCLUDED.");
updateSqlSb.append(targetFieldName);
updateSqlSb.append(",");
}
}
}
String primaryKeySqlStr = "";
if (org.apache.commons.lang3.StringUtils.isNotEmpty(primaryKeySqlSb)) {
primaryKeySqlStr = primaryKeySqlSb.substring(0, primaryKeySqlSb.length()-1);
}
String updateSqlStr = "";
if (org.apache.commons.lang3.StringUtils.isNotEmpty(updateSqlSb)) {
updateSqlStr = updateSqlSb.substring(0, updateSqlSb.length()-1);
}
StringBuilder sb = new StringBuilder();
if (myDbType.MySQL==myDbType) {
sb.append("INSERT INTO ");
sb.append(tableName+" ");
sb.append("VALUES(");
sb.append(valuesSqlSb);
sb.append(") ");
sb.append("ON DUPLICATE KEY UPDATE ");
sb.append(updateSqlStr);
} else if (myDbType.PostgreSQL==myDbType) {
sb.append("INSERT INTO ");
sb.append(tableName+" ");
sb.append("VALUES(");
sb.append(valuesSqlSb);
sb.append(") ");
sb.append("ON CONFLICT(");
sb.append(primaryKeySqlStr);
sb.append(") do update ");
sb.append(updateSqlStr);
}
return sb.toString();
}
}
......@@ -204,7 +204,7 @@ public abstract class AbstractCommandExecutor {
// if timeout occurs, exit directly
//long remainTime = getRemaintime();
long remainTime = 60;
long remainTime = 300;
// waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
......
......@@ -126,6 +126,26 @@ public class ProcessService {
throws Exception {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
// 获取任务数据,并获取job json,并组装TaskInstance
DmpDevelopTaskDto dmpDevelopTaskDto = findDevelopTaskByTaskId(taskId, isSingle);
taskExecutionContext = dto2execcontext(dmpDevelopTaskDto, jobId);
//设置azkaban执行ID
taskExecutionContext.setExecId(execId);
return taskExecutionContext;
}
/**
* @Title: extracted
* @Description: TODO(根据taskId获取任务)
* @param @param taskId
* @param @param isSingle
* @param @return
* @param @throws Exception 参数
* @return DmpDevelopTaskDto 返回类型
* @throws
*/
private DmpDevelopTaskDto findDevelopTaskByTaskId(Integer taskId, boolean isSingle) throws Exception {
DmpDevelopTaskDto dmpDevelopTaskDto = dmpDevelopTaskMapper.findById(taskId);
if (!isSingle) {
Map<String, Object> param = new HashMap<String, Object>();
......@@ -137,12 +157,7 @@ public class ProcessService {
}
dmpDevelopTaskDto = MyDmpDevelopTaskHistoryConverter.INSTANCE().historyDto2taskDto(dmpDevelopTaskHistoryDtos.get(0));
}
taskExecutionContext = dto2execcontext(dmpDevelopTaskDto, jobId);
//设置azkaban执行ID
taskExecutionContext.setExecId(execId);
return taskExecutionContext;
return dmpDevelopTaskDto;
}
/**
......@@ -199,8 +214,19 @@ public class ProcessService {
comparLabel = lable+"_"+taskName;
}
if (jobId.equals(comparLabel)) {
jobType = JobType.valueOf(jObject.getString("taskType"));//job类型
script = jObject.toJSONString();
if (CommConstant.WORK_TYPE_SQL.equals(taskJobType)
|| CommConstant.WORK_TYPE_SHELL.equals(taskJobType)
|| CommConstant.WORK_TYPE_SYNC.equals(taskJobType)) {
String taskId2 = jObject.getString("script");//获取关联任务的taskId
DmpDevelopTaskDto dmpDevelopTaskDto2 = findDevelopTaskByTaskId(Integer.parseInt(taskId2), false);
script = dmpDevelopTaskDto2.getScript();
} else {
script = jObject.toJSONString();
}
jobType = JobType.valueOf(taskJobType);//job类型
taskAppId = lable;
break;
}
......
......@@ -23,4 +23,7 @@ Jdbc {
<#if dbtable??>
dbtable = "${dbtable!}"
</#if>
<#if customUpdateStmt??>
customUpdateStmt = "${customUpdateStmt!}"
</#if>
}
......@@ -51,7 +51,7 @@ import org.springframework.test.context.junit4.SpringRunner;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.jz.dmp.cmdexectool.ApiApplication;
import com.jz.dmp.cmdexectool.CmdExecToolApplication;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
import com.jz.dmp.cmdexectool.scheduler.common.enums.ExecutionStatus;
......@@ -76,7 +76,7 @@ import com.jz.dmp.cmdexectool.scheduler.service.process.ProcessService;
* python shell command executor test
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApiApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@SpringBootTest(classes = CmdExecToolApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class SQLCommandExecutorTest {
private static final Logger logger = LoggerFactory.getLogger(SQLCommandExecutorTest.class);
......
......@@ -16,7 +16,7 @@
*/
package com.jz.cmdexectool.test.task.shell;
import com.jz.dmp.cmdexectool.ApiApplication;
import com.jz.dmp.cmdexectool.CmdExecToolApplication;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.ExecutionStatus;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
......@@ -40,7 +40,7 @@ import org.springframework.test.context.junit4.SpringRunner;
* python shell command executor test
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApiApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@SpringBootTest(classes = CmdExecToolApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ShellCommandExecutorTest {
private static final Logger logger = LoggerFactory.getLogger(ShellCommandExecutorTest.class);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment