Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
b7f8dfec
Commit
b7f8dfec
authored
Feb 18, 2021
by
sml
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
冲突解决
parent
52021917
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
499 additions
and
25 deletions
+499
-25
DmpDevelopTaskService.java
...ava/com/jz/dmp/modules/service/DmpDevelopTaskService.java
+74
-3
FlowService.java
src/main/java/com/jz/dmp/modules/service/FlowService.java
+12
-0
DmpDevelopTaskHistoryServiceImpl.java
...odules/service/impl/DmpDevelopTaskHistoryServiceImpl.java
+0
-6
DmpDevelopTaskServiceImpl.java
...z/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
+366
-15
DmpNavigationTreeServiceImpl.java
...mp/modules/service/impl/DmpNavigationTreeServiceImpl.java
+8
-1
FlowServiceImpl.java
...java/com/jz/dmp/modules/service/impl/FlowServiceImpl.java
+39
-0
No files found.
src/main/java/com/jz/dmp/modules/service/DmpDevelopTaskService.java
View file @
b7f8dfec
...
...
@@ -78,18 +78,34 @@ public interface DmpDevelopTaskService {
* @throws Exception
*/
public
BaseBeanResponse
<
DmpDevelopTask
>
add
(
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**树ID查询任务开发
* @param treeId
* @param httpRequest
* @return
* @throws Exception
*/
public
BaseBeanResponse
<
DmpDevelopTaskDto
>
findByTreeId
(
Integer
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**修改任务开发
* @param dmpDevelopTask
* @param httpRequest
* @return
* @throws Exception
*/
public
BaseBeanResponse
<
DmpDevelopTask
>
edit
(
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**
* @Title: flowSubmit
* @Description: TODO(
工作任务流保存提交
)
* @param @param
flowPro
* @Description: TODO(
任务流程发布到azkaban
)
* @param @param
treeId
* @param @param httpRequest
* @param @return
* @param @throws Exception 参数
* @return BaseResponse 返回类型
* @throws
*/
public
BaseResponse
flowSubmit
(
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
)
throws
Exception
;
public
BaseResponse
flowSubmit
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**
* @Title: getExecXmlFileName
...
...
@@ -102,5 +118,60 @@ public interface DmpDevelopTaskService {
*/
public
String
getExecXmlFileName
(
Long
syncTaskTreeId
)
throws
Exception
;
/**
* @Title: taskAzkabanRun
* @Description: TODO(运行任务)
* @param @param treeId
* @param @param httpRequest
* @param @return
* @param @throws Exception 参数
* @return BaseResponse 返回类型
* @throws
*/
public
BaseResponse
taskAzkabanRun
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**
* @Title: taskAzkabanStop
* @Description: TODO(停止任务)
* @param @param treeId
* @param @param httpRequest
* @param @return 参数
* @return BaseBeanResponse<String> 返回类型
* @throws
*/
public
BaseBeanResponse
<
String
>
taskAzkabanStop
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**
* @Title: taskPublish
* @Description: TODO(SHELL/SQL发布接口)
* @param @param treeId
* @param @param httpRequest
* @param @return
* @param @throws Exception 参数
* @return BaseResponse 返回类型
* @throws
*/
public
BaseResponse
taskPublish
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**
* @Title: softDeleteByTreeId
* @Description: TODO(软删除任务)
* @param @param treeId
* @param @param httpRequest
* @param @return
* @param @throws Exception 参数
* @return BaseResponse 返回类型
* @throws
*/
public
BaseResponse
softDeleteByTreeId
(
Integer
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**
* 获取执行实例的日志详情
*
* @return
* @author Bellamy
* @since 2021-02-03
*/
JsonResult
queryExamplesLogByExecId
(
String
execId
)
throws
Exception
;
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/FlowService.java
View file @
b7f8dfec
...
...
@@ -2,6 +2,7 @@ package com.jz.dmp.modules.service;
import
com.jz.common.bean.BaseResponse
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowPro
;
import
com.jz.dmp.modules.model.DmpDevelopTask
;
/**
* @ClassName: FlowService
...
...
@@ -22,5 +23,16 @@ public interface FlowService {
* @throws
*/
public
BaseResponse
publishFlow
(
FlowPro
flowPro
)
throws
Exception
;
/**
* @Title: deleteAzkabanFlow
* @Description: TODO(删除azkanba的任务)
* @param @param developTask
* @param @return
* @param @throws Exception 参数
* @return BaseResponse 返回类型
* @throws
*/
public
BaseResponse
deleteAzkabanFlow
(
DmpDevelopTask
developTask
)
throws
Exception
;
}
src/main/java/com/jz/dmp/modules/service/impl/DmpDevelopTaskHistoryServiceImpl.java
View file @
b7f8dfec
...
...
@@ -354,12 +354,6 @@ public class DmpDevelopTaskHistoryServiceImpl extends BaseService implements Dmp
// 创建时间
dmpDevelopTaskHistory
.
setCreateTime
(
new
Date
());
//版本设置
//获取已存在版本
String
maxVersion
=
dmpDevelopTaskHistoryMapper
.
getMaxVersionByTaskId
(
dmpDevelopTaskHistory
.
getTaskId
());
String
version
=
CodeGeneratorUtils
.
generatorNextTaskVesion
(
maxVersion
);
dmpDevelopTaskHistory
.
setVersion
(
version
);
dmpDevelopTaskHistoryMapper
.
insertSelective
(
dmpDevelopTaskHistory
);
baseBeanResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
...
...
src/main/java/com/jz/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
View file @
b7f8dfec
package
com
.
jz
.
dmp
.
modules
.
service
.
impl
;
import
java.io.File
;
import
java.io.IOException
;
import
java.io.UnsupportedEncodingException
;
import
java.util.ArrayList
;
...
...
@@ -12,12 +13,17 @@ import java.util.regex.Pattern;
import
javax.servlet.http.HttpServletRequest
;
import
com.alibaba.fastjson.JSONObject
;
import
com.mysql.jdbc.Blob
;
import
org.apache.tomcat.jni.Mmap
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.aop.ThrowsAdvice
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.core.RedisTemplate
;
import
org.springframework.stereotype.Service
;
import
org.springframework.transaction.annotation.Transactional
;
import
org.springframework.util.CollectionUtils
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
...
...
@@ -33,10 +39,14 @@ import com.jz.common.constant.StatuConstant;
import
com.jz.common.enums.ModuleLogEnum
;
import
com.jz.common.page.PageInfoResponse
;
import
com.jz.common.persistence.BaseService
;
import
com.jz.common.utils.AzkabanApiUtils2
;
import
com.jz.common.utils.CodeGeneratorUtils
;
import
com.jz.common.utils.DateUtils
;
import
com.jz.common.utils.FileUtils
;
import
com.jz.common.utils.GZIPUtils
;
import
com.jz.common.utils.JsonMapper
;
import
com.jz.common.utils.StringUtils
;
import
com.jz.common.utils.ZipUtils
;
import
com.jz.common.utils.web.XmlUtils
;
import
com.jz.dmp.agent.DmpAgentResult
;
import
com.jz.dmp.azkaban.dao.ExecutionFlowsMapper
;
...
...
@@ -49,15 +59,18 @@ import com.jz.dmp.modules.controller.dataOperation.bean.DataDevExamplesListDto;
import
com.jz.dmp.modules.controller.dataOperation.bean.DataDevExamplesListReq
;
import
com.jz.dmp.modules.controller.dataOperation.bean.DataDevTaskListDto
;
import
com.jz.dmp.modules.controller.dataOperation.bean.DataDevTaskListReq
;
import
com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoDto
;
import
com.jz.dmp.modules.dao.DmpDevelopTaskDao
;
import
com.jz.dmp.modules.dao.DmpNavigationTreeDao
;
import
com.jz.dmp.modules.dao.DmpProjectDao
;
import
com.jz.dmp.modules.dao.DmpSyncingDatasourceTypeDao
;
import
com.jz.dmp.modules.dao.projconfig.DmpProjectConfigInfoMapper
;
import
com.jz.dmp.modules.model.DmpAgentDatasourceInfo
;
import
com.jz.dmp.modules.model.DmpDevelopTask
;
import
com.jz.dmp.modules.model.DmpDevelopTaskHistory
;
import
com.jz.dmp.modules.model.DmpModuleOperateLog
;
import
com.jz.dmp.modules.model.DmpNavigationTree
;
import
com.jz.dmp.modules.model.DmpProjectConfigInfo
;
import
com.jz.dmp.modules.model.DmpProjectSystemInfo
;
import
com.jz.dmp.modules.model.DmpSyncingDatasource
;
import
com.jz.dmp.modules.model.DmpSyncingDatasourceType
;
...
...
@@ -67,6 +80,7 @@ import com.jz.dmp.modules.service.DmpModuleOperateLogService;
import
com.jz.dmp.modules.service.DmpSyncingDatasourceService
;
import
com.jz.dmp.modules.service.FlowService
;
import
com.jz.dmp.modules.service.OfflineSynchService
;
import
com.jz.dmp.modules.service.projconfig.DmpProjectConfigInfoService
;
/**
* 任务开发(DmpDevelopTask)表服务实现类
...
...
@@ -115,6 +129,12 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
@Autowired
private
DmpDevelopTaskHistoryService
dmpDevelopTaskHistoryService
;
@Autowired
private
DmpProjectConfigInfoMapper
dmpProjectConfigInfoMapper
;
@Autowired
private
RedisTemplate
redisTemplate
;
/**
* 添加保存dmp数据(包含校验数据)
...
...
@@ -1096,7 +1116,26 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
String
taskName
=
dmpDevelopTask
.
getName
();
if
(
StringUtils
.
isEmpty
(
taskName
))
{
baseBeanResponse
.
setCode
(
StatuConstant
.
CODE_ERROR_PARAMETER
);
baseBeanResponse
.
setMessage
(
"工作流名称不能为空"
);
baseBeanResponse
.
setMessage
(
"任务名称不能为空"
);
return
baseBeanResponse
;
}
//任务类型校验
String
taskType
=
dmpDevelopTask
.
getTaskType
();
String
treeType
=
""
;
if
(
StringUtils
.
isEmpty
(
taskType
))
{
baseBeanResponse
.
setCode
(
StatuConstant
.
CODE_ERROR_PARAMETER
);
baseBeanResponse
.
setMessage
(
"任务类型不能为空"
);
return
baseBeanResponse
;
}
else
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVELOP
))
{
treeType
=
CommConstant
.
TREE_TYPE_DEVELOP
;
}
else
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVSHELL
))
{
treeType
=
CommConstant
.
TREE_TYPE_DEVSHELL
;
}
else
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVSQL
))
{
treeType
=
CommConstant
.
TREE_TYPE_DEVSQL
;
}
else
{
baseBeanResponse
.
setCode
(
StatuConstant
.
CODE_ERROR_PARAMETER
);
baseBeanResponse
.
setMessage
(
"任务类型错误"
);
return
baseBeanResponse
;
}
...
...
@@ -1106,7 +1145,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
tree
.
setParentId
(
dmpDevelopTask
.
getParentId
());
tree
.
setProjectId
(
dmpDevelopTask
.
getProjectId
());
tree
.
setCategory
(
CommConstant
.
TREE_DEVELOP_TASK
);
tree
.
setType
(
"W"
);
tree
.
setType
(
treeType
);
tree
.
setIsLevel
(
"1"
);
tree
.
setIsEnable
(
"1"
);
tree
.
setDataStatus
(
"1"
);
...
...
@@ -1115,13 +1154,13 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
int
cnt
=
dmpNavigationTreeDao
.
countTreeByName
(
tree
);
if
(
cnt
>
0
)
{
baseBeanResponse
.
setCode
(
StatuConstant
.
CODE_DATA_EXISTED
);
baseBeanResponse
.
setMessage
(
"当前项目已存在同名的
工作流
名称"
);
baseBeanResponse
.
setMessage
(
"当前项目已存在同名的
任务
名称"
);
return
baseBeanResponse
;
}
dmpNavigationTreeDao
.
insert
(
tree
);
//将新增任务保存到数据库
dmpDevelopTask
.
setTaskType
(
"1"
);
dmpDevelopTask
.
setTaskType
(
taskType
);
dmpDevelopTask
.
setType
(
"W"
);
dmpDevelopTask
.
setIsSubmit
(
"0"
);
dmpDevelopTask
.
setScheduleType
(
"2"
);
...
...
@@ -1154,6 +1193,11 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
// 创建时间
dmpDevelopTask
.
setCreateTime
(
new
Date
());
//版本设置
//获取已存在版本
String
version
=
CodeGeneratorUtils
.
generatorNextTaskVesion
(
null
);
dmpDevelopTask
.
setVersion
(
version
);
dmpDevelopTaskDao
.
insert
(
dmpDevelopTask
);
baseBeanResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
...
...
@@ -1162,26 +1206,87 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
return
baseBeanResponse
;
}
/*
* (non-Javadoc)
*
* @see
* com.ycxc.vmts.service.DmpDevelopTaskService#findByTreeId(com.ycxc.vmts.controller.
* bean.DmpDevelopTaskRequest, javax.servlet.http.HttpServletRequest)
*/
@Override
public
BaseBeanResponse
<
DmpDevelopTaskDto
>
findByTreeId
(
Integer
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
{
BaseBeanResponse
<
DmpDevelopTaskDto
>
baseBeanResponse
=
new
BaseBeanResponse
<>();
DmpDevelopTask
dmpDevelopTask
=
dmpDevelopTaskDao
.
selectTaskInfoByParam
(
treeId
);
DmpDevelopTaskDto
dmpDevelopTaskDto
=
MyDmpDevelopTaskConverter
.
INSTANCE
().
domain2dto
(
dmpDevelopTask
);
baseBeanResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
baseBeanResponse
.
setMessage
(
"查询成功"
);
baseBeanResponse
.
setData
(
dmpDevelopTaskDto
);
return
baseBeanResponse
;
}
/**
*任务
流保存及提交
*任务
修改
*/
@Override
public
BaseResponse
flowSubmit
(
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
)
@Transactional
(
rollbackFor
=
Exception
.
class
)
public
BaseBeanResponse
<
DmpDevelopTask
>
edit
(
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
)
throws
Exception
{
BaseBeanResponse
<
DmpDevelopTask
>
baseBeanResponse
=
new
BaseBeanResponse
<
DmpDevelopTask
>();
BaseResponse
baseResponse
=
new
BaseResponse
();
// 修改人
dmpDevelopTask
.
setUpdateUserId
(
getHttpRequestUserId
(
httpRequest
).
toString
());
// 修改时间
dmpDevelopTask
.
setUpdateTime
(
new
Date
());
// 版本设置
// 获取已存在版本
DmpDevelopTask
dmpDevelopTaskDb
=
dmpDevelopTaskDao
.
selectTaskById
(
dmpDevelopTask
.
getId
().
toString
());
String
version
=
CodeGeneratorUtils
.
generatorNextTaskVesion
(
dmpDevelopTaskDb
.
getVersion
());
dmpDevelopTask
.
setVersion
(
version
);
FlowPro
flowPro
=
MyDmpDevelopTaskConverter
.
INSTANCE
().
task2flowpro
(
dmpDevelopTask
);
dmpDevelopTaskDao
.
update
(
dmpDevelopTask
);
//参数校验
Long
treeId
=
flowPro
.
getTreeId
();
if
(
treeId
==
null
)
{
DmpDevelopTask
dmpDevelopTaskDb2
=
dmpDevelopTaskDao
.
get
(
dmpDevelopTask
.
getTreeId
().
longValue
());
DmpDevelopTaskHistory
dmpDevelopTaskHistory
=
MyDmpDevelopTaskHistoryConverter
.
INSTANCE
().
task2history
(
dmpDevelopTaskDb2
);
// 保存版本为版本数据
dmpDevelopTaskHistoryService
.
add
(
dmpDevelopTaskHistory
,
httpRequest
);
baseBeanResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
baseBeanResponse
.
setMessage
(
"修改成功"
);
baseBeanResponse
.
setData
(
dmpDevelopTaskDb
);
return
baseBeanResponse
;
}
/**
*任务流程发布
*/
@Override
public
BaseResponse
flowSubmit
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
{
BaseResponse
baseResponse
=
new
BaseResponse
();
// 参数校验
if
(
treeId
==
null
)
{
baseResponse
.
setCode
(
StatuConstant
.
CODE_ERROR_PARAMETER
);
baseResponse
.
setMessage
(
"treeId不能为空"
);
return
baseResponse
;
}
DmpDevelopTask
dmpDevelopTask
=
dmpDevelopTaskDao
.
get
(
treeId
);
FlowPro
flowPro
=
MyDmpDevelopTaskConverter
.
INSTANCE
().
task2flowpro
(
dmpDevelopTask
);
DmpNavigationTree
dmpNavigationTree
=
dmpNavigationTreeDao
.
queryById
(
treeId
.
intValue
());
Long
projectId
=
Long
.
parseLong
(
dmpNavigationTree
.
getProjectId
().
toString
());
flowPro
.
setPublishedToProjectId
(
projectId
);
...
...
@@ -1203,12 +1308,11 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
}
updateDevelopTask
.
setIsSubmit
(
flowPro
.
getIsSubmit
());
updateDevelopTask
.
setIsGziped
(
1
);
dmpDevelopTaskDao
.
update
(
updateDevelopTask
);
DmpDevelopTaskHistory
dmpDevelopTaskHistory
=
MyDmpDevelopTaskHistoryConverter
.
INSTANCE
().
task2history
(
updateDevelopTask
);
//设置发布版本
updateDevelopTask
.
setPublishVersion
(
queryDmpDevelopTask
.
getVersion
());
dmpDevelopTaskDao
.
update
(
updateDevelopTask
);
//保存版本为版本数据
dmpDevelopTaskHistoryService
.
add
(
dmpDevelopTaskHistory
,
httpRequest
);
}
baseResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
...
...
@@ -1232,4 +1336,251 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
return
xmlFileName
;
}
/**
*运行任务
*/
@Override
public
BaseResponse
taskAzkabanRun
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
{
BaseResponse
baseResponse
=
new
BaseResponse
();
DmpDevelopTask
developTask
=
dmpDevelopTaskDao
.
get
(
treeId
);
//先发布任务
boolean
flag
=
publishAndExecute
(
developTask
);
if
(
flag
)
{
baseResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
baseResponse
.
setMessage
(
"立即运行成功"
);
}
else
{
baseResponse
.
setCode
(
StatuConstant
.
FAILURE_CODE
);
baseResponse
.
setMessage
(
"立即运行失败"
);
}
return
baseResponse
;
}
/**
* 发布流程
*/
private
boolean
publishAndExecute
(
DmpDevelopTask
dmpDevelopTask
)
throws
Exception
{
Integer
taskId
=
dmpDevelopTask
.
getId
();
//任务id
Integer
projectId
=
dmpDevelopTask
.
getProjectId
();
//项目id
String
treeName
=
dmpDevelopTask
.
getName
();
//任务流程名称
//获取项目配置信息
Map
<
String
,
Object
>
param
=
new
HashMap
<
String
,
Object
>();
param
.
put
(
"projectId"
,
projectId
);
List
<
DmpProjectConfigInfoDto
>
list
=
dmpProjectConfigInfoMapper
.
findList
(
param
);
if
(
CollectionUtils
.
isEmpty
(
list
))
{
throw
new
RuntimeException
(
"项目没有设置配置信息,请联系管理员!"
);
}
DmpProjectConfigInfoDto
dmpProjectConfigInfoDto
=
list
.
get
(
0
);
//校验任务类型并配置参数
String
taskType
=
dmpDevelopTask
.
getTaskType
();
String
azkabanExectorTaskExec
=
""
;
String
taskAlias
=
""
;
if
(
StringUtils
.
isEmpty
(
taskType
))
{
throw
new
RuntimeException
(
"任务的任务类型为空!"
);
}
else
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVSHELL
))
{
azkabanExectorTaskExec
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
().
getAzkabanExectorShellExec
();
//执行shell任务命令
taskAlias
=
"shell"
;
}
else
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVSQL
))
{
azkabanExectorTaskExec
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
().
getAzkabanExectorSqlExec
();
//执行sql任务命令
taskAlias
=
"sql"
;
}
else
{
throw
new
RuntimeException
(
"该任务类型不能调用此方法发布,请检查任务类型!"
);
}
String
azkabanLocalTaskFilePath
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
().
getAzkabanLocalTaskFilePath
();
//文件路径
String
azkabanMonitorUrl
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
().
getAzkabanMonitorUrl
();
//AZKABAN WEB服务地址
String
azkabanJobCommand
=
"command="
+
azkabanExectorTaskExec
+
" "
+
projectId
+
" ${azkaban.flow.flowid} ${azkaban.job.id} "
+
treeName
;
/**
* 当前任务生成文件存放根路径
*/
String
localTaskPath
=
azkabanLocalTaskFilePath
+
"/"
+
projectId
+
"/local/"
+
taskId
;
File
localTaskFile
=
new
File
(
localTaskPath
);
if
(!
localTaskFile
.
exists
())
{
localTaskFile
.
mkdirs
();
}
String
localTaskSourcePath
=
localTaskPath
+
"/source/"
;
File
localTaskSourceFile
=
new
File
(
localTaskSourcePath
);
if
(!
localTaskSourceFile
.
exists
())
{
localTaskSourceFile
.
mkdirs
();
}
String
localTaskExecArgsPath
=
localTaskPath
+
"/execArgs/"
;
File
localTaskExecArgsFile
=
new
File
(
localTaskExecArgsPath
);
if
(!
localTaskExecArgsFile
.
exists
())
{
localTaskExecArgsFile
.
mkdirs
();
}
String
localTaskZipPath
=
localTaskPath
+
"/target/"
;
File
localTaskZipFile
=
new
File
(
localTaskZipPath
);
if
(!
localTaskZipFile
.
exists
())
{
localTaskZipFile
.
mkdirs
();
}
//先删除上次创建的文件 ---每次都会新建
FileUtils
.
deleteDirectory
(
localTaskPath
);
List
<
String
>
contents
=
new
ArrayList
<>();
// 子流程类型
contents
.
add
(
"type=command"
);
contents
.
add
(
azkabanJobCommand
);
// 生成job文件
String
jobFileAbsolutePath
=
localTaskSourcePath
+
treeName
+
".job"
;
FileUtils
.
write
(
jobFileAbsolutePath
,
contents
);
String
localZipTargetFileName
=
treeName
+
".zip"
;
ZipUtils
.
zip
(
localTaskSourcePath
,
localTaskZipPath
,
localZipTargetFileName
);
//上传到azkaban todo
//上次zip包到azkaban
String
localTaskZipAbsolutePath
=
localTaskZipPath
+
"/"
+
localZipTargetFileName
;
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanMonitorUrl
,
redisTemplate
);
return
azkabanApiUtils
.
loginCreateProjectuploadZipAndExecute
(
"jz_localflow_"
+
taskAlias
+
"_"
+
projectId
,
"local_"
+
taskAlias
+
"_project"
,
localTaskZipAbsolutePath
,
treeName
);
}
/**
*停止任务
*/
@Override
public
BaseBeanResponse
<
String
>
taskAzkabanStop
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
{
// TODO Auto-generated method stub
BaseBeanResponse
<
String
>
baseBeanResponse
=
new
BaseBeanResponse
<
String
>();
DmpDevelopTask
dmpDevelopTask
=
dmpDevelopTaskDao
.
get
(
treeId
);
if
(
dmpDevelopTask
==
null
)
{
baseBeanResponse
.
setCode
(
StatuConstant
.
CODE_ERROR_PARAMETER
);
baseBeanResponse
.
setMessage
(
"任务不存在"
);
return
baseBeanResponse
;
}
Integer
projectId
=
dmpDevelopTask
.
getProjectId
();
String
treeName
=
dmpDevelopTask
.
getName
();
//获取项目配置信息
Map
<
String
,
Object
>
param
=
new
HashMap
<
String
,
Object
>();
List
<
DmpProjectConfigInfoDto
>
list
=
dmpProjectConfigInfoMapper
.
findList
(
param
);
if
(
CollectionUtils
.
isEmpty
(
list
))
{
throw
new
RuntimeException
(
"项目没有设置配置信息,请联系管理员!"
);
}
DmpProjectConfigInfoDto
dmpProjectConfigInfoDto
=
list
.
get
(
0
);
//校验任务类型并配置参数
String
taskType
=
dmpDevelopTask
.
getTaskType
();
String
taskAlias
=
""
;
if
(
StringUtils
.
isEmpty
(
taskType
))
{
throw
new
RuntimeException
(
"任务的任务类型为空!"
);
}
else
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVSHELL
))
{
taskAlias
=
"shell"
;
}
else
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVSQL
))
{
taskAlias
=
"sql"
;
}
else
{
throw
new
RuntimeException
(
"该任务类型不能调用此方法停止,请检查任务类型!"
);
}
String
azkabanMonitorUrl
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
().
getAzkabanMonitorUrl
();
//AZKABAN WEB服务地址
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanMonitorUrl
,
redisTemplate
);
String
exeIdsStr
=
azkabanApiUtils
.
stopFlow
(
"jz_localflow_"
+
taskAlias
+
"_"
+
projectId
,
treeName
);
baseBeanResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
baseBeanResponse
.
setMessage
(
"停止成功"
);
baseBeanResponse
.
setData
(
exeIdsStr
);
return
baseBeanResponse
;
}
/**
*SHELL/SQL版本发布
*/
@Override
public
BaseResponse
taskPublish
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
{
BaseResponse
baseResponse
=
new
BaseResponse
();
DmpDevelopTask
dmpDevelopTask
=
dmpDevelopTaskDao
.
get
(
treeId
);
// 修改人
dmpDevelopTask
.
setUpdateUserId
(
getHttpRequestUserId
(
httpRequest
).
toString
());
// 修改时间
dmpDevelopTask
.
setUpdateTime
(
new
Date
());
//设置发布版本
dmpDevelopTask
.
setPublishVersion
(
dmpDevelopTask
.
getVersion
());
dmpDevelopTaskDao
.
update
(
dmpDevelopTask
);
baseResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
baseResponse
.
setMessage
(
"发布成功"
);
return
baseResponse
;
}
/**
*软删除任务
*/
@Override
@Transactional
(
rollbackFor
=
Exception
.
class
)
public
BaseResponse
softDeleteByTreeId
(
Integer
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
{
BaseResponse
baseResponse
=
new
BaseResponse
();
DmpDevelopTask
developTask
=
dmpDevelopTaskDao
.
get
(
treeId
.
longValue
());
String
taskType
=
developTask
.
getTaskType
();
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVELOP
))
{
//取消发布的任务
BaseResponse
baseResponseAzkaban
=
flowService
.
deleteAzkabanFlow
(
developTask
);
if
(
baseResponseAzkaban
.
getCode
().
equals
(
StatuConstant
.
FAILURE_CODE
))
{
throw
new
RuntimeException
(
"azkaban取消发布任务失败"
);
}
}
else
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVSHELL
)
||
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVSQL
))
{
//什么也不做
}
else
{
baseResponse
.
setCode
(
StatuConstant
.
CODE_DATA_NOTMEET
);
baseResponse
.
setMessage
(
"任务类型不适合调用该方法"
);
return
baseResponse
;
}
//软删除树
DmpNavigationTree
dmpNavigationTree
=
new
DmpNavigationTree
();
dmpNavigationTree
.
setId
(
treeId
);
dmpNavigationTree
.
setDataStatus
(
"0"
);
dmpNavigationTreeDao
.
update
(
dmpNavigationTree
);
//软删除任务
developTask
.
setDataStatus
(
"0"
);
dmpDevelopTaskDao
.
update
(
developTask
);
baseResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
baseResponse
.
setMessage
(
"软删除成功"
);
return
baseResponse
;
}
/**
* 获取执行实例的日志详情
*
* @return
* @author Bellamy
* @since 2021-02-03
*/
@Override
public
JsonResult
queryExamplesLogByExecId
(
String
execId
)
throws
Exception
{
List
<
Map
>
list
=
executionFlowsMapper
.
queryExamplesLogByExecId
(
execId
);
if
(
list
.
size
()
>
0
&&
list
!=
null
)
{
list
.
forEach
(
map
->
{
//map.put("log", map.get("log"));
});
}
return
JsonResult
.
ok
(
list
);
}
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/impl/DmpNavigationTreeServiceImpl.java
View file @
b7f8dfec
...
...
@@ -75,7 +75,14 @@ public class DmpNavigationTreeServiceImpl extends BaseService implements DmpNavi
dmpNavigationTree
.
setCreateUserId
(
getHttpRequestUserId
(
null
).
toString
());
// 创建时间
dmpNavigationTree
.
setCreateTime
(
new
Date
());
dmpNavigationTree
.
setIsLevel
(
"0"
);
//dmpNavigationTree.setIsLevel("0");
DmpNavigationTree
dmpNavigationTreeParam
=
new
DmpNavigationTree
();
//查询未删除数据
dmpNavigationTreeParam
.
setDataStatus
(
"1"
);
List
<
DmpNavigationTree
>
list
=
dmpNavigationTreeDao
.
queryAll
(
dmpNavigationTreeParam
);
this
.
dmpNavigationTreeDao
.
insertSelective
(
dmpNavigationTree
);
return
dmpNavigationTree
;
}
...
...
src/main/java/com/jz/dmp/modules/service/impl/FlowServiceImpl.java
View file @
b7f8dfec
package
com
.
jz
.
dmp
.
modules
.
service
.
impl
;
import
java.util.Date
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.core.RedisTemplate
;
import
org.springframework.stereotype.Service
;
import
org.springframework.transaction.annotation.Transactional
;
import
org.springframework.util.CollectionUtils
;
import
com.google.common.io.BaseEncoding
;
import
com.jz.common.bean.BaseBeanResponse
;
import
com.jz.common.bean.BaseResponse
;
import
com.jz.common.constant.CommConstant
;
import
com.jz.common.constant.StatuConstant
;
import
com.jz.common.utils.AzkabanApiUtils2
;
import
com.jz.common.utils.FlowParseTool
;
import
com.jz.common.utils.GZIPUtils
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowNodeChangeInfo
;
...
...
@@ -19,6 +25,7 @@ import com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoDto;
import
com.jz.dmp.modules.dao.DmpProjectDao
;
import
com.jz.dmp.modules.dao.DmpWorkFlowSubmitDetailsDao
;
import
com.jz.dmp.modules.dao.projconfig.DmpProjectConfigInfoMapper
;
import
com.jz.dmp.modules.model.DmpDevelopTask
;
import
com.jz.dmp.modules.model.DmpProject
;
import
com.jz.dmp.modules.model.DmpProjectSystemInfo
;
import
com.jz.dmp.modules.model.DmpWorkFlowSubmitDetails
;
...
...
@@ -59,6 +66,9 @@ public class FlowServiceImpl implements FlowService {
@Autowired
private
DmpProjectConfigInfoService
dmpProjectConfigInfoService
;
@Autowired
private
DmpProjectConfigInfoMapper
dmpProjectConfigInfoMapper
;
@Autowired
private
RedisTemplate
redisTemplate
;
...
...
@@ -141,4 +151,33 @@ public class FlowServiceImpl implements FlowService {
return
baseResponse
;
}
/**
*删除azkaban任务
*/
@Override
public
BaseResponse
deleteAzkabanFlow
(
DmpDevelopTask
developTask
)
throws
Exception
{
BaseResponse
baseResponse
=
new
BaseResponse
();
Integer
projectId
=
developTask
.
getProjectId
();
//获取项目配置信息
Map
<
String
,
Object
>
param
=
new
HashMap
<
String
,
Object
>();
param
.
put
(
"projectId"
,
projectId
);
List
<
DmpProjectConfigInfoDto
>
list
=
dmpProjectConfigInfoMapper
.
findList
(
param
);
if
(
CollectionUtils
.
isEmpty
(
list
))
{
baseResponse
.
setCode
(
StatuConstant
.
FAILURE_CODE
);
baseResponse
.
setMessage
(
"项目没有设置配置信息,请联系管理员!"
);
return
baseResponse
;
}
DmpProjectConfigInfoDto
dmpProjectConfigInfoDto
=
list
.
get
(
0
);
String
azkabanMonitorUrl
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
().
getAzkabanMonitorUrl
();
//AZKABAN WEB服务地址
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanMonitorUrl
,
redisTemplate
);
azkabanApiUtils
.
deleteAzkabanFlow
(
CommConstant
.
AZKABAN_PROJECTNAME_PREFIX
+
projectId
,
developTask
.
getName
());
baseResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
baseResponse
.
setMessage
(
"删除成功"
);
return
baseResponse
;
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment