Commit c7aa4403 authored by mcb's avatar mcb

commit

parent 23fae18b
...@@ -102,4 +102,15 @@ public class CommonUtils { ...@@ -102,4 +102,15 @@ public class CommonUtils {
return t; return t;
} }
/**
* @Title: getAzkabanName
* @Description: TODO(拼接azkaban用名称)
* @param @return 参数
* @return String 返回类型
* @throws
*/
public static String getAzkabanName(String id) {
return "jz_dmp_"+ id;
}
} }
...@@ -93,6 +93,10 @@ public class DataDevTaskListDto { ...@@ -93,6 +93,10 @@ public class DataDevTaskListDto {
@ApiModelProperty(value = "sla状态:Y 已配置,N未配置") @ApiModelProperty(value = "sla状态:Y 已配置,N未配置")
private String slaStatus; private String slaStatus;
@ApiModelProperty(value = "azkabanJobId")
private String azkabanJobId;
public String getStatus() { public String getStatus() {
return status; return status;
} }
...@@ -204,4 +208,12 @@ public class DataDevTaskListDto { ...@@ -204,4 +208,12 @@ public class DataDevTaskListDto {
public void setSlaStatus(String slaStatus) { public void setSlaStatus(String slaStatus) {
this.slaStatus = slaStatus; this.slaStatus = slaStatus;
} }
public String getAzkabanJobId() {
return azkabanJobId;
}
public void setAzkabanJobId(String azkabanJobId) {
this.azkabanJobId = azkabanJobId;
}
} }
...@@ -3,6 +3,7 @@ package com.jz.dmp.modules.model; ...@@ -3,6 +3,7 @@ package com.jz.dmp.modules.model;
import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.jz.common.utils.StringUtils;
import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
...@@ -447,5 +448,18 @@ public class DmpDevelopTask implements Serializable { ...@@ -447,5 +448,18 @@ public class DmpDevelopTask implements Serializable {
public String getAzkabanName() { public String getAzkabanName() {
return "jz_dmp_"+this.id; return "jz_dmp_"+this.id;
} }
/**
* @Title: getAzkabanName
* @Description: TODO(拼接azkaban用名称)
* @param @return 参数
* @return String 返回类型
* @throws
*/
public String getAzkabanName(String id) {
if(StringUtils.isNotBlank(id)){
return "jz_dmp_"+ id;
}
return getAzkabanName();
}
} }
\ No newline at end of file
...@@ -843,7 +843,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop ...@@ -843,7 +843,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
if (flowList.size() > 0 && flowList != null) { if (flowList.size() > 0 && flowList != null) {
for (DataDevTaskListDto str : listObj) { //业务流程任务 for (DataDevTaskListDto str : listObj) { //业务流程任务
for (Map strFlow : flowList) { //最后实例状态 for (Map strFlow : flowList) { //最后实例状态
if (str.getTaskName().equals(strFlow.get("taskName"))) { if (CommonUtils.getAzkabanName(str.getTaskId()).equals(strFlow.get("taskName"))) {
str.setStatus(String.valueOf(strFlow.get("status"))); //实例执行最后状态 str.setStatus(String.valueOf(strFlow.get("status"))); //实例执行最后状态
if (null != strFlow.get("cronExpression")) { //调度周期 if (null != strFlow.get("cronExpression")) { //调度周期
str.setSchedulinCycle(String.valueOf(strFlow.get("cronExpression"))); str.setSchedulinCycle(String.valueOf(strFlow.get("cronExpression")));
...@@ -1110,7 +1110,10 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop ...@@ -1110,7 +1110,10 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
if (treeList.size() > 0 && treeList != null) { if (treeList.size() > 0 && treeList != null) {
for (DataDevTaskListDto treeDto : treeList) { for (DataDevTaskListDto treeDto : treeList) {
if (StringUtils.isNotBlank(treeDto.getTaskName())) { if (StringUtils.isNotBlank(treeDto.getTaskName())) {
taskName += "," + treeDto.getTaskName(); //taskName += "," + treeDto.getTaskName();
String azkabanJobId = CommonUtils.getAzkabanName(treeDto.getTaskId());
taskName += "," + azkabanJobId;
treeDto.setAzkabanJobId(azkabanJobId);
} }
} }
//通过任务名称,去查询开发实例 //通过任务名称,去查询开发实例
...@@ -1140,9 +1143,10 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop ...@@ -1140,9 +1143,10 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
if (list.size() > 0) { if (list.size() > 0) {
for (DataDevTaskListDto treeDto : treeList) { for (DataDevTaskListDto treeDto : treeList) {
for (DataDevExamplesListDto dto : list) { for (DataDevExamplesListDto dto : list) {
if (treeDto.getTaskName().equals(dto.getTaskName())) { if (treeDto.getAzkabanJobId().equals(dto.getTaskName())) {
dto.setTaskType(treeDto.getType()); dto.setTaskType(treeDto.getType());
dto.setRunTime(dto.getRunTime() + "s"); BigDecimal cost = new BigDecimal(dto.getRunTime()).divide(new BigDecimal(1000), 2, BigDecimal.ROUND_HALF_UP);
dto.setRunTime(cost + "s");
//JsonMapper.fromJsonString(dto.getFlowData(), Map.class); //JsonMapper.fromJsonString(dto.getFlowData(), Map.class);
} }
} }
...@@ -1308,7 +1312,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop ...@@ -1308,7 +1312,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
BaseBeanResponse<DmpDevelopTaskDto> baseBeanResponse = new BaseBeanResponse<>(); BaseBeanResponse<DmpDevelopTaskDto> baseBeanResponse = new BaseBeanResponse<>();
DmpDevelopTask dmpDevelopTask = dmpDevelopTaskDao.selectTaskInfoByParam(treeId); DmpDevelopTask dmpDevelopTask = dmpDevelopTaskDao.selectTaskInfoByParam(treeId);
//dmpDevelopTask.setScript(new String(dmpDevelopTask.getData(), "utf-8")); //dmpDevelopTask.setScript(new String(dmpDevelopTask.getData(), "utf-8"));
DmpDevelopTaskDto dmpDevelopTaskDto = MyDmpDevelopTaskConverter.INSTANCE().domain2dto(dmpDevelopTask); DmpDevelopTaskDto dmpDevelopTaskDto = MyDmpDevelopTaskConverter.INSTANCE().domain2dto(dmpDevelopTask);
...@@ -1338,17 +1342,17 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop ...@@ -1338,17 +1342,17 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
// 版本设置 // 版本设置
// 获取已存在版本 // 获取已存在版本
DmpDevelopTask dmpDevelopTaskDb = dmpDevelopTaskDao.selectTaskById(dmpDevelopTask.getId().toString()); DmpDevelopTask dmpDevelopTaskDb = dmpDevelopTaskDao.selectTaskById(dmpDevelopTask.getId().toString());
//校验脚本内容是否有变动,变动则允许修改,否则不允许修改 //校验脚本内容是否有变动,变动则允许修改,否则不允许修改
if (!StringUtils.isEmpty(dmpDevelopTaskDb.getScript()) && dmpDevelopTaskDb.getScript().equals(dmpDevelopTask.getScript())) { if (!StringUtils.isEmpty(dmpDevelopTaskDb.getScript()) && dmpDevelopTaskDb.getScript().equals(dmpDevelopTask.getScript())) {
baseBeanResponse.setCode(StatuConstant.CODE_ERROR_PARAMETER); baseBeanResponse.setCode(StatuConstant.CODE_ERROR_PARAMETER);
baseBeanResponse.setMessage("脚本内容没有任何变化,无效的操作!"); baseBeanResponse.setMessage("脚本内容没有任何变化,无效的操作!");
return baseBeanResponse; return baseBeanResponse;
} }
String version = CodeGeneratorUtils.generatorNextTaskVesion(dmpDevelopTaskDb.getVersion()); String version = CodeGeneratorUtils.generatorNextTaskVesion(dmpDevelopTaskDb.getVersion());
dmpDevelopTask.setVersion(version); dmpDevelopTask.setVersion(version);
//dmpDevelopTask.setData(dmpDevelopTask.getScript().getBytes("utf-8")); //dmpDevelopTask.setData(dmpDevelopTask.getScript().getBytes("utf-8"));
dmpDevelopTaskDao.update(dmpDevelopTask); dmpDevelopTaskDao.update(dmpDevelopTask);
...@@ -1384,12 +1388,12 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop ...@@ -1384,12 +1388,12 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
} }
DmpDevelopTask dmpDevelopTask = dmpDevelopTaskDao.get(treeId); DmpDevelopTask dmpDevelopTask = dmpDevelopTaskDao.get(treeId);
if (!CommConstant.TASK_TYPE_DEVELOP.equals(dmpDevelopTask.getTaskType())) { if (!CommConstant.TASK_TYPE_DEVELOP.equals(dmpDevelopTask.getTaskType())) {
baseResponse.setCode(StatuConstant.CODE_ERROR_PARAMETER); baseResponse.setCode(StatuConstant.CODE_ERROR_PARAMETER);
baseResponse.setMessage("该任务不是工作流开发任务!"); baseResponse.setMessage("该任务不是工作流开发任务!");
return baseResponse; return baseResponse;
} }
FlowPro flowPro = MyDmpDevelopTaskConverter.INSTANCE().task2flowpro(dmpDevelopTask); FlowPro flowPro = MyDmpDevelopTaskConverter.INSTANCE().task2flowpro(dmpDevelopTask);
...@@ -1451,19 +1455,19 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop ...@@ -1451,19 +1455,19 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
@Override @Override
public BaseBeanResponse<Integer> taskAzkabanRun(Long treeId, HttpServletRequest httpRequest) throws Exception { public BaseBeanResponse<Integer> taskAzkabanRun(Long treeId, HttpServletRequest httpRequest) throws Exception {
BaseBeanResponse<Integer> baseBeanResponse = new BaseBeanResponse<Integer>(); BaseBeanResponse<Integer> baseBeanResponse = new BaseBeanResponse<Integer>();
DmpDevelopTask developTask = dmpDevelopTaskDao.get(treeId); DmpDevelopTask developTask = dmpDevelopTaskDao.get(treeId);
//先发布任务 //先发布任务
Integer execId = publishAndExecute(developTask); Integer execId = publishAndExecute(developTask);
if (execId!=null) { if (execId != null) {
baseBeanResponse.setCode(StatuConstant.SUCCESS_CODE); baseBeanResponse.setCode(StatuConstant.SUCCESS_CODE);
baseBeanResponse.setMessage("立即运行成功"); baseBeanResponse.setMessage("立即运行成功");
baseBeanResponse.setData(execId); baseBeanResponse.setData(execId);
} else { } else {
baseBeanResponse.setCode(StatuConstant.FAILURE_CODE); baseBeanResponse.setCode(StatuConstant.FAILURE_CODE);
baseBeanResponse.setMessage("立即运行失败"); baseBeanResponse.setMessage("立即运行失败");
} }
return baseBeanResponse; return baseBeanResponse;
...@@ -1694,7 +1698,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop ...@@ -1694,7 +1698,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
if (result.containsKey("data")) { if (result.containsKey("data")) {
return JsonResult.ok(result); return JsonResult.ok(result);
} }
return JsonResult.error(ResultCode.OPERATION_DATA_NO_EXIST,String.valueOf(result.get("error"))); return JsonResult.error(ResultCode.OPERATION_DATA_NO_EXIST, String.valueOf(result.get("error")));
} }
/** /**
...@@ -1723,7 +1727,8 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop ...@@ -1723,7 +1727,8 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
String taskName = ""; String taskName = "";
if (list.size() > 0 && list != null) { if (list.size() > 0 && list != null) {
for (DataDevTaskListDto str : list) { for (DataDevTaskListDto str : list) {
taskName += "," + str.getTaskName(); //taskName += "," + str.getTaskName();
taskName += "," + CommonUtils.getAzkabanName(str.getTaskId());
} }
} }
...@@ -1896,6 +1901,11 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop ...@@ -1896,6 +1901,11 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
@Override @Override
public JsonResult getTaskFlowId(String projectId) throws Exception { public JsonResult getTaskFlowId(String projectId) throws Exception {
List<Map> list = dmpNavigationTreeDao.queryTaskFlowId(projectId); List<Map> list = dmpNavigationTreeDao.queryTaskFlowId(projectId);
if (list.size() > 0 && list != null) {
for(Map map :list){
map.put("name",CommonUtils.getAzkabanName(String.valueOf(map.get("taskId"))));
}
}
return JsonResult.ok(list); return JsonResult.ok(list);
} }
......
...@@ -208,7 +208,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -208,7 +208,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
String azkabanLocalTaskFilePath = map.get("azkabanLocalTaskFilePath").toString(); //文件路径 String azkabanLocalTaskFilePath = map.get("azkabanLocalTaskFilePath").toString(); //文件路径
String azkabanMonitorUrl = map.get("azkabanMonitorUrl").toString();//AZKABAN WEB服务地址 String azkabanMonitorUrl = map.get("azkabanMonitorUrl").toString();//AZKABAN WEB服务地址
String azkabanJobCommand = FlowParseTool.generateExecutorToolCommand(Integer.valueOf(taskId), treeName, true); DmpDevelopTask dmpDevelopTask = new DmpDevelopTask();
String azkabanJobName = dmpDevelopTask.getAzkabanName(taskId);
String azkabanJobCommand = FlowParseTool.generateExecutorToolCommand(Integer.valueOf(taskId), azkabanJobName, true);
//String azkabanJobCommand = "command=" + azkabanExectorXmlExec + " " + projectId + " ${azkaban.flow.flowid} ${azkaban.job.id} ${azkaban.flow.execid} " + treeName; //String azkabanJobCommand = "command=" + azkabanExectorXmlExec + " " + projectId + " ${azkaban.flow.flowid} ${azkaban.job.id} ${azkaban.flow.execid} " + treeName;
...@@ -264,17 +266,17 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -264,17 +266,17 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
contents.add(azkabanJobCommand); contents.add(azkabanJobCommand);
// 生成job文件 // 生成job文件
String jobFileAbsolutePath = localTaskSourcePath + treeName + ".job"; String jobFileAbsolutePath = localTaskSourcePath + azkabanJobName + ".job";
FileUtils.write(jobFileAbsolutePath, contents); FileUtils.write(jobFileAbsolutePath, contents);
String localZipTargetFileName = treeName + ".zip"; String localZipTargetFileName = azkabanJobName + ".zip";
ZipUtils.zip(localTaskSourcePath, localTaskZipPath, localZipTargetFileName); ZipUtils.zip(localTaskSourcePath, localTaskZipPath, localZipTargetFileName);
//上传到azkaban todo //上传到azkaban todo
//上次zip包到azkaban //上次zip包到azkaban
String localTaskZipAbsolutePath = localTaskZipPath + "/" + localZipTargetFileName; String localTaskZipAbsolutePath = localTaskZipPath + "/" + localZipTargetFileName;
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl, redisTemplate); AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl, redisTemplate);
return azkabanApiUtils.loginCreateProjectuploadZipAndExecute("jz_workflow_new_" + projectId, "local_" + taskAlias + "_project", localTaskZipAbsolutePath, treeName); return azkabanApiUtils.loginCreateProjectuploadZipAndExecute("jz_workflow_new_" + projectId, "local_" + taskAlias + "_project", localTaskZipAbsolutePath, azkabanJobName);
//return azkabanApiUtils.loginCreateProjectuploadZipAndExecute("jz_localflow_" + projectId, "local_sync_project", localTaskZipAbsolutePath, treeName); //return azkabanApiUtils.loginCreateProjectuploadZipAndExecute("jz_localflow_" + projectId, "local_sync_project", localTaskZipAbsolutePath, treeName);
} }
......
...@@ -231,6 +231,7 @@ ...@@ -231,6 +231,7 @@
select select
t1.id as treeId, t1.id as treeId,
t1.name as taskName, t1.name as taskName,
t2.id as taskId,
(case when t1.type='01' then '离线同步' when t1.type='02' then '实时同步' when t1.type='03' then '数据开发' end) as type (case when t1.type='01' then '离线同步' when t1.type='02' then '实时同步' when t1.type='03' then '数据开发' end) as type
from from
dmp_navigation_tree t1 dmp_navigation_tree t1
......
...@@ -310,13 +310,15 @@ ...@@ -310,13 +310,15 @@
<select id="queryTaskFlowId" resultType="java.util.Map"> <select id="queryTaskFlowId" resultType="java.util.Map">
SELECT SELECT
id, t1.id,
name t1.name,
t2.id as taskId
FROM FROM
dmp_navigation_tree dmp_navigation_tree t1
inner join dmp_develop_task t2 on t1.id=t2.tree_id and t2.data_status='1'
WHERE WHERE
1 = 1 AND DATA_STATUS = '1' AND IS_LEVEL = '1' AND type != '02' 1 = 1 AND t1.DATA_STATUS = '1' AND t1.IS_LEVEL = '1' AND t1.type != '02'
and project_id = #{projectId} and t1.project_id = #{projectId}
</select> </select>
</mapper> </mapper>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment