Commit d15605be authored by mcb's avatar mcb

commit

parent e4d163f7
...@@ -553,7 +553,7 @@ public class AzkabanApiUtils2 { ...@@ -553,7 +553,7 @@ public class AzkabanApiUtils2 {
String execIdsStr = ""; String execIdsStr = "";
//将所有的实例停止 //将所有的实例停止
for (Integer execid : execIds) { for (Integer execid : execIds) {
LOGGER.info("######执行实例execid:"+execid); LOGGER.info("######execte execid{}"+execid);
cancleFlow(sessionId, execid); cancleFlow(sessionId, execid);
execIdsStr= execIdsStr +","+ execid; execIdsStr= execIdsStr +","+ execid;
} }
......
...@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject; ...@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo; import com.github.pagehelper.PageInfo;
import com.jz.agent.service.DmpDsAgentService; import com.jz.agent.service.DmpDsAgentService;
import com.jz.common.constant.CommConstant;
import com.jz.common.constant.JsonResult; import com.jz.common.constant.JsonResult;
import com.jz.common.constant.ResultCode; import com.jz.common.constant.ResultCode;
import com.jz.common.enums.DelFlagEnum; import com.jz.common.enums.DelFlagEnum;
...@@ -12,10 +13,7 @@ import com.jz.common.enums.TaskTreeTypeEnum; ...@@ -12,10 +13,7 @@ import com.jz.common.enums.TaskTreeTypeEnum;
import com.jz.common.page.BasePageBean; import com.jz.common.page.BasePageBean;
import com.jz.common.page.PageInfoResponse; import com.jz.common.page.PageInfoResponse;
import com.jz.common.persistence.BaseService; import com.jz.common.persistence.BaseService;
import com.jz.common.utils.AzkabanApiUtils2; import com.jz.common.utils.*;
import com.jz.common.utils.FileUtils;
import com.jz.common.utils.JsonMapper;
import com.jz.common.utils.ZipUtils;
import com.jz.common.utils.web.SessionUtils; import com.jz.common.utils.web.SessionUtils;
import com.jz.common.utils.web.XmlUtils; import com.jz.common.utils.web.XmlUtils;
import com.jz.dmp.agent.DmpAgentResult; import com.jz.dmp.agent.DmpAgentResult;
...@@ -171,6 +169,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -171,6 +169,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
*/ */
@Override @Override
public JsonResult taskRunNowByTaskId(String taskId) throws Exception { public JsonResult taskRunNowByTaskId(String taskId) throws Exception {
Map returnMap = new HashMap();
boolean flag = false; boolean flag = false;
//根据任务id,查询DMP资源导航树和DW系统配置信息 //根据任务id,查询DMP资源导航树和DW系统配置信息
Map<String, Object> map = offlineSynchDao.selectNavigationTreeByTaskId(taskId); Map<String, Object> map = offlineSynchDao.selectNavigationTreeByTaskId(taskId);
...@@ -178,10 +177,12 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -178,10 +177,12 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
Integer execId = this.publish(map); Integer execId = this.publish(map);
if (execId != null) { if (execId != null) {
flag = true; flag = true;
returnMap.put("execId", execId);
returnMap.put("flag", flag);
} }
if (flag) { if (flag) {
return new JsonResult(ResultCode.SUCCESS, flag); return new JsonResult(ResultCode.SUCCESS, returnMap);
} else { } else {
return new JsonResult(ResultCode.SYS_ERROR, flag); return new JsonResult(ResultCode.SYS_ERROR, flag);
} }
...@@ -196,12 +197,29 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -196,12 +197,29 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
private Integer publish(Map<String, Object> map) throws Exception { private Integer publish(Map<String, Object> map) throws Exception {
String taskId = map.get("taskId").toString(); //任务id String taskId = map.get("taskId").toString(); //任务id
String projectId = map.get("projectId").toString(); //项目id String projectId = map.get("projectId").toString(); //项目id
String taskType = map.get("type").toString(); //项目id
String treeName = map.get("treeName").toString(); //任务流程名称 String treeName = map.get("treeName").toString(); //任务流程名称
String azkabanExectorXmlExec = map.get("azkabanExectorXmlExec").toString(); //执行数据同步任务命令 //String azkabanExectorXmlExec = map.get("azkabanExectorXmlExec").toString(); //执行数据同步任务命令
String azkabanLocalTaskFilePath = map.get("azkabanLocalTaskFilePath").toString(); //文件路径 String azkabanLocalTaskFilePath = map.get("azkabanLocalTaskFilePath").toString(); //文件路径
String azkabanMonitorUrl = map.get("azkabanMonitorUrl").toString();//AZKABAN WEB服务地址 String azkabanMonitorUrl = map.get("azkabanMonitorUrl").toString();//AZKABAN WEB服务地址
String azkabanJobCommand = "command=" + azkabanExectorXmlExec + " " + projectId + " ${azkaban.flow.flowid} ${azkaban.job.id} ${azkaban.flow.execid} " + treeName; String azkabanJobCommand = FlowParseTool.generateExecutorToolCommand(Integer.valueOf(taskId), treeName, true);
//String azkabanJobCommand = "command=" + azkabanExectorXmlExec + " " + projectId + " ${azkaban.flow.flowid} ${azkaban.job.id} ${azkaban.flow.execid} " + treeName;
//校验任务类型并配置参数
String taskAlias = "";
if (com.jz.common.utils.StringUtils.isEmpty(taskType)) {
throw new RuntimeException("任务的任务类型为空!");
} else if (taskType.equals(CommConstant.TASK_TYPE_DEVSHELL)) {
taskAlias = "shell";
} else if (taskType.equals(CommConstant.TASK_TYPE_DEVSQL)) {
taskAlias = "sql";
} else if (taskType.equals(CommConstant.TREE_TYPE_OFFLINE)) {
taskAlias = "sync";
} else {
throw new RuntimeException("该任务类型不能调用此方法发布,请检查任务类型!");
}
/** /**
* 当前任务生成文件存放根路径 * 当前任务生成文件存放根路径
...@@ -251,7 +269,8 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -251,7 +269,8 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
//上次zip包到azkaban //上次zip包到azkaban
String localTaskZipAbsolutePath = localTaskZipPath + "/" + localZipTargetFileName; String localTaskZipAbsolutePath = localTaskZipPath + "/" + localZipTargetFileName;
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl, redisTemplate); AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl, redisTemplate);
return azkabanApiUtils.loginCreateProjectuploadZipAndExecute("jz_localflow_" + projectId, "local_sync_project", localTaskZipAbsolutePath, treeName); return azkabanApiUtils.loginCreateProjectuploadZipAndExecute("jz_workflow_new_" + projectId, "local_" + taskAlias + "_project", localTaskZipAbsolutePath, treeName);
//return azkabanApiUtils.loginCreateProjectuploadZipAndExecute("jz_localflow_" + projectId, "local_sync_project", localTaskZipAbsolutePath, treeName);
} }
/** /**
...@@ -868,7 +887,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -868,7 +887,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
Long projectId = Long.valueOf(map.get("projectId").toString()); Long projectId = Long.valueOf(map.get("projectId").toString());
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl, redisTemplate); AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl, redisTemplate);
String execId = azkabanApiUtils.stopFlow("jz_localflow_" + projectId, map.get("treeName").toString()); String execId = azkabanApiUtils.stopFlow("jz_localflow_" + projectId, map.get("treeName").toString());
return JsonResult.ok(); Map returnMap = new HashMap();
returnMap.put("execId", execId);
return JsonResult.ok(returnMap);
} }
/** /**
...@@ -1105,6 +1126,8 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -1105,6 +1126,8 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
@Override @Override
public JsonResult querySyncTaskInfoById(String id) throws Exception { public JsonResult querySyncTaskInfoById(String id) throws Exception {
Map returnMap = new HashMap(); Map returnMap = new HashMap();
Map sourceMap = new HashMap();
Map targetMap = new HashMap();
DmpDevelopTask dmpDevelopTask = dmpDevelopTaskDao.selectTaskById(id); DmpDevelopTask dmpDevelopTask = dmpDevelopTaskDao.selectTaskById(id);
if (StringUtils.isNotEmpty(dmpDevelopTask.getScript())) { if (StringUtils.isNotEmpty(dmpDevelopTask.getScript())) {
Map map = (Map) JSONObject.parse(dmpDevelopTask.getScript()); Map map = (Map) JSONObject.parse(dmpDevelopTask.getScript());
...@@ -1121,21 +1144,23 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -1121,21 +1144,23 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
//数据源 //数据源
JsonResult sourceDb = querygSourceDbList(projectId, sourceDbType); JsonResult sourceDb = querygSourceDbList(projectId, sourceDbType);
returnMap.put("sourceDb", sourceDb.getData()); sourceMap.put("sourceDb", sourceDb.getData());
JsonResult targetDb = querygSourceDbList(projectId, targetDbType); JsonResult targetDb = querygSourceDbList(projectId, targetDbType);
returnMap.put("targetDb", targetDb.getData()); targetMap.put("targetDb", targetDb.getData());
//表 //表
String[] sourceDbIds = sourceDbId.split(","); String[] sourceDbIds = sourceDbId.split(",");
for (int i = 0; i < sourceDbIds.length; i++) { for (int i = 0; i < sourceDbIds.length; i++) {
JsonResult sourceTable = querygSourceTableList(Long.valueOf(sourceDbIds[i]), ""); JsonResult sourceTable = querygSourceTableList(Long.valueOf(sourceDbIds[i]), "");
returnMap.put(sourceDbName[i], sourceTable.getData()); sourceMap.put(sourceDbName[i], sourceTable.getData());
} }
JsonResult targetTable = querygSourceTableList(Long.valueOf(targetDbId), ""); JsonResult targetTable = querygSourceTableList(Long.valueOf(targetDbId), "");
returnMap.put(targetName, targetTable.getData()); targetMap.put(targetName, targetTable.getData());
} }
returnMap.put("task", dmpDevelopTask); returnMap.put("task", dmpDevelopTask);
returnMap.put("sourceData", sourceMap);
returnMap.put("targetData", targetMap);
return JsonResult.ok(returnMap); return JsonResult.ok(returnMap);
} }
......
...@@ -88,7 +88,7 @@ ...@@ -88,7 +88,7 @@
WHERE data_status = '1' and id = #{sourceDbId} WHERE data_status = '1' and id = #{sourceDbId}
</select> </select>
<select id="querySourceDbInfoBySourceId" parameterType="map" resultType="com.jz.dmp.modules.model.DmpAgentDatasourceInfo"> <select id="querySourceDbInfoBySourceId" flushCache="true" parameterType="map" resultType="com.jz.dmp.modules.model.DmpAgentDatasourceInfo">
SELECT SELECT
t1.id, t1.id,
t1.datasource_type as datasourceTypeId, t1.datasource_type as datasourceTypeId,
...@@ -133,6 +133,7 @@ ...@@ -133,6 +133,7 @@
t1.TREE_ID as treeId, t1.TREE_ID as treeId,
t2.PROJECT_ID as projectId, t2.PROJECT_ID as projectId,
t2.NAME as treeName, t2.NAME as treeName,
t2.type,
t3.AZKABAN_EXECTOR_XML_EXEC as azkabanExectorXmlExec, t3.AZKABAN_EXECTOR_XML_EXEC as azkabanExectorXmlExec,
t3.AZKABAN_LOCAL_TASK_FILE_PATH as azkabanLocalTaskFilePath, t3.AZKABAN_LOCAL_TASK_FILE_PATH as azkabanLocalTaskFilePath,
t3.AZKABAN_MONITOR_URL as azkabanMonitorUrl t3.AZKABAN_MONITOR_URL as azkabanMonitorUrl
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment