Commit 01893647 authored by mcb's avatar mcb

Merge branch 'dmp_dev' of http://gitlab.ioubuy.cn/yaobenzhang/jz-dmp-service into dmp_dev

parents cc0d2675 5f24ed50
package com.jz.common.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.jz.common.utils.web.HttpClientUtils;
import com.jz.common.utils.web.SessionUtils;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowExecution;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowPro;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.FileSystemResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
......@@ -19,11 +20,16 @@ import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.jz.common.utils.web.HttpClientUtils;
import com.jz.common.utils.web.SessionUtils;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowExecution;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowPro;
import com.jz.dmp.modules.model.DmpMember;
import com.jz.dmp.modules.model.DmpRole;
import com.jz.dmp.modules.model.SSOUserInfo;
/**
* azkaban ajax api 工具类
......@@ -37,6 +43,9 @@ public class AzkabanApiUtils2 {
private String userName;
private String password;
@Autowired
RedisTemplate<String,SSOUserInfo> redisTemplate;
public AzkabanApiUtils2(String azkabanServerUrl, String userName, String password) {
this(azkabanServerUrl);
this.userName = userName;
......@@ -93,12 +102,43 @@ public class AzkabanApiUtils2 {
throw new RuntimeException("登陆失败");
}*/
String sessionId = SessionUtils.getSession().getId(); //"dcfc608c-c58a-45b7-adc7-9902b652496e";
//String sessionId = "f70d53fa-55da-4688-8d00-64350e4fb8ea";
//String sessionId = "f0d06f4a-874c-4dfc-8959-101b6add6bf5";
//通过redis方式登录Azkaban
String redisKey = "spring:sessions:sessions:"+sessionId;
SSOUserInfo ssoUserInfo = redisTemplate.opsForValue().get(redisKey);
if (ssoUserInfo==null) {
redisTemplate.opsForValue().set(redisKey, getSSOuserInfo());
}
System.err.println("----sessionId="+sessionId);
return sessionId; //SessionUtils.getSession().getId();
}
/**
* @Title: getSSOuserInfo
* @Description: TODO(生成azkaban登录需要保存的实体)
* @param @return 参数
* @return SSOUserInfo 返回类型
* @throws
*/
private SSOUserInfo getSSOuserInfo(){
Map<String,String> rolePermissMap =new HashMap<>();
DmpMember dmpMember = SessionUtils.getSecurityUser();
List<DmpRole> memberProjectRoles = dmpMember.getMemberProjectRoleList();
for (DmpRole role : memberProjectRoles) {
rolePermissMap.put(role.getRoleType(), role.getRemark());
}
SSOUserInfo ssoUserInfo = new SSOUserInfo();
ssoUserInfo.setUserName(dmpMember.getUsername());
ssoUserInfo.setAzkabanRoleRefPermissions(rolePermissMap);
return ssoUserInfo;
}
/**
* 创建azkaban项目名
*
......
package com.jz.common.utils;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
public class CommonUtils {
private static final Logger logger = LoggerFactory.getLogger(CommonUtils.class);
/**
* UUID随机数
* @return
......@@ -37,4 +45,61 @@ public class CommonUtils {
return uuid;
}
/**
* @param <T>
* @Title: objArrangeTree
* @Description: TODO(将所有的资源整理成树形结构)
* @param @param dmpPermissions
* @param @return
* @param @throws Exception 参数
* @return List<DmpPermission> 返回类型
* @throws
*/
public static <T> List<T> objArrangeTree(Object parentCode, List<T> objs, String parentCodeFiledName, String codeFieldName, String childrenFieldName) throws Exception {
Map<Object, List<T>> dictMap = objs.stream().collect(Collectors.groupingBy(x->{
try {
return ReflectAssistUtils.getFieldValueByFieldName(parentCodeFiledName, x);
} catch (Exception e) {
e.printStackTrace();
logger.error("树形结构封装异常【{}】", e);
}
return "";
}));
List<T> tList = dictMap.get(parentCode);// 获取顶层资源
if (!CollectionUtils.isEmpty(tList)) {
for (T t : tList) {
t = arrangeChildren(t, dictMap, codeFieldName, childrenFieldName);
}
}
return tList;
}
/**
* @Title: arrangeChildren
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param permission
* @param @param dictMap
* @param @return
* @param @throws Exception 参数
* @return DmpPermission 返回类型
* @throws
*/
private static <T> T arrangeChildren(T t, Map<Object, List<T>> dictMap, String codeFieldName, String childrenFieldName)throws Exception{
Object code = ReflectAssistUtils.getFieldValueByFieldName(codeFieldName, t);
List<T> children = dictMap.get(code);
if (!CollectionUtils.isEmpty(children)) {
for (T child : children) {
child = arrangeChildren(child, dictMap, codeFieldName, childrenFieldName);
}
ReflectAssistUtils.setFieldValueByFieldName(childrenFieldName, t, children);
}
return t;
}
}
......@@ -16,6 +16,7 @@ import com.jz.common.enums.NodeChangeTypeEnum;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowNode;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowNodeChangeInfo;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowPro;
import com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoDto;
import com.jz.dmp.modules.model.DmpNavigationTree;
import com.jz.dmp.modules.model.DmpProject;
import com.jz.dmp.modules.model.DmpProjectSystemInfo;
......@@ -42,15 +43,43 @@ public class FlowParseTool {
/**
* 要发布到的项目配置信息
*/
private DmpProjectSystemInfo publishedToProjectSystemInfo;
//private DmpProjectSystemInfo publishedToProjectSystemInfo;
/**
* 项目配置信息(调整)
*/
private DmpProjectConfigInfoDto dmpProjectConfigInfoDto;
private DmpDevelopTaskService dmpDevelopTaskService;
private DmpNavigationTreeService dmpNavigationTreeService;
private DmpWorkFlowSubmitDetailsService dmpWorkFlowSubmitDetailsService;
/**
* 流程属性
*/
private FlowPro flowPro;
/**
* 节点依赖关系
*/
private Map<String, String> nodeDependcyRefMap;
/**
* 流程节点
* key是节点id
*/
private Map<String, FlowNode> flowNodeMap;
/**
* 流程变更数据
*/
private Map flowChangedMap;
/**
* 不发布项目用
*
......@@ -78,34 +107,30 @@ public class FlowParseTool {
DmpWorkFlowSubmitDetailsService dmpWorkFlowSubmitDetailsService) {
this(flowPro, dmpWorkFlowSubmitDetailsService);
this.publishedToProject = publishedToProject;
this.publishedToProjectSystemInfo = publishedToProjectSystemInfo;
//this.publishedToProjectSystemInfo = publishedToProjectSystemInfo;
this.dmpDevelopTaskService = dmpDevelopTaskService;
this.dmpNavigationTreeService = dmpNavigationTreeService;
}
/**
* 流程属性
*/
private FlowPro flowPro;
/**
* 节点依赖关系
*/
private Map<String, String> nodeDependcyRefMap;
/**
* 流程节点
* key是节点id
*/
private Map<String, FlowNode> flowNodeMap;
/**
* 流程变更数据
* 发布项目用
*
* @param flowPro
* @param publishedToProject
* @param dmpProjectConfigInfoDto
*/
private Map flowChangedMap;
public FlowParseTool(FlowPro flowPro,
DmpProject publishedToProject,
DmpProjectConfigInfoDto dmpProjectConfigInfoDto,
DmpDevelopTaskService dmpDevelopTaskService,
DmpNavigationTreeService dmpNavigationTreeService,
DmpWorkFlowSubmitDetailsService dmpWorkFlowSubmitDetailsService) {
this(flowPro, dmpWorkFlowSubmitDetailsService);
this.publishedToProject = publishedToProject;
this.dmpProjectConfigInfoDto = dmpProjectConfigInfoDto;
this.dmpDevelopTaskService = dmpDevelopTaskService;
this.dmpNavigationTreeService = dmpNavigationTreeService;
}
private void parse() {
......@@ -337,13 +362,13 @@ public class FlowParseTool {
*/
public boolean publish()throws Exception {
Long publishedToProjectId = publishedToProjectSystemInfo.getProjectId();
Long publishedToProjectId = dmpProjectConfigInfoDto.getProjectId().longValue();
Long treeId = flowPro.getTreeId();
/**
* 当前任务生成文件存放根路径
*/
String localTaskPath = publishedToProjectSystemInfo.getAzkabanLocalTaskFilePath() + "/" + publishedToProjectId + "/" + treeId;
String localTaskPath = dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanLocalTaskFilePath() + "/" + publishedToProjectId + "/" + treeId;
File localTaskFile = new File(localTaskPath);
if (!localTaskFile.exists()) {
localTaskFile.mkdirs();
......@@ -430,12 +455,14 @@ public class FlowParseTool {
//上传到azkaban todo
//上次zip包到azkaban
String localTaskZipAbsolutePath = localTaskZipPath + "/" + localZipTargetFileName;
String azkabanApiUrl = publishedToProjectSystemInfo.getAzkabanMonitorUrl();
String azkabanApiUrl = dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanMonitorUrl();
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanApiUrl);
return azkabanApiUtils.loginCreateProjectuploadZipAndSchedule("jz_workflow_" + publishedToProjectId, publishedToProject.getProjectDesc(), localTaskZipAbsolutePath, flowPro);
return azkabanApiUtils.loginCreateProjectuploadZipAndSchedule("jz_workflow_new_" + publishedToProjectId, publishedToProject.getProjectDesc(), localTaskZipAbsolutePath, flowPro);
}
/**
* 生成shell 文件
*
......@@ -446,7 +473,7 @@ public class FlowParseTool {
String fileName = flowNode.getNodeName() + ".sh";
String scriptFileAbsolutePath = localTaskExecArgsPath + fileName;
Long publishedToProjectId = publishedToProjectSystemInfo.getProjectId();
Long publishedToProjectId = dmpProjectConfigInfoDto.getProjectId().longValue();
Long treeId = flowPro.getTreeId();
List<String> list = new ArrayList<>();
......@@ -456,16 +483,16 @@ public class FlowParseTool {
//远程shell 路径
String remoteShellDir = publishedToProjectSystemInfo.getAzkabanExectorShellPath() + "/" + publishedToProjectId + "/" + treeId + "/";
String remoteShellDir = dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanExectorShellPath() + "/" + publishedToProjectId + "/" + treeId + "/";
//上传shell文件 todo
SFTPUtils sftpUtils = new SFTPUtils(publishedToProjectSystemInfo.getShellCmdServer(),
publishedToProjectSystemInfo.getShellCmdUser(),
publishedToProjectSystemInfo.getShellCmdPassword(),
publishedToProjectSystemInfo.getShellSftpPort());
SFTPUtils sftpUtils = new SFTPUtils(dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getShellCmdServer(),
dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getShellCmdUser(),
dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getShellCmdPassword(),
dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getShellSftpPort());
sftpUtils.singleUploadFile(localTaskExecArgsPath, fileName, remoteShellDir);
String command = "command=" + publishedToProjectSystemInfo.getAzkabanExectorShellExec() + " " + publishedToProjectId + " ${azkaban.flow.flowid} ${azkaban.job.id} " + remoteShellDir + fileName;
String command = "command=" + dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanExectorShellExec() + " " + publishedToProjectId + " ${azkaban.flow.flowid} ${azkaban.job.id} " + remoteShellDir + fileName;
return command;
}
......@@ -480,22 +507,22 @@ public class FlowParseTool {
String fileName = flowNode.getNodeName() + ".sql";
String scriptFileAbsolutePath = localTaskExecArgsPath + fileName;
Long publishedToProjectId = publishedToProjectSystemInfo.getProjectId();
Long publishedToProjectId = dmpProjectConfigInfoDto.getProjectId().longValue();
Long treeId = flowPro.getTreeId();
FileUtils.write(scriptFileAbsolutePath, flowNode.getScript());
//上传sql文件 todo
//远程shell 路径
String remoteSqlDir = publishedToProjectSystemInfo.getAzkabanExectorSqlPath() + "/" + publishedToProjectId + "/" + treeId + "/";
String remoteSqlDir = dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanExectorSqlPath() + "/" + publishedToProjectId + "/" + treeId + "/";
//上传shell文件 todo
SFTPUtils sftpUtils = new SFTPUtils(publishedToProjectSystemInfo.getShellCmdServer(),
publishedToProjectSystemInfo.getShellCmdUser(),
publishedToProjectSystemInfo.getShellCmdPassword(),
publishedToProjectSystemInfo.getShellSftpPort());
SFTPUtils sftpUtils = new SFTPUtils(dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getShellCmdServer(),
dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getShellCmdUser(),
dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getShellCmdPassword(),
dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getShellSftpPort());
sftpUtils.singleUploadFile(localTaskExecArgsPath, fileName, remoteSqlDir);
String command = "command=" + publishedToProjectSystemInfo.getAzkabanExectorSqlExec() + " " + publishedToProjectId + " ${azkaban.flow.flowid} ${azkaban.job.id} " + remoteSqlDir + fileName;
String command = "command=" + dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanExectorSqlExec() + " " + publishedToProjectId + " ${azkaban.flow.flowid} ${azkaban.job.id} " + remoteSqlDir + fileName;
return command;
}
......@@ -516,7 +543,7 @@ public class FlowParseTool {
String execXmlFileName = execXmlFileNameAndVersion.split("@")[1];*/
//任务所在项目id
Long projectId = flowPro.getProjectId();
Long publishedToProjectId = publishedToProjectSystemInfo.getProjectId();
Long publishedToProjectId = dmpProjectConfigInfoDto.getProjectId().longValue();
//根据taskName获取treeId
String taskName = flowNode.getScript();
......@@ -533,7 +560,7 @@ public class FlowParseTool {
String execXmlFileNameAndVersion = getPublishSyncTaskFileNameAndLatestVersion(taskName, syncTaskTreeId);
String execXmlFileName = execXmlFileNameAndVersion.split("@")[1];
//xml 执行xml的命令写到job文件中
String command = "command=" + publishedToProjectSystemInfo.getAzkabanExectorXmlExec() + " " + publishedToProjectId + " ${azkaban.flow.flowid} ${azkaban.job.id} " + execXmlFileName;
String command = "command=" + dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanExectorXmlExec() + " " + publishedToProjectId + " ${azkaban.flow.flowid} ${azkaban.job.id} " + execXmlFileName;
return command;
}
......@@ -563,7 +590,7 @@ public class FlowParseTool {
String subProcessFlowName = flowNode.getScript();
//检查子流程是否存在 todo
String azkabanApiUrl = publishedToProjectSystemInfo.getAzkabanMonitorUrl();
String azkabanApiUrl = dmpProjectConfigInfoDto.getDmpPublicConfigInfoDto().getAzkabanMonitorUrl();
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanApiUrl);
boolean flowExists = azkabanApiUtils.checkFlowExists("jz_workflow_" + flowPro.getPublishedToProjectId(), subProcessFlowName);
if (!flowExists) {
......
package com.jz.common.utils;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import com.jz.dmp.modules.controller.bean.DmpNavigationTreeDto;
/**
* @ClassName: ReflectAssistUtils
* @Description: TODO(反射辅助工具类)
* @author ybz
* @date 2021年1月26日
*
*/
public class ReflectAssistUtils {
/**
* @Title: getFieldValueByFieldName
* @Description: TODO(根据属性名称获取属性值)
* @param @param fieldName
* @param @param cls
* @param @return
* @param @throws Exception 参数
* @return Field 返回类型
* @throws
*/
public static Object getFieldValueByFieldName(String fieldName, Object obj)throws Exception{
Class<?> cls = obj.getClass();
String getMethodName = "get"+fieldName.substring(0, 1).toUpperCase()+fieldName.substring(1);
Method getMethod = cls.getMethod(getMethodName);
return getMethod.invoke(obj);
}
/**
* @param <T>
* @Title: setFieldValueByFieldName
* @Description: TODO(设置属性值)
* @param @param fieldName
* @param @param cls
* @param @param fieldVaule
* @param @throws Exception 参数
* @return void 返回类型
* @throws
*/
public static <T> void setFieldValueByFieldName(String fieldName, Object obj, T fieldVaule)throws Exception{
Class<?> cls = obj.getClass();
String setMethodName = "set"+fieldName.substring(0, 1).toUpperCase()+fieldName.substring(1);
Class<?> fieldValueClass = fieldVaule.getClass();
if (fieldVaule instanceof ArrayList) {
fieldValueClass = List.class;
}
Method setMethod = cls.getMethod(setMethodName, fieldValueClass);
setMethod.invoke(obj, fieldVaule);
}
public static void main(String[] args) {
try {
DmpNavigationTreeDto dmpNavigationTreeDto = new DmpNavigationTreeDto();
List<DmpNavigationTreeDto> list = new ArrayList<DmpNavigationTreeDto>();
setFieldValueByFieldName("children", dmpNavigationTreeDto, list);
System.out.println(getFieldValueByFieldName("children", dmpNavigationTreeDto));
} catch (Exception e) {
e.printStackTrace();
}
}
}
......@@ -122,6 +122,7 @@ public class SFTPUtils {
* @param remoteFileDirPath 要上传到的远程文件路径
*/
public void singleUploadFile(String localFileDirPath,String uploadFileName,String remoteFileDirPath) {
String pathTeString = "C:\\opt\\dmp\\dmp_web\\35\\705\\execArgs\\";
//本地文件绝对路径
String localFileAbsolutePath = localFileDirPath+uploadFileName;
String remoteFileAbsolutePath = remoteFileDirPath+"/"+uploadFileName;
......@@ -129,7 +130,8 @@ public class SFTPUtils {
createRemoteDirs(remoteFileDirPath);
try {
sftp.put(localFileAbsolutePath, remoteFileAbsolutePath,ChannelSftp.OVERWRITE);
//sftp.put(localFileAbsolutePath, remoteFileAbsolutePath,ChannelSftp.OVERWRITE);
sftp.put(pathTeString+uploadFileName, remoteFileAbsolutePath,ChannelSftp.OVERWRITE);
sftp.chmod(Integer.parseInt("775",8), remoteFileAbsolutePath);
LOGGER.info("上传"+localFileAbsolutePath+" 到 "+remoteFileAbsolutePath+" 成功");
} catch (SftpException e) {
......
......@@ -159,5 +159,4 @@ public class DmpNavigationTreeController {
return baseBeanResponse;
}
}
\ No newline at end of file
package com.jz.dmp.modules.controller.bean;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.BeanUtils;
......@@ -56,6 +58,8 @@ public class MyDmpDevelopTaskConverter {
flowPro.setPublishedToProjectId(dmpDevelopTask.getProjectId().longValue());
//dmp里生成的任务id
flowPro.setTaskId(dmpDevelopTask.getId().longValue());
//dmp生成树ID
flowPro.setTreeId(dmpDevelopTask.getTreeId().longValue());
//是否带版本号进行节点变更查询?
//检查节点名称要用到的参数?
......@@ -72,4 +76,20 @@ public class MyDmpDevelopTaskConverter {
return flowPro;
}
/**
* @Title: scriptToFlowProJson
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param script
* @param @return 参数
* @return String 返回类型
* @throws
*/
private String scriptToFlowProJson(String script) {
Map<String, Object> map = new HashMap<String, Object>();
JSONObject jsonObject = JSONObject.parseObject(script);
map.put("flowPro", jsonObject);
return JSONObject.toJSONString(map);
}
}
......@@ -2,6 +2,7 @@ package com.jz.dmp.modules.controller.projconfig.bean;
import com.jz.dmp.modules.model.DmpProjectConfigInfo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
/**项目配置表Dto
* @author ybz
......@@ -10,4 +11,20 @@ import io.swagger.annotations.ApiModel;
@ApiModel(value = "项目配置表Dto", description = "项目配置表Dto")
public class DmpProjectConfigInfoDto extends DmpProjectConfigInfo {
/**
*
*/
private static final long serialVersionUID = 1L;
@ApiModelProperty(value = "公共配置")
private DmpPublicConfigInfoDto dmpPublicConfigInfoDto;
public DmpPublicConfigInfoDto getDmpPublicConfigInfoDto() {
return dmpPublicConfigInfoDto;
}
public void setDmpPublicConfigInfoDto(DmpPublicConfigInfoDto dmpPublicConfigInfoDto) {
this.dmpPublicConfigInfoDto = dmpPublicConfigInfoDto;
}
}
......@@ -155,6 +155,30 @@ public class DmpPublicConfigInfoRequest extends BasePageBean {
@ApiModelProperty(value = "元数据服务web地址")
private String atlasMonitorUrl;
/**
* 远程连接默认SERVER地址
*/
@ApiModelProperty(value = "远程连接默认SERVER地址")
private String shellCmdServer;
/**
* 远程连接默认用户
*/
@ApiModelProperty(value = "远程连接默认用户")
private String shellCmdUser;
/**
* 远程连接默认用户密码
*/
@ApiModelProperty(value = "远程连接默认用户密码")
private String shellCmdPassword;
/**
* 上传配置的SFTP端口
*/
@ApiModelProperty(value = "上传配置的SFTP端口")
private Integer shellSftpPort;
/**
* 备注
*/
......@@ -395,6 +419,34 @@ public class DmpPublicConfigInfoRequest extends BasePageBean {
this.atlasMonitorUrl = atlasMonitorUrl;
}
public void setShellCmdServer(String shellCmdServer) {
this.shellCmdServer = shellCmdServer;
}
public String getShellCmdUser() {
return shellCmdUser;
}
public void setShellCmdUser(String shellCmdUser) {
this.shellCmdUser = shellCmdUser;
}
public String getShellCmdPassword() {
return shellCmdPassword;
}
public void setShellCmdPassword(String shellCmdPassword) {
this.shellCmdPassword = shellCmdPassword;
}
public Integer getShellSftpPort() {
return shellSftpPort;
}
public void setShellSftpPort(Integer shellSftpPort) {
this.shellSftpPort = shellSftpPort;
}
public String getRemark() {
return remark;
}
......
......@@ -88,7 +88,7 @@ public class DmpNavigationTree implements Serializable {
* 父节点ID
*/
@ApiModelProperty(value = "父节点ID")
private Integer parentId;
public Integer parentId;
public Integer getId() {
......
......@@ -164,6 +164,30 @@ public class DmpPublicConfigInfo implements Serializable{
@ApiModelProperty(value = "元数据服务web地址")
private String atlasMonitorUrl;
/**
* 远程连接默认SERVER地址
*/
@ApiModelProperty(value = "远程连接默认SERVER地址")
private String shellCmdServer;
/**
* 远程连接默认用户
*/
@ApiModelProperty(value = "远程连接默认用户")
private String shellCmdUser;
/**
* 远程连接默认用户密码
*/
@ApiModelProperty(value = "远程连接默认用户密码")
private String shellCmdPassword;
/**
* 上传配置的SFTP端口
*/
@ApiModelProperty(value = "上传配置的SFTP端口")
private Integer shellSftpPort;
/**
* 备注
*/
......@@ -397,6 +421,38 @@ public class DmpPublicConfigInfo implements Serializable{
this.atlasMonitorUrl = atlasMonitorUrl;
}
public String getShellCmdServer() {
return shellCmdServer;
}
public void setShellCmdServer(String shellCmdServer) {
this.shellCmdServer = shellCmdServer;
}
public String getShellCmdUser() {
return shellCmdUser;
}
public void setShellCmdUser(String shellCmdUser) {
this.shellCmdUser = shellCmdUser;
}
public String getShellCmdPassword() {
return shellCmdPassword;
}
public void setShellCmdPassword(String shellCmdPassword) {
this.shellCmdPassword = shellCmdPassword;
}
public Integer getShellSftpPort() {
return shellSftpPort;
}
public void setShellSftpPort(Integer shellSftpPort) {
this.shellSftpPort = shellSftpPort;
}
public String getRemark() {
return remark;
}
......
package com.jz.dmp.modules.model;
import java.io.Serializable;
import java.util.Map;
public class SSOUserInfo implements Serializable {
//用户名
private String userName;
//用户角色对应的用户信息
private Map<String,String> azkabanRoleRefPermissions;
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public Map<String, String> getAzkabanRoleRefPermissions() {
return azkabanRoleRefPermissions;
}
public void setAzkabanRoleRefPermissions(Map<String, String> azkabanRoleRefPermissions) {
this.azkabanRoleRefPermissions = azkabanRoleRefPermissions;
}
}
\ No newline at end of file
......@@ -17,6 +17,7 @@ import org.springframework.util.StringUtils;
import com.jz.common.bean.BaseBeanResponse;
import com.jz.common.constant.StatuConstant;
import com.jz.common.persistence.BaseService;
import com.jz.common.utils.CommonUtils;
import com.jz.dmp.modules.controller.bean.DmpNavigationTreeDto;
import com.jz.dmp.modules.controller.bean.DmpNavigationTreeRequest;
import com.jz.dmp.modules.controller.bean.MyDmpNavigationTreeConverter;
......
......@@ -7,6 +7,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.jz.common.bean.BaseBeanResponse;
import com.jz.common.bean.BaseResponse;
import com.jz.common.constant.StatuConstant;
import com.jz.common.utils.FlowParseTool;
......@@ -16,6 +17,7 @@ import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowPro;
import com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoDto;
import com.jz.dmp.modules.dao.DmpProjectDao;
import com.jz.dmp.modules.dao.DmpWorkFlowSubmitDetailsDao;
import com.jz.dmp.modules.dao.projconfig.DmpProjectConfigInfoMapper;
import com.jz.dmp.modules.model.DmpProject;
import com.jz.dmp.modules.model.DmpProjectSystemInfo;
import com.jz.dmp.modules.model.DmpWorkFlowSubmitDetails;
......@@ -24,6 +26,7 @@ import com.jz.dmp.modules.service.DmpNavigationTreeService;
import com.jz.dmp.modules.service.DmpProjectService;
import com.jz.dmp.modules.service.DmpWorkFlowSubmitDetailsService;
import com.jz.dmp.modules.service.FlowService;
import com.jz.dmp.modules.service.projconfig.DmpProjectConfigInfoService;
/**
* @ClassName: FlowServiceImpl
......@@ -52,6 +55,9 @@ public class FlowServiceImpl implements FlowService {
@Autowired
private DmpWorkFlowSubmitDetailsDao dmpWorkFlowSubmitDetailsDao;
@Autowired
private DmpProjectConfigInfoService dmpProjectConfigInfoService;
/**
*工作流发布
*/
......@@ -63,9 +69,12 @@ public class FlowServiceImpl implements FlowService {
flowPro.setCheckVerion(true);
Long publishedToProjectId = flowPro.getPublishedToProjectId();
DmpProject publishToProject = dmpProjectDao.get(publishedToProjectId);
DmpProjectSystemInfo publishToProjectSystemInfo = dmpProjectService.getProjectSystemInfo(publishedToProjectId);
//DmpProjectConfigInfoDto dmpProjectConfigInfoDto =
//DmpProjectSystemInfo publishToProjectSystemInfo = dmpProjectService.getProjectSystemInfo(publishedToProjectId);
BaseBeanResponse<DmpProjectConfigInfoDto> baseBeanResponse = dmpProjectConfigInfoService.findByProjectId(publishedToProjectId.intValue(), null);
DmpProjectConfigInfoDto dmpProjectConfigInfoDto = baseBeanResponse.getData();
try {
/*
FlowParseTool flowParseTool = new FlowParseTool(flowPro,
publishToProject,
publishToProjectSystemInfo,
......@@ -73,6 +82,15 @@ public class FlowServiceImpl implements FlowService {
dmpNavigationTreeService,
dmpWorkFlowSubmitDetailsService
);
*/
FlowParseTool flowParseTool = new FlowParseTool(flowPro,
publishToProject,
dmpProjectConfigInfoDto,
dmpDevelopTaskService,
dmpNavigationTreeService,
dmpWorkFlowSubmitDetailsService
);
//保存发布信息
List<FlowNodeChangeInfo> flowNodeChangeList = flowParseTool.getChangedNodes();
......
......@@ -29,6 +29,8 @@ import com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoBatch;
import com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoDto;
import com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoRequest;
import com.jz.dmp.modules.controller.projconfig.bean.DmpProjectEngineParamDto;
import com.jz.dmp.modules.controller.projconfig.bean.DmpPublicConfigInfoDto;
import com.jz.dmp.modules.controller.projconfig.bean.DmpPublicConfigInfoRequest;
import com.jz.dmp.modules.dao.projconfig.DmpProjectConfigEngineMapper;
import com.jz.dmp.modules.dao.projconfig.DmpProjectConfigInfoMapper;
import com.jz.dmp.modules.dao.projconfig.DmpProjectEngineParamMapper;
......@@ -36,6 +38,7 @@ import com.jz.dmp.modules.model.DmpProjectConfigEngine;
import com.jz.dmp.modules.model.DmpProjectConfigInfo;
import com.jz.dmp.modules.model.DmpProjectEngineParam;
import com.jz.dmp.modules.service.projconfig.DmpProjectConfigInfoService;
import com.jz.dmp.modules.service.projconfig.DmpPublicConfigInfoService;
/**
* 项目配置表服务的实现?
......@@ -55,6 +58,9 @@ public class DmpProjectConfigInfoServiceImpl extends BaseService implements DmpP
@Autowired
private DmpProjectEngineParamMapper dmpProjectEngineParamMapper;
@Autowired
private DmpPublicConfigInfoService dmpPublicConfigInfoService;
/*
* (non-Javadoc)
*
......@@ -668,6 +674,13 @@ public class DmpProjectConfigInfoServiceImpl extends BaseService implements DmpP
if (!CollectionUtils.isEmpty(list)) {
dto = list.get(0);
//设置公共属性
DmpPublicConfigInfoRequest request = new DmpPublicConfigInfoRequest();
BaseBeanResponse<DmpPublicConfigInfoDto> configInfoBeanResponse = dmpPublicConfigInfoService.findList(request, null);
List<DmpPublicConfigInfoDto> configInfoDtos = configInfoBeanResponse.getDatas();
if (!CollectionUtils.isEmpty(configInfoDtos)) {
dto.setDmpPublicConfigInfoDto(configInfoDtos.get(0));
}
}
baseBeanResponse.setCode(StatuConstant.SUCCESS_CODE);
......
......@@ -26,6 +26,10 @@
<result column="azkaban_exector_shell_export_data" property="azkabanExectorShellExportData" jdbcType="VARCHAR" />
<result column="azkaban_monitor_url" property="azkabanMonitorUrl" jdbcType="VARCHAR" />
<result column="atlas_monitor_url" property="atlasMonitorUrl" jdbcType="VARCHAR" />
<result column="shell_cmd_server" property="shellCmdServer" jdbcType="VARCHAR" />
<result column="shell_cmd_user" property="shellCmdUser" jdbcType="VARCHAR" />
<result column="shell_cmd_password" property="shellCmdPassword" jdbcType="VARCHAR" />
<result column="shell_sftp_port" property="shellSftpPort" jdbcType="INTEGER" />
<result column="remark" property="remark" jdbcType="VARCHAR" />
<result column="data_status" property="dataStatus" jdbcType="CHAR" />
<result column="create_user_id" property="createUserId" jdbcType="INTEGER" />
......@@ -43,8 +47,9 @@
kerberos_fqdn, kerberos_keytab_conf, kerberos_keytab_user, kerberos_spark_jaas_conf, hdfs_http_path,
hdfs_syncing_path, hdfs_user_name, kafka_conector_url, kafka_schema_register_url, kafka_bootstrap_servers,
azkaban_exector_shell_exec, azkaban_exector_sql_exec, azkaban_exector_xml_exec, azkaban_exector_sql_path, azkaban_exector_shell_path,
azkaban_local_task_file_path, azkaban_exector_shell_export_data, azkaban_monitor_url, atlas_monitor_url, remark,
data_status, create_user_id, create_time, update_user_id, update_time
azkaban_local_task_file_path, azkaban_exector_shell_export_data, azkaban_monitor_url, atlas_monitor_url, shell_cmd_server,
shell_cmd_user, shell_cmd_password, shell_sftp_port, remark, data_status,
create_user_id, create_time, update_user_id, update_time
</sql>
<sql id="BaseDto_Column_List">
......@@ -152,6 +157,18 @@
<if test="atlasMonitorUrl != null" >
AND atlas_monitor_url = #{atlasMonitorUrl,jdbcType=VARCHAR}
</if>
<if test="shellCmdServer != null" >
AND shell_cmd_server = #{shellCmdServer,jdbcType=VARCHAR}
</if>
<if test="shellCmdUser != null" >
AND shell_cmd_user = #{shellCmdUser,jdbcType=VARCHAR}
</if>
<if test="shellCmdPassword != null" >
AND shell_cmd_password = #{shellCmdPassword,jdbcType=VARCHAR}
</if>
<if test="shellSftpPort != null" >
AND shell_sftp_port = #{shellSftpPort,jdbcType=INTEGER}
</if>
<if test="remark != null" >
AND remark = #{remark,jdbcType=VARCHAR}
</if>
......@@ -259,6 +276,18 @@
<if test="atlasMonitorUrl != null" >
AND atlas_monitor_url = #{atlasMonitorUrl,jdbcType=VARCHAR}
</if>
<if test="shellCmdServer != null" >
AND shell_cmd_server = #{shellCmdServer,jdbcType=VARCHAR}
</if>
<if test="shellCmdUser != null" >
AND shell_cmd_user = #{shellCmdUser,jdbcType=VARCHAR}
</if>
<if test="shellCmdPassword != null" >
AND shell_cmd_password = #{shellCmdPassword,jdbcType=VARCHAR}
</if>
<if test="shellSftpPort != null" >
AND shell_sftp_port = #{shellSftpPort,jdbcType=INTEGER}
</if>
<if test="remark != null" >
AND remark = #{remark,jdbcType=VARCHAR}
</if>
......@@ -294,16 +323,18 @@
kerberos_fqdn, kerberos_keytab_conf, kerberos_keytab_user, kerberos_spark_jaas_conf, hdfs_http_path,
hdfs_syncing_path, hdfs_user_name, kafka_conector_url, kafka_schema_register_url, kafka_bootstrap_servers,
azkaban_exector_shell_exec, azkaban_exector_sql_exec, azkaban_exector_xml_exec, azkaban_exector_sql_path, azkaban_exector_shell_path,
azkaban_local_task_file_path, azkaban_exector_shell_export_data, azkaban_monitor_url, atlas_monitor_url, remark,
data_status, create_user_id, create_time, update_user_id, update_time
azkaban_local_task_file_path, azkaban_exector_shell_export_data, azkaban_monitor_url, atlas_monitor_url, shell_cmd_server,
shell_cmd_user, shell_cmd_password, shell_sftp_port, remark, data_status,
create_user_id, create_time, update_user_id, update_time
)
values (
#{publicConfigId,jdbcType=INTEGER}, #{kerberosIsenable,jdbcType=CHAR}, #{kerberosJaasClientName,jdbcType=VARCHAR}, #{kerberosKrb5Conf,jdbcType=VARCHAR}, #{kerberosJaasConf,jdbcType=VARCHAR},
#{kerberosFqdn,jdbcType=VARCHAR}, #{kerberosKeytabConf,jdbcType=VARCHAR}, #{kerberosKeytabUser,jdbcType=VARCHAR}, #{kerberosSparkJaasConf,jdbcType=VARCHAR}, #{hdfsHttpPath,jdbcType=VARCHAR},
#{hdfsSyncingPath,jdbcType=VARCHAR}, #{hdfsUserName,jdbcType=VARCHAR}, #{kafkaConectorUrl,jdbcType=VARCHAR}, #{kafkaSchemaRegisterUrl,jdbcType=VARCHAR}, #{kafkaBootstrapServers,jdbcType=VARCHAR},
#{azkabanExectorShellExec,jdbcType=VARCHAR}, #{azkabanExectorSqlExec,jdbcType=VARCHAR}, #{azkabanExectorXmlExec,jdbcType=VARCHAR}, #{azkabanExectorSqlPath,jdbcType=VARCHAR}, #{azkabanExectorShellPath,jdbcType=VARCHAR},
#{azkabanLocalTaskFilePath,jdbcType=VARCHAR}, #{azkabanExectorShellExportData,jdbcType=VARCHAR}, #{azkabanMonitorUrl,jdbcType=VARCHAR}, #{atlasMonitorUrl,jdbcType=VARCHAR}, #{remark,jdbcType=VARCHAR},
#{dataStatus,jdbcType=CHAR}, #{createUserId,jdbcType=INTEGER}, #{createTime,jdbcType=TIMESTAMP}, #{updateUserId,jdbcType=INTEGER}, #{updateTime,jdbcType=TIMESTAMP}
#{azkabanLocalTaskFilePath,jdbcType=VARCHAR}, #{azkabanExectorShellExportData,jdbcType=VARCHAR}, #{azkabanMonitorUrl,jdbcType=VARCHAR}, #{atlasMonitorUrl,jdbcType=VARCHAR}, #{shellCmdServer,jdbcType=VARCHAR},
#{shellCmdUser,jdbcType=VARCHAR}, #{shellCmdPassword,jdbcType=VARCHAR}, #{shellSftpPort,jdbcType=INTEGER}, #{remark,jdbcType=VARCHAR}, #{dataStatus,jdbcType=CHAR},
#{createUserId,jdbcType=INTEGER}, #{createTime,jdbcType=TIMESTAMP}, #{updateUserId,jdbcType=INTEGER}, #{updateTime,jdbcType=TIMESTAMP}
)
</insert>
......@@ -314,8 +345,9 @@
kerberos_fqdn, kerberos_keytab_conf, kerberos_keytab_user, kerberos_spark_jaas_conf, hdfs_http_path,
hdfs_syncing_path, hdfs_user_name, kafka_conector_url, kafka_schema_register_url, kafka_bootstrap_servers,
azkaban_exector_shell_exec, azkaban_exector_sql_exec, azkaban_exector_xml_exec, azkaban_exector_sql_path, azkaban_exector_shell_path,
azkaban_local_task_file_path, azkaban_exector_shell_export_data, azkaban_monitor_url, atlas_monitor_url, remark,
data_status, create_user_id, create_time, update_user_id, update_time
azkaban_local_task_file_path, azkaban_exector_shell_export_data, azkaban_monitor_url, atlas_monitor_url, shell_cmd_server,
shell_cmd_user, shell_cmd_password, shell_sftp_port, remark, data_status,
create_user_id, create_time, update_user_id, update_time
)
values
<foreach collection="list" item="item" separator=",">
......@@ -324,8 +356,9 @@
#{item.kerberosFqdn,jdbcType=VARCHAR}, #{item.kerberosKeytabConf,jdbcType=VARCHAR}, #{item.kerberosKeytabUser,jdbcType=VARCHAR}, #{item.kerberosSparkJaasConf,jdbcType=VARCHAR}, #{item.hdfsHttpPath,jdbcType=VARCHAR},
#{item.hdfsSyncingPath,jdbcType=VARCHAR}, #{item.hdfsUserName,jdbcType=VARCHAR}, #{item.kafkaConectorUrl,jdbcType=VARCHAR}, #{item.kafkaSchemaRegisterUrl,jdbcType=VARCHAR}, #{item.kafkaBootstrapServers,jdbcType=VARCHAR},
#{item.azkabanExectorShellExec,jdbcType=VARCHAR}, #{item.azkabanExectorSqlExec,jdbcType=VARCHAR}, #{item.azkabanExectorXmlExec,jdbcType=VARCHAR}, #{item.azkabanExectorSqlPath,jdbcType=VARCHAR}, #{item.azkabanExectorShellPath,jdbcType=VARCHAR},
#{item.azkabanLocalTaskFilePath,jdbcType=VARCHAR}, #{item.azkabanExectorShellExportData,jdbcType=VARCHAR}, #{item.azkabanMonitorUrl,jdbcType=VARCHAR}, #{item.atlasMonitorUrl,jdbcType=VARCHAR}, #{item.remark,jdbcType=VARCHAR},
#{item.dataStatus,jdbcType=CHAR}, #{item.createUserId,jdbcType=INTEGER}, #{item.createTime,jdbcType=TIMESTAMP}, #{item.updateUserId,jdbcType=INTEGER}, #{item.updateTime,jdbcType=TIMESTAMP}
#{item.azkabanLocalTaskFilePath,jdbcType=VARCHAR}, #{item.azkabanExectorShellExportData,jdbcType=VARCHAR}, #{item.azkabanMonitorUrl,jdbcType=VARCHAR}, #{item.atlasMonitorUrl,jdbcType=VARCHAR}, #{item.shellCmdServer,jdbcType=VARCHAR},
#{item.shellCmdUser,jdbcType=VARCHAR}, #{item.shellCmdPassword,jdbcType=VARCHAR}, #{item.shellSftpPort,jdbcType=INTEGER}, #{item.remark,jdbcType=VARCHAR}, #{item.dataStatus,jdbcType=CHAR},
#{item.createUserId,jdbcType=INTEGER}, #{item.createTime,jdbcType=TIMESTAMP}, #{item.updateUserId,jdbcType=INTEGER}, #{item.updateTime,jdbcType=TIMESTAMP}
)
</foreach>
</insert>
......@@ -406,6 +439,18 @@
<if test="atlasMonitorUrl != null" >
atlas_monitor_url,
</if>
<if test="shellCmdServer != null" >
shell_cmd_server,
</if>
<if test="shellCmdUser != null" >
shell_cmd_user,
</if>
<if test="shellCmdPassword != null" >
shell_cmd_password,
</if>
<if test="shellSftpPort != null" >
shell_sftp_port,
</if>
<if test="remark != null" >
remark,
</if>
......@@ -498,6 +543,18 @@
<if test="atlasMonitorUrl != null" >
#{atlasMonitorUrl,jdbcType=VARCHAR},
</if>
<if test="shellCmdServer != null" >
#{shellCmdServer,jdbcType=VARCHAR},
</if>
<if test="shellCmdUser != null" >
#{shellCmdUser,jdbcType=VARCHAR},
</if>
<if test="shellCmdPassword != null" >
#{shellCmdPassword,jdbcType=VARCHAR},
</if>
<if test="shellSftpPort != null" >
#{shellSftpPort,jdbcType=INTEGER},
</if>
<if test="remark != null" >
#{remark,jdbcType=VARCHAR},
</if>
......@@ -546,6 +603,10 @@
azkaban_exector_shell_export_data = #{azkabanExectorShellExportData,jdbcType=VARCHAR},
azkaban_monitor_url = #{azkabanMonitorUrl,jdbcType=VARCHAR},
atlas_monitor_url = #{atlasMonitorUrl,jdbcType=VARCHAR},
shell_cmd_server = #{shellCmdServer,jdbcType=VARCHAR},
shell_cmd_user = #{shellCmdUser,jdbcType=VARCHAR},
shell_cmd_password = #{shellCmdPassword,jdbcType=VARCHAR},
shell_sftp_port = #{shellSftpPort,jdbcType=INTEGER},
remark = #{remark,jdbcType=VARCHAR},
data_status = #{dataStatus,jdbcType=CHAR},
create_user_id = #{createUserId,jdbcType=INTEGER},
......@@ -631,6 +692,18 @@
<if test="atlasMonitorUrl != null" >
atlas_monitor_url = #{atlasMonitorUrl,jdbcType=VARCHAR},
</if>
<if test="shellCmdServer != null" >
shell_cmd_server = #{shellCmdServer,jdbcType=VARCHAR},
</if>
<if test="shellCmdUser != null" >
shell_cmd_user = #{shellCmdUser,jdbcType=VARCHAR},
</if>
<if test="shellCmdPassword != null" >
shell_cmd_password = #{shellCmdPassword,jdbcType=VARCHAR},
</if>
<if test="shellSftpPort != null" >
shell_sftp_port = #{shellSftpPort,jdbcType=INTEGER},
</if>
<if test="remark != null" >
remark = #{remark,jdbcType=VARCHAR},
</if>
......@@ -731,6 +804,18 @@
<if test="atlasMonitorUrl != null" >
AND atlas_monitor_url = #{atlasMonitorUrl,jdbcType=VARCHAR}
</if>
<if test="shellCmdServer != null" >
AND shell_cmd_server = #{shellCmdServer,jdbcType=VARCHAR}
</if>
<if test="shellCmdUser != null" >
AND shell_cmd_user = #{shellCmdUser,jdbcType=VARCHAR}
</if>
<if test="shellCmdPassword != null" >
AND shell_cmd_password = #{shellCmdPassword,jdbcType=VARCHAR}
</if>
<if test="shellSftpPort != null" >
AND shell_sftp_port = #{shellSftpPort,jdbcType=INTEGER}
</if>
<if test="remark != null" >
AND remark = #{remark,jdbcType=VARCHAR}
</if>
......@@ -755,7 +840,6 @@
<if test="updateTimeEnd != null" >
AND update_time <![CDATA[ <= ]]> #{updateTimeEnd,jdbcType=TIMESTAMP}
</if>
AND data_status='1'
</where>
</select>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment