Commit 35a165e6 authored by mcb's avatar mcb

Merge branch 'dmp_dev' of http://gitlab.ioubuy.cn/yaobenzhang/jz-dmp-service into dmp_dev

parents 12d1611a ba7b9a48
...@@ -14,12 +14,16 @@ import org.springframework.data.redis.core.RedisTemplate; ...@@ -14,12 +14,16 @@ import org.springframework.data.redis.core.RedisTemplate;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.jz.common.bean.BaseBeanResponse;
import com.jz.common.constant.CommConstant; import com.jz.common.constant.CommConstant;
import com.jz.common.constant.StatuConstant;
import com.jz.common.enums.NodeChangeTypeEnum; import com.jz.common.enums.NodeChangeTypeEnum;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowNode; import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowNode;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowNodeChangeInfo; import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowNodeChangeInfo;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowPro; import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowPro;
import com.jz.dmp.modules.controller.bean.DmpDevelopTaskDto;
import com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoDto; import com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoDto;
import com.jz.dmp.modules.model.DmpDevelopTask;
import com.jz.dmp.modules.model.DmpNavigationTree; import com.jz.dmp.modules.model.DmpNavigationTree;
import com.jz.dmp.modules.model.DmpProject; import com.jz.dmp.modules.model.DmpProject;
import com.jz.dmp.modules.model.DmpProjectSystemInfo; import com.jz.dmp.modules.model.DmpProjectSystemInfo;
...@@ -382,6 +386,12 @@ public class FlowParseTool { ...@@ -382,6 +386,12 @@ public class FlowParseTool {
Long publishedToProjectId = dmpProjectConfigInfoDto.getProjectId().longValue(); Long publishedToProjectId = dmpProjectConfigInfoDto.getProjectId().longValue();
Long treeId = flowPro.getTreeId(); Long treeId = flowPro.getTreeId();
BaseBeanResponse<DmpDevelopTaskDto> baseBeanResponse = dmpDevelopTaskService.findByTreeId(treeId.intValue(), null);
if (baseBeanResponse.getCode().equals(StatuConstant.FAILURE_CODE)) {
throw new Exception("获取任务失败");
}
Integer taskId = baseBeanResponse.getData().getId();
/** /**
* 当前任务生成文件存放根路径 * 当前任务生成文件存放根路径
*/ */
...@@ -425,33 +435,17 @@ public class FlowParseTool { ...@@ -425,33 +435,17 @@ public class FlowParseTool {
if ("shell".equalsIgnoreCase(nodeType)) { if ("shell".equalsIgnoreCase(nodeType)) {
// shell // shell
azkabanJobType = "command"; azkabanJobType = "command";
azkabanJobCommand = generateShellFile(flowNode, localTaskExecArgsPath); azkabanJobCommand = generateExecutorToolCommand(taskId, flowNode.getNodeName(), false);
//上传ftp下载相关参数
JSONObject scriptJsonObject = JSONObject.parseObject(flowNode.getNodeData());
//FTP链接
contents.add("ftpUrl=" + scriptJsonObject.getString("ftpUrl"));
} else if ("sql".equalsIgnoreCase(nodeType)) { } else if ("sql".equalsIgnoreCase(nodeType)) {
// sql 任务 // sql 任务
azkabanJobType = "command"; azkabanJobType = "command";
azkabanJobCommand = generateSqlFile(flowNode, localTaskExecArgsPath); azkabanJobCommand = generateExecutorToolCommand(taskId, flowNode.getNodeName(), false);
//上传ftp下载相关参数
JSONObject scriptJsonObject = JSONObject.parseObject(flowNode.getNodeData());
//console
contents.add("console=" + scriptJsonObject.getString("console"));
//console
contents.add("hdfs=" + scriptJsonObject.getString("hdfs"));
//console
contents.add("table=" + scriptJsonObject.getString("table"));
//console
contents.add("topic=" + scriptJsonObject.getString("topic"));
} else if ("sync".equalsIgnoreCase(nodeType)) { } else if ("sync".equalsIgnoreCase(nodeType)) {
//同步任务 //同步任务
azkabanJobType = "command"; azkabanJobType = "command";
azkabanJobCommand = generateSyncFile(flowNode); azkabanJobCommand = generateExecutorToolCommand(taskId, flowNode.getNodeName(), false);
} else if ("subprocess".equalsIgnoreCase(nodeType)) { } else if ("subprocess".equalsIgnoreCase(nodeType)) {
//子流程 //子流程
...@@ -459,62 +453,20 @@ public class FlowParseTool { ...@@ -459,62 +453,20 @@ public class FlowParseTool {
azkabanJobCommand = generateSubprocessFile(flowNode); azkabanJobCommand = generateSubprocessFile(flowNode);
} else if ("ftp".equalsIgnoreCase(nodeType)) { } else if ("ftp".equalsIgnoreCase(nodeType)) {
//ftp //ftp
//azkabanJobType = "command"; azkabanJobType = "command";
//azkabanJobCommand = ""; azkabanJobCommand = generateExecutorToolCommand(taskId, flowNode.getNodeName(), false);
//上传ftp下载相关参数
JSONObject scriptJsonObject = JSONObject.parseObject(flowNode.getNodeData());
//FTP链接
contents.add("ftpUrl=" + scriptJsonObject.getString("ftpUrl"));
//FTP用户名
contents.add("ftpUsername=" + scriptJsonObject.getString("ftpUsername"));
//FTP密码
contents.add("ftpPassword=" + scriptJsonObject.getString("ftpPassword"));
//FTP文件目录
contents.add("ftpSourceFileDir=" + scriptJsonObject.getString("ftpSourceFileDir"));
//保存目录
contents.add("ftpSaveDestDir=" + scriptJsonObject.getString("ftpSaveDestDir"));
}else if ("unzipFile".equalsIgnoreCase(nodeType)) { }else if ("unzipFile".equalsIgnoreCase(nodeType)) {
//unzipFile //unzipFile
//azkabanJobType = "command"; azkabanJobType = "command";
//azkabanJobCommand = ""; azkabanJobCommand = generateExecutorToolCommand(taskId, flowNode.getNodeName(), false);
//上传解压文件相关参数
JSONObject scriptJsonObject = JSONObject.parseObject(flowNode.getNodeData());
//输出目录
contents.add("zipOutputDir=" + scriptJsonObject.getString("zipOutputDir"));
//压缩文件目录
contents.add("compressedFileDir=" + scriptJsonObject.getString("compressedFileDir"));
//解压格式
contents.add("decompressFormat=" + scriptJsonObject.getString("decompressFormat"));
}else if ("docTrans".equalsIgnoreCase(nodeType)) { }else if ("docTrans".equalsIgnoreCase(nodeType)) {
//docTrans //docTrans
//azkabanJobType = "command"; azkabanJobType = "command";
//azkabanJobCommand = ""; azkabanJobCommand = generateExecutorToolCommand(taskId, flowNode.getNodeName(), false);
//上传文件转码相关参数
JSONObject scriptJsonObject = JSONObject.parseObject(flowNode.getNodeData());
//文件编码
contents.add("documentCode=" + scriptJsonObject.getString("documentCode"));
//输出地址(目录)
contents.add("outpuAddressUrl=" + scriptJsonObject.getString("outpuAddressUrl"));
//输出编码
contents.add("fileCharsetEncode=" + scriptJsonObject.getString("fileCharsetEncode"));
//文件地址(目录)
contents.add("fileAddressUrl=" + scriptJsonObject.getString("fileAddressUrl"));
}else if ("hdfs".equalsIgnoreCase(nodeType)) { }else if ("hdfs".equalsIgnoreCase(nodeType)) {
//docTrans //hdfs
//azkabanJobType = "command"; azkabanJobType = "command";
//azkabanJobCommand = ""; azkabanJobCommand = generateExecutorToolCommand(taskId, flowNode.getNodeName(), false);
//上传HDFS相关相关参数
JSONObject scriptJsonObject = JSONObject.parseObject(flowNode.getNodeData());
//文件地址
contents.add("localUploadFileDir=" + scriptJsonObject.getString("localUploadFileDir"));
//文件过滤
contents.add("hdfsUploadFileFilter=" + scriptJsonObject.getString("hdfsUploadFileFilter"));
//HDFS目录
contents.add("hdfsUploadSaveDir=" + scriptJsonObject.getString("hdfsUploadSaveDir"));
} }
//子流程类型 //子流程类型
...@@ -551,8 +503,17 @@ public class FlowParseTool { ...@@ -551,8 +503,17 @@ public class FlowParseTool {
return azkabanApiUtils.loginCreateProjectuploadZipAndSchedule("jz_workflow_new_" + publishedToProjectId, publishedToProject.getProjectDesc(), localTaskZipAbsolutePath, flowPro); return azkabanApiUtils.loginCreateProjectuploadZipAndSchedule("jz_workflow_new_" + publishedToProjectId, publishedToProject.getProjectDesc(), localTaskZipAbsolutePath, flowPro);
} }
/**
 * Builds the command line that an Azkaban job uses to invoke the
 * cmdExecutorTool jar (jz-dmp-service.jar) for one DMP task.
 *
 * @param taskId   id of the DMP develop task to execute
 * @param jobId    job identifier handed to the executor (node / tree name)
 * @param isSingle true when the task runs standalone, false when it runs as a flow node
 * @return the full "command=..." property line written into the Azkaban .job file
 */
public static String generateExecutorToolCommand(Integer taskId, String jobId, boolean isSingle) {
    // NOTE(review): the space after "command=" is reproduced from the original
    // implementation; Azkaban trims property values, but confirm before changing it.
    return String.format("command= java -jar jz-dmp-service.jar %d %s %b", taskId, jobId, isSingle);
}
/** /**
* 生成shell 文件 * 生成shell 文件
...@@ -623,7 +584,7 @@ public class FlowParseTool { ...@@ -623,7 +584,7 @@ public class FlowParseTool {
* @param flowNodeq * @param flowNodeq
* @return * @return
*/ */
private String generateSyncFile(FlowNode flowNode)throws Exception { private String generateSyncFile2(FlowNode flowNode)throws Exception {
/*Long syncTaskId = job.getXmlTaskTreeId(); /*Long syncTaskId = job.getXmlTaskTreeId();
if (syncTaskId == null) { if (syncTaskId == null) {
throw new RuntimeException("请选择同步任务节点"); throw new RuntimeException("请选择同步任务节点");
...@@ -650,9 +611,10 @@ public class FlowParseTool { ...@@ -650,9 +611,10 @@ public class FlowParseTool {
//获取最新版本的同步任务 //获取最新版本的同步任务
//String execXmlFileNameAndVersion = getPublishSyncTaskFileNameAndLatestVersion(taskName, syncTaskTreeId); //String execXmlFileNameAndVersion = getPublishSyncTaskFileNameAndLatestVersion(taskName, syncTaskTreeId);
//String execXmlFileName = execXmlFileNameAndVersion.split("@")[1]; //String execXmlFileName = execXmlFileNameAndVersion.split("@")[1];
String execXmlFileName = dmpDevelopTaskService.getExecXmlFileName(syncTaskTreeId); //String execXmlFileName = dmpDevelopTaskService.getExecXmlFileName(syncTaskTreeId);
//xml 执行xml的命令写到job文件中 //xml 执行xml的命令写到job文件中
String command = "command=" + dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanExectorXmlExec() + " " + publishedToProjectId + " ${azkaban.flow.flowid} ${azkaban.job.id} " + execXmlFileName; //String command = "command=" + dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanExectorXmlExec() + " " + publishedToProjectId + " ${azkaban.flow.flowid} ${azkaban.job.id} " + execXmlFileName;
String command = "";
return command; return command;
} }
......
...@@ -44,6 +44,7 @@ import com.jz.common.utils.AzkabanApiUtils2; ...@@ -44,6 +44,7 @@ import com.jz.common.utils.AzkabanApiUtils2;
import com.jz.common.utils.CodeGeneratorUtils; import com.jz.common.utils.CodeGeneratorUtils;
import com.jz.common.utils.DateUtils; import com.jz.common.utils.DateUtils;
import com.jz.common.utils.FileUtils; import com.jz.common.utils.FileUtils;
import com.jz.common.utils.FlowParseTool;
import com.jz.common.utils.GZIPUtils; import com.jz.common.utils.GZIPUtils;
import com.jz.common.utils.JsonMapper; import com.jz.common.utils.JsonMapper;
import com.jz.common.utils.StringUtils; import com.jz.common.utils.StringUtils;
...@@ -1413,7 +1414,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop ...@@ -1413,7 +1414,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
String azkabanLocalTaskFilePath = dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanLocalTaskFilePath(); //文件路径 String azkabanLocalTaskFilePath = dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanLocalTaskFilePath(); //文件路径
String azkabanMonitorUrl = dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanMonitorUrl();//AZKABAN WEB服务地址 String azkabanMonitorUrl = dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanMonitorUrl();//AZKABAN WEB服务地址
String azkabanJobCommand = "command=" + azkabanExectorTaskExec + " " + projectId + " ${azkaban.flow.flowid} ${azkaban.job.id} " + treeName; String azkabanJobCommand = FlowParseTool.generateExecutorToolCommand(taskId, treeName, true);
/** /**
* 当前任务生成文件存放根路径 * 当前任务生成文件存放根路径
......
...@@ -88,16 +88,6 @@ public class FlowServiceImpl implements FlowService { ...@@ -88,16 +88,6 @@ public class FlowServiceImpl implements FlowService {
DmpProjectConfigInfoDto dmpProjectConfigInfoDto = baseBeanResponse.getData(); DmpProjectConfigInfoDto dmpProjectConfigInfoDto = baseBeanResponse.getData();
try { try {
/*
FlowParseTool flowParseTool = new FlowParseTool(flowPro,
publishToProject,
publishToProjectSystemInfo,
dmpDevelopTaskService,
dmpNavigationTreeService,
dmpWorkFlowSubmitDetailsService
);
*/
FlowParseTool flowParseTool = new FlowParseTool(flowPro, FlowParseTool flowParseTool = new FlowParseTool(flowPro,
publishToProject, publishToProject,
dmpProjectConfigInfoDto, dmpProjectConfigInfoDto,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment