Commit 52021917 authored by mcb's avatar mcb

Merge branch 'dmp_dev' of http://gitlab.ioubuy.cn/yaobenzhang/jz-dmp-service into dmp_dev

parents 5f068100 dc08a3d4
......@@ -32,5 +32,9 @@ public class CommConstant {
public static final String RE_SUBMIT_TOKEN = "resubmitToken";
/***************************************************/
//task_type任务类型
public static final String TASK_TYPE_OFFLINE = "2";
/***************************************************/
}
......@@ -9,9 +9,9 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.FileSystemResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
......@@ -22,6 +22,7 @@ import org.springframework.web.client.RestTemplate;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer;
import com.google.gson.Gson;
import com.jz.common.utils.web.HttpClientUtils;
import com.jz.common.utils.web.SessionUtils;
......@@ -43,19 +44,21 @@ public class AzkabanApiUtils2 {
private String userName;
private String password;
@Autowired
RedisTemplate<String,SSOUserInfo> redisTemplate;
private RedisTemplate redisTemplate;
private static final StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
private static final FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer(SSOUserInfo.class);
public AzkabanApiUtils2(String azkabanServerUrl, String userName, String password) {
this(azkabanServerUrl);
public AzkabanApiUtils2(String azkabanServerUrl, String userName, String password, RedisTemplate<String,SSOUserInfo> redisTemplate) {
this(azkabanServerUrl, redisTemplate);
this.userName = userName;
this.password = password;
}
public AzkabanApiUtils2(String azkabanServerUrl) {
public AzkabanApiUtils2(String azkabanServerUrl, RedisTemplate<String,SSOUserInfo> redisTemplate) {
this.azkabanServerUrl = azkabanServerUrl;
this.userName = "admin";
this.password = "admin";
this.redisTemplate = redisTemplate;
}
/**
......@@ -101,13 +104,15 @@ public class AzkabanApiUtils2 {
LOGGER.error(azkabanServerUrl+"-----"+linkedMultiValueMap+" sessionId 为空");
throw new RuntimeException("登陆失败");
}*/
String sessionId = SessionUtils.getSession().getId(); //"dcfc608c-c58a-45b7-adc7-9902b652496e";
//String sessionId = "f0d06f4a-874c-4dfc-8959-101b6add6bf5";
String sessionId = SessionUtils.getSession().getId();
//通过redis方式登录Azkaban
String redisKey = "spring:sessions:sessions:"+sessionId;
SSOUserInfo ssoUserInfo = redisTemplate.opsForValue().get(redisKey);
if (ssoUserInfo==null) {
SSOUserInfo ssoUserInfo = (SSOUserInfo) redisTemplate.opsForValue().get(redisKey);
if (ssoUserInfo == null) {
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setValueSerializer(fastJsonRedisSerializer);
redisTemplate.opsForValue().set(redisKey, getSSOuserInfo());
}
......@@ -652,7 +657,7 @@ public class AzkabanApiUtils2 {
}
public static void main(String[] args) throws Exception {
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2("http://119.23.32.151:8083");
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2("http://119.23.32.151:8083", null);
boolean dw_test = azkabanApiUtils.checkFlowExists("dw_test", "123");
System.err.println(dw_test);
}
......
......@@ -10,6 +10,7 @@ import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.springframework.data.redis.core.RedisTemplate;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
......@@ -18,11 +19,11 @@ import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowNode;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowNodeChangeInfo;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowPro;
import com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoDto;
import com.jz.dmp.modules.model.DmpDevelopTask;
import com.jz.dmp.modules.model.DmpNavigationTree;
import com.jz.dmp.modules.model.DmpProject;
import com.jz.dmp.modules.model.DmpProjectSystemInfo;
import com.jz.dmp.modules.model.DmpWorkFlowSubmitDetails;
import com.jz.dmp.modules.model.SSOUserInfo;
import com.jz.dmp.modules.service.DmpDevelopTaskService;
import com.jz.dmp.modules.service.DmpNavigationTreeService;
import com.jz.dmp.modules.service.DmpWorkFlowSubmitDetailsService;
......@@ -59,6 +60,8 @@ public class FlowParseTool {
private DmpWorkFlowSubmitDetailsService dmpWorkFlowSubmitDetailsService;
private RedisTemplate<String,SSOUserInfo> redisTemplate;
/**
* 流程属性
*/
......@@ -106,12 +109,14 @@ public class FlowParseTool {
DmpProjectSystemInfo publishedToProjectSystemInfo,
DmpDevelopTaskService dmpDevelopTaskService,
DmpNavigationTreeService dmpNavigationTreeService,
DmpWorkFlowSubmitDetailsService dmpWorkFlowSubmitDetailsService) {
DmpWorkFlowSubmitDetailsService dmpWorkFlowSubmitDetailsService,
RedisTemplate<String,SSOUserInfo> redisTemplate) {
this(flowPro, dmpWorkFlowSubmitDetailsService);
this.publishedToProject = publishedToProject;
//this.publishedToProjectSystemInfo = publishedToProjectSystemInfo;
this.dmpDevelopTaskService = dmpDevelopTaskService;
this.dmpNavigationTreeService = dmpNavigationTreeService;
this.redisTemplate = redisTemplate;
}
/**
......@@ -126,12 +131,14 @@ public class FlowParseTool {
DmpProjectConfigInfoDto dmpProjectConfigInfoDto,
DmpDevelopTaskService dmpDevelopTaskService,
DmpNavigationTreeService dmpNavigationTreeService,
DmpWorkFlowSubmitDetailsService dmpWorkFlowSubmitDetailsService) {
DmpWorkFlowSubmitDetailsService dmpWorkFlowSubmitDetailsService,
RedisTemplate<String,SSOUserInfo> redisTemplate) {
this(flowPro, dmpWorkFlowSubmitDetailsService);
this.publishedToProject = publishedToProject;
this.dmpProjectConfigInfoDto = dmpProjectConfigInfoDto;
this.dmpDevelopTaskService = dmpDevelopTaskService;
this.dmpNavigationTreeService = dmpNavigationTreeService;
this.redisTemplate = redisTemplate;
}
private void parse() {
......@@ -432,7 +439,7 @@ public class FlowParseTool {
//azkabanJobCommand = "";
//上传ftp下载相关参数
JSONObject scriptJsonObject = JSONObject.parseObject(flowNode.getScript());
JSONObject scriptJsonObject = JSONObject.parseObject(flowNode.getNodeData());
//FTP链接
contents.add("ftpUrl=" + scriptJsonObject.getString("ftpUrl"));
//FTP用户名
......@@ -449,7 +456,7 @@ public class FlowParseTool {
//azkabanJobCommand = "";
//上传解压文件相关参数
JSONObject scriptJsonObject = JSONObject.parseObject(flowNode.getScript());
JSONObject scriptJsonObject = JSONObject.parseObject(flowNode.getNodeData());
//输出目录
contents.add("zipOutputDir=" + scriptJsonObject.getString("zipOutputDir"));
//压缩文件目录
......@@ -462,7 +469,7 @@ public class FlowParseTool {
//azkabanJobCommand = "";
//上传文件转码相关参数
JSONObject scriptJsonObject = JSONObject.parseObject(flowNode.getScript());
JSONObject scriptJsonObject = JSONObject.parseObject(flowNode.getNodeData());
//文件编码
contents.add("documentCode=" + scriptJsonObject.getString("documentCode"));
//输出地址(目录)
......@@ -477,7 +484,7 @@ public class FlowParseTool {
//azkabanJobCommand = "";
//上传HDFS相关相关参数
JSONObject scriptJsonObject = JSONObject.parseObject(flowNode.getScript());
JSONObject scriptJsonObject = JSONObject.parseObject(flowNode.getNodeData());
//文件地址
contents.add("localUploadFileDir=" + scriptJsonObject.getString("localUploadFileDir"));
//文件过滤
......@@ -516,7 +523,7 @@ public class FlowParseTool {
//上次zip包到azkaban
String localTaskZipAbsolutePath = localTaskZipPath + "/" + localZipTargetFileName;
String azkabanApiUrl = dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanMonitorUrl();
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanApiUrl);
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanApiUrl, redisTemplate);
return azkabanApiUtils.loginCreateProjectuploadZipAndSchedule("jz_workflow_new_" + publishedToProjectId, publishedToProject.getProjectDesc(), localTaskZipAbsolutePath, flowPro);
}
......@@ -619,8 +626,7 @@ public class FlowParseTool {
//获取最新版本的同步任务
//String execXmlFileNameAndVersion = getPublishSyncTaskFileNameAndLatestVersion(taskName, syncTaskTreeId);
//String execXmlFileName = execXmlFileNameAndVersion.split("@")[1];
//DmpDevelopTask task = dmpDevelopTaskService.
String execXmlFileName = "";
String execXmlFileName = dmpDevelopTaskService.getExecXmlFileName(syncTaskTreeId);
//xml 执行xml的命令写到job文件中
String command = "command=" + dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanExectorXmlExec() + " " + publishedToProjectId + " ${azkaban.flow.flowid} ${azkaban.job.id} " + execXmlFileName;
return command;
......@@ -648,13 +654,13 @@ public class FlowParseTool {
* @param flowNode
* @return
*/
private String generateSubprocessFile(FlowNode flowNode) {
private String generateSubprocessFile(FlowNode flowNode)throws Exception {
String subProcessFlowName = flowNode.getScript();
//检查子流程是否存在 todo
String azkabanApiUrl = dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanMonitorUrl();
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanApiUrl);
boolean flowExists = azkabanApiUtils.checkFlowExists("jz_workflow_" + flowPro.getPublishedToProjectId(), subProcessFlowName);
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanApiUrl, redisTemplate);
boolean flowExists = azkabanApiUtils.checkFlowExists("jz_workflow_new_" + flowPro.getPublishedToProjectId(), subProcessFlowName);
if (!flowExists) {
throw new RuntimeException("节点:" + flowNode.getNodeName() + "设置的子流程:" + subProcessFlowName + "不存在,请先发布" + subProcessFlowName);
}
......
......@@ -11,6 +11,7 @@ public class DmpWorkFlowSubmitDetails implements Serializable {
private static final long serialVersionUID = 1L;
private Long id;
private Long scheduleProjectId;
private String scheduleFlowName;
private String nodeName;
......@@ -24,6 +25,14 @@ public class DmpWorkFlowSubmitDetails implements Serializable {
private String createTimeStr;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Long getScheduleProjectId() {
return scheduleProjectId;
}
......
......@@ -91,5 +91,16 @@ public interface DmpDevelopTaskService {
*/
public BaseResponse flowSubmit(DmpDevelopTask dmpDevelopTask, HttpServletRequest httpRequest)throws Exception;
/**
* @Title: getExecXmlFileName
* @Description: TODO(根据任务treeId获取xmlFileName)
* @param @param syncTaskTreeId
* @param @return
* @param @throws Exception 参数
* @return String 返回类型
* @throws
*/
public String getExecXmlFileName(Long syncTaskTreeId)throws Exception;
}
\ No newline at end of file
......@@ -14,6 +14,7 @@ import javax.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.ThrowsAdvice;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
......@@ -1215,4 +1216,20 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
return baseResponse;
}
/**
*根据treeId获取离线任务xmlFileName
*/
@Override
public String getExecXmlFileName(Long syncTaskTreeId) throws Exception {
DmpDevelopTask dmpDevelopTask = dmpDevelopTaskDao.selectTaskInfoByParam(syncTaskTreeId);
if (!dmpDevelopTask.getTaskType().equals(CommConstant.TASK_TYPE_OFFLINE)) {
throw new RuntimeException("非离线任务!");
}
String xmlContent = convert2SyncXmlContent(dmpDevelopTask);
String xmlFileName = XmlUtils.getPropertyValue(xmlContent, "name");
return xmlFileName;
}
}
\ No newline at end of file
......@@ -4,6 +4,7 @@ import java.util.Date;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
......@@ -58,6 +59,9 @@ public class FlowServiceImpl implements FlowService {
@Autowired
private DmpProjectConfigInfoService dmpProjectConfigInfoService;
@Autowired
private RedisTemplate redisTemplate;
/**
*工作流发布
*/
......@@ -89,7 +93,8 @@ public class FlowServiceImpl implements FlowService {
dmpProjectConfigInfoDto,
dmpDevelopTaskService,
dmpNavigationTreeService,
dmpWorkFlowSubmitDetailsService
dmpWorkFlowSubmitDetailsService,
redisTemplate
);
//保存发布信息
......
......@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
......@@ -94,6 +95,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
@Autowired
private DmpDevelopTaskHistoryMapper dmpDevelopTaskHistoryMapper;
@Autowired
private RedisTemplate redisTemplate;
/**
* 离线同步任务列表分页查询
......@@ -237,7 +241,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
//上传到azkaban todo
//上次zip包到azkaban
String localTaskZipAbsolutePath = localTaskZipPath + "/" + localZipTargetFileName;
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl);
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl, redisTemplate);
return azkabanApiUtils.loginCreateProjectuploadZipAndExecute("jz_localflow_" + projectId, "local_sync_project", localTaskZipAbsolutePath, treeName);
}
......@@ -283,7 +287,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
//调用azkaban服务
String azkabanApiUrl = publishToProjectSystemInfo.getAzkabanMonitorUrl();
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanApiUrl);
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanApiUrl, redisTemplate);
list = azkabanApiUtils.getSyncingFlowExecution(projectId, treeName, checkTaskStatusPageReq.getPageNum(), checkTaskStatusPageReq.getPageSize());
SimpleDateFormat dtf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
......@@ -816,7 +820,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
}
String azkabanMonitorUrl = map.get("azkabanMonitorUrl").toString();
Long projectId = Long.valueOf(map.get("projectId").toString());
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl);
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl, redisTemplate);
String execId = azkabanApiUtils.stopFlow("jz_localflow_" + projectId, map.get("treeName").toString());
return JsonResult.ok();
}
......
......@@ -13,20 +13,20 @@ management:
spring:
profiles: dev
datasource1:
url: jdbc:mysql://119.23.32.151:3306/dmp_web?characterEncoding=utf8&useSSL=false
url: jdbc:mysql://192.168.1.140:3307/dmp_web_new?characterEncoding=utf8&useSSL=false
driver-class-name: com.mysql.jdbc.Driver
username: dmp
password: Ioubuy@2019@!
password: Ioubuy123
hikari:
maxLifetime: 1765000
maximumPoolSize: 20
connectionTimeout: 30000
idleTimeout: 600000
datasource2:
url: jdbc:mysql://119.23.32.151:3306/dmp_openapi?characterEncoding=utf8&autoReconnect=true&useSSL=false
url: jdbc:mysql://192.168.1.221:3306/azkaban?characterEncoding=utf8&autoReconnect=true&useSSL=false
driver-class-name: com.mysql.jdbc.Driver
username: dmp
password: Ioubuy@2019@!
username: root
password:
hikari:
maxLifetime: 1765000
maximumPoolSize: 20
......@@ -37,6 +37,10 @@ spring:
caffeine:
spec: maximumSize=1000,expireAfterWrite=30s
public-key: rajZdV0xpCox+2vEHFLsKq2o2XVdMaQq
#gatewayIP
gateway-url: http://localhost:8088
#API配置--大数据查询,数据源支持类型条件
api-bigData-setting: Hive,MySQL,Oracle
redis:
#database: 0
#host: 119.23.13.83
......@@ -44,16 +48,12 @@ spring:
#timeout: 5000
cluster:
nodes:
#- 192.168.1.146:6379
#- 192.168.1.146:6380
#- 192.168.1.146:6381
#- 192.168.1.146:6382
#- 192.168.1.146:6383
#- 192.168.1.146:6384
#
- 172.18.104.129:8001
- 172.18.104.129:8003
- 172.18.104.129:8004
- 192.168.1.146:6379
- 192.168.1.146:6380
- 192.168.1.146:6381
- 192.168.1.146:6382
- 192.168.1.146:6383
- 192.168.1.146:6384
mail:
host: smtp.exmail.qq.com
username: service@mail.cn
......@@ -65,18 +65,6 @@ spring:
starttls:
enable: true
required: true
# ldap:
# urls: ldap://localhost:389
# base: dc=maxcrc,dc=com
# username: cn=Manager,dc=maxcrc,dc=com
# password: secret
ldap:
urls: ldap://172.18.104.128:389
base: dc=ioubuy,dc=cn
username: cn=Manager,dc=ioubuy,dc=cn
password: 'Ioubuy123'
remote:
execute:
......@@ -109,3 +97,9 @@ dmp:
evn:
open: false
name:
#日志打印
logging:
level:
com.jz.dmp: debug
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment