Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
bdf7e405
Commit
bdf7e405
authored
Feb 19, 2021
by
mcb
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'dmp_dev' of
http://gitlab.ioubuy.cn/yaobenzhang/jz-dmp-service
into dmp_dev
parents
aa39ad69
4ef683ae
Changes
16
Show whitespace changes
Inline
Side-by-side
Showing
16 changed files
with
727 additions
and
93 deletions
+727
-93
CommConstant.java
src/main/java/com/jz/common/constant/CommConstant.java
+26
-1
DmpDevelopTaskController.java
...m/jz/dmp/modules/controller/DmpDevelopTaskController.java
+135
-6
DmpNavigationTreeController.java
...z/dmp/modules/controller/DmpNavigationTreeController.java
+5
-0
MyDmpDevelopTaskHistoryConverter.java
...les/controller/bean/MyDmpDevelopTaskHistoryConverter.java
+1
-0
DmpProjectConfigInfoDto.java
...s/controller/projconfig/bean/DmpProjectConfigInfoDto.java
+8
-2
DmpDevelopTaskDao.java
src/main/java/com/jz/dmp/modules/dao/DmpDevelopTaskDao.java
+2
-1
DmpDevelopTask.java
src/main/java/com/jz/dmp/modules/model/DmpDevelopTask.java
+18
-4
DmpNavigationTree.java
...main/java/com/jz/dmp/modules/model/DmpNavigationTree.java
+2
-2
DmpDevelopTaskService.java
...ava/com/jz/dmp/modules/service/DmpDevelopTaskService.java
+99
-39
FlowService.java
src/main/java/com/jz/dmp/modules/service/FlowService.java
+12
-0
DmpDevelopTaskHistoryServiceImpl.java
...odules/service/impl/DmpDevelopTaskHistoryServiceImpl.java
+0
-6
DmpDevelopTaskServiceImpl.java
...z/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
+345
-15
DmpNavigationTreeServiceImpl.java
...mp/modules/service/impl/DmpNavigationTreeServiceImpl.java
+8
-1
FlowServiceImpl.java
...java/com/jz/dmp/modules/service/impl/FlowServiceImpl.java
+39
-0
DmpDevelopTaskHistoryMapper.xml
...main/resources/mapper/dmp/DmpDevelopTaskHistoryMapper.xml
+2
-4
DmpDevelopTaskMapper.xml
src/main/resources/mapper/dmp/DmpDevelopTaskMapper.xml
+25
-12
No files found.
src/main/java/com/jz/common/constant/CommConstant.java
View file @
bdf7e405
...
...
@@ -33,8 +33,33 @@ public class CommConstant {
/***************************************************/
//task_type任务类型
public
static
final
String
TASK_TYPE_OFFLINE
=
"2"
;
public
static
final
String
TASK_TYPE_DEVELOP
=
"1"
;
//开发任务
public
static
final
String
TASK_TYPE_OFFLINE
=
"2"
;
//离线任务
public
static
final
String
TASK_TYPE_DEVSHELL
=
"3"
;
//SHELL任务
public
static
final
String
TASK_TYPE_DEVSQL
=
"4"
;
//SQL任务
/***************************************************/
//tree type(节点类型)
public
static
final
String
TREE_TYPE_OFFLINE
=
"01"
;
//离线任务
public
static
final
String
TREE_TYPE_REALTIMESYNC
=
"02"
;
//实时同步
public
static
final
String
TREE_TYPE_DEVELOP
=
"03"
;
//数据开发
public
static
final
String
TREE_TYPE_DEVSHELL
=
"04"
;
//脚本开发
public
static
final
String
TREE_TYPE_DEVSQL
=
"05"
;
//SQL开发
/***************************************************/
//子任务类型
public
static
final
String
WORK_TYPE_START
=
"start"
;
//开始任务
public
static
final
String
WORK_TYPE_SHELL
=
"shell"
;
//shell任务
public
static
final
String
WORK_TYPE_SQL
=
"sql"
;
//sql任务
public
static
final
String
WORK_TYPE_SYNC
=
"sync"
;
//离线任务
public
static
final
String
WORK_TYPE_SUBPROCESS
=
"subprocess"
;
//子流程任务
public
static
final
String
WORK_TYPE_FTP
=
"ftp"
;
//ftp下载任务
public
static
final
String
WORK_TYPE_UNZIPFILE
=
"unzipFile"
;
//解压文件任务
public
static
final
String
WORK_TYPE_DOCTRANS
=
"docTrans"
;
//文件转码任务
public
static
final
String
WORK_TYPE_HDFS
=
"hdfs"
;
//hdfs上传任务
public
static
final
String
WORK_TYPE_STOP
=
"stop"
;
//停止任务
/***************************************************/
//azkaban相关常量
public
static
final
String
AZKABAN_PROJECTNAME_PREFIX
=
"jz_workflow_new_"
;
//azkaban项目名称前缀
}
src/main/java/com/jz/dmp/modules/controller/DmpDevelopTaskController.java
View file @
bdf7e405
...
...
@@ -6,8 +6,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.web.bind.annotation.RequestBody
;
import
org.springframework.web.bind.annotation.RequestMapping
;
import
org.springframework.web.bind.annotation.RequestMethod
;
import
org.springframework.web.bind.annotation.RequestParam
;
import
org.springframework.web.bind.annotation.RestController
;
import
com.jz.common.annotation.MethodCallLogPrint
;
import
com.jz.common.bean.BaseBeanResponse
;
import
com.jz.common.bean.BaseResponse
;
import
com.jz.common.bean.PageInfoResponse
;
...
...
@@ -18,6 +20,7 @@ import com.jz.dmp.modules.model.DmpDevelopTask;
import
com.jz.dmp.modules.service.DmpDevelopTaskService
;
import
io.swagger.annotations.Api
;
import
io.swagger.annotations.ApiImplicitParam
;
import
io.swagger.annotations.ApiOperation
;
/**
...
...
@@ -74,19 +77,145 @@ public class DmpDevelopTaskController {
return
baseBeanResponse
;
}
/**
任务流程保存提交
/**
treeId查询任务开发
* @param dmpDevelopTaskRequest
* @return
*/
@RequestMapping
(
method
=
RequestMethod
.
POST
,
value
=
"/flowSubmit"
)
@ApiOperation
(
value
=
"任务流程保存提交"
,
notes
=
"任务流程保存提交"
)
public
BaseResponse
flowSubmit
(
@RequestBody
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
){
@MethodCallLogPrint
@RequestMapping
(
method
=
RequestMethod
.
GET
,
value
=
"/findByTreeId"
)
@ApiOperation
(
value
=
"treeId查询任务开发"
,
notes
=
"treeId查询任务开发"
)
@ApiImplicitParam
(
name
=
"treeId"
,
value
=
"treeId"
)
public
BaseBeanResponse
<
DmpDevelopTaskDto
>
view
(
@RequestParam
(
name
=
"treeId"
,
required
=
true
)
Integer
treeId
,
HttpServletRequest
httpRequest
){
BaseBeanResponse
<
DmpDevelopTaskDto
>
baseBeanResponse
=
new
BaseBeanResponse
<
DmpDevelopTaskDto
>();
try
{
baseBeanResponse
=
dmpDevelopTaskService
.
findByTreeId
(
treeId
,
httpRequest
);
}
catch
(
Exception
e
)
{
baseBeanResponse
.
setMessage
(
"请求失败"
);
baseBeanResponse
.
setCode
(
StatuConstant
.
FAILURE_CODE
);
e
.
printStackTrace
();
}
return
baseBeanResponse
;
}
/**修改任务开发
* @param dmpDevelopTaskRequest
* @return
*/
@MethodCallLogPrint
@RequestMapping
(
method
=
RequestMethod
.
POST
,
value
=
"/edit"
)
@ApiOperation
(
value
=
"修改任务开发"
,
notes
=
"修改任务开发"
)
public
BaseBeanResponse
<
DmpDevelopTask
>
edit
(
@RequestBody
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
){
BaseBeanResponse
<
DmpDevelopTask
>
baseBeanResponse
=
new
BaseBeanResponse
<
DmpDevelopTask
>();
try
{
baseBeanResponse
=
dmpDevelopTaskService
.
edit
(
dmpDevelopTask
,
httpRequest
);
}
catch
(
Exception
e
)
{
baseBeanResponse
.
setMessage
(
"修改失败"
);
baseBeanResponse
.
setCode
(
StatuConstant
.
FAILURE_CODE
);
e
.
printStackTrace
();
}
return
baseBeanResponse
;
}
/**任务流程发布接口
* @param dmpDevelopTaskRequest
* @return
*/
@RequestMapping
(
method
=
RequestMethod
.
GET
,
value
=
"/flowSubmit"
)
@ApiOperation
(
value
=
"任务流程发布接口"
,
notes
=
"任务流程发布接口"
)
@ApiImplicitParam
(
name
=
"treeId"
,
value
=
"发布任务树主键"
)
public
BaseResponse
flowSubmit
(
@RequestParam
(
value
=
"treeId"
,
required
=
true
)
Long
treeId
,
HttpServletRequest
httpRequest
){
BaseResponse
baseResponse
=
new
BaseResponse
();
try
{
baseResponse
=
dmpDevelopTaskService
.
flowSubmit
(
treeId
,
httpRequest
);
}
catch
(
Exception
e
)
{
baseResponse
.
setMessage
(
"任务流程发布失败"
);
baseResponse
.
setCode
(
StatuConstant
.
FAILURE_CODE
);
e
.
printStackTrace
();
}
return
baseResponse
;
}
/**SHELL/SQL/离线任务发布接口
* @param treeId
* @return
*/
@RequestMapping
(
method
=
RequestMethod
.
GET
,
value
=
"/taskPublish"
)
@ApiOperation
(
value
=
"SHELL/SQL/离线任务发布接口"
,
notes
=
"SHELL/SQL/离线任务发布接口"
)
@ApiImplicitParam
(
name
=
"treeId"
,
value
=
"发布任务树主键"
)
public
BaseResponse
taskPublish
(
@RequestParam
(
value
=
"treeId"
,
required
=
true
)
Long
treeId
,
HttpServletRequest
httpRequest
){
BaseResponse
baseResponse
=
new
BaseResponse
();
try
{
baseResponse
=
dmpDevelopTaskService
.
flowSubmit
(
dmpDevelopTask
,
httpRequest
);
baseResponse
=
dmpDevelopTaskService
.
taskPublish
(
treeId
,
httpRequest
);
}
catch
(
Exception
e
)
{
baseResponse
.
setMessage
(
"SHELL/SQL/离线任务发布失败"
);
baseResponse
.
setCode
(
StatuConstant
.
FAILURE_CODE
);
e
.
printStackTrace
();
}
return
baseResponse
;
}
/**任务立即运行接口
* @param treeId
* @return
*/
@RequestMapping
(
method
=
RequestMethod
.
GET
,
value
=
"/taskAzkabanRun"
)
@ApiOperation
(
value
=
"任务立即运行接口"
,
notes
=
"任务立即运行接口"
)
@ApiImplicitParam
(
name
=
"treeId"
,
value
=
"任务树主键"
)
public
BaseResponse
taskAzkabanRun
(
@RequestParam
(
value
=
"treeId"
,
required
=
true
)
Long
treeId
,
HttpServletRequest
httpRequest
){
BaseResponse
baseResponse
=
new
BaseResponse
();
try
{
baseResponse
=
dmpDevelopTaskService
.
taskAzkabanRun
(
treeId
,
httpRequest
);
}
catch
(
Exception
e
)
{
baseResponse
.
setMessage
(
"任务运行失败"
);
baseResponse
.
setCode
(
StatuConstant
.
FAILURE_CODE
);
e
.
printStackTrace
();
}
return
baseResponse
;
}
/**任务立即运行停止
* @param treeId
* @return
*/
@RequestMapping
(
method
=
RequestMethod
.
GET
,
value
=
"/taskAzkabanStop"
)
@ApiOperation
(
value
=
"任务立即运行停止接口"
,
notes
=
"任务立即运行停止接口"
)
@ApiImplicitParam
(
name
=
"treeId"
,
value
=
"任务树主键"
)
public
BaseBeanResponse
<
String
>
taskAzkabanStop
(
@RequestParam
(
value
=
"treeId"
,
required
=
true
)
Long
treeId
,
HttpServletRequest
httpRequest
){
BaseBeanResponse
<
String
>
baseBeanResponse
=
new
BaseBeanResponse
<
String
>();
try
{
baseBeanResponse
=
dmpDevelopTaskService
.
taskAzkabanStop
(
treeId
,
httpRequest
);
}
catch
(
Exception
e
)
{
baseBeanResponse
.
setMessage
(
"任务运行失败"
);
baseBeanResponse
.
setCode
(
StatuConstant
.
FAILURE_CODE
);
e
.
printStackTrace
();
}
return
baseBeanResponse
;
}
/**软删除任务开发
* @param id
* @return
*/
@MethodCallLogPrint
@RequestMapping
(
method
=
RequestMethod
.
GET
,
value
=
"/softDeleteByTreeId"
)
@ApiOperation
(
value
=
"软删除任务开发"
,
notes
=
"软删除任务开发"
)
@ApiImplicitParam
(
name
=
"treeId"
,
value
=
"任务开发主键"
)
public
BaseResponse
softDeleteByTreeId
(
@RequestParam
(
name
=
"treeId"
,
required
=
true
)
Integer
treeId
,
HttpServletRequest
httpRequest
){
BaseResponse
baseResponse
=
new
BaseResponse
();
try
{
baseResponse
=
dmpDevelopTaskService
.
softDeleteByTreeId
(
treeId
,
httpRequest
);
}
catch
(
Exception
e
)
{
baseResponse
.
setMessage
(
"
任务流程保存提交
失败"
);
baseResponse
.
setMessage
(
"
软删除
失败"
);
baseResponse
.
setCode
(
StatuConstant
.
FAILURE_CODE
);
e
.
printStackTrace
();
}
...
...
src/main/java/com/jz/dmp/modules/controller/DmpNavigationTreeController.java
View file @
bdf7e405
...
...
@@ -82,9 +82,14 @@ public class DmpNavigationTreeController {
@ApiOperation
(
value
=
"新增DMP资源导航树"
,
notes
=
"新增DMP资源导航树"
)
public
BaseBeanResponse
<
DmpNavigationTree
>
add
(
@RequestBody
DmpNavigationTree
dmpNavigationTree
,
HttpServletRequest
httpRequest
)
throws
Exception
{
BaseBeanResponse
<
DmpNavigationTree
>
baseBeanResponse
=
new
BaseBeanResponse
<
DmpNavigationTree
>();
//校验参数
//
//树名称去重
DmpNavigationTreeRequest
dmpNavigationTreeRequest
=
new
DmpNavigationTreeRequest
();
dmpNavigationTreeRequest
.
setName
(
dmpNavigationTree
.
getName
());
dmpNavigationTreeRequest
.
setProjectId
(
dmpNavigationTree
.
getProjectId
());
BaseBeanResponse
<
DmpNavigationTreeDto
>
baseBeanResponseRe
=
dmpNavigationTreeService
.
findList
(
dmpNavigationTreeRequest
,
httpRequest
);
if
(
baseBeanResponseRe
.
getDatas
()
!=
null
&&
baseBeanResponseRe
.
getDatas
().
size
()
>
0
)
{
baseBeanResponse
.
setCode
(
StatuConstant
.
CODE_ERROR_PARAMETER
);
...
...
src/main/java/com/jz/dmp/modules/controller/bean/MyDmpDevelopTaskHistoryConverter.java
View file @
bdf7e405
...
...
@@ -56,6 +56,7 @@ public class MyDmpDevelopTaskHistoryConverter {
dmpDevelopTaskHistory
.
setTaskCreateTime
(
dmpDevelopTask
.
getCreateTime
());
dmpDevelopTaskHistory
.
setTaskUpdateUserId
(
dmpDevelopTask
.
getUpdateUserId
());
dmpDevelopTaskHistory
.
setTaskUpdateTime
(
dmpDevelopTask
.
getUpdateTime
());
dmpDevelopTaskHistory
.
setId
(
null
);
return
dmpDevelopTaskHistory
;
}
...
...
src/main/java/com/jz/dmp/modules/controller/projconfig/bean/DmpProjectConfigInfoDto.java
View file @
bdf7e405
...
...
@@ -8,6 +8,13 @@ import io.swagger.annotations.ApiModelProperty;
* @author ybz
*
*/
/**
* @ClassName: DmpProjectConfigInfoDto
* @Description: TODO(这里用一句话描述这个类的作用)
* @author ybz
* @date 2021年2月4日
*
*/
@ApiModel
(
value
=
"项目配置表Dto"
,
description
=
"项目配置表Dto"
)
public
class
DmpProjectConfigInfoDto
extends
DmpProjectConfigInfo
{
...
...
@@ -26,5 +33,4 @@ public class DmpProjectConfigInfoDto extends DmpProjectConfigInfo {
public
void
setDmpPublicConfigInfoDto
(
DmpPublicConfigInfoDto
dmpPublicConfigInfoDto
)
{
this
.
dmpPublicConfigInfoDto
=
dmpPublicConfigInfoDto
;
}
}
src/main/java/com/jz/dmp/modules/dao/DmpDevelopTaskDao.java
View file @
bdf7e405
...
...
@@ -72,4 +72,5 @@ public interface DmpDevelopTaskDao {
* @since 2021-02-01
*/
DmpDevelopTask
selectTaskById
(
@Param
(
"taskId"
)
String
taskId
)
throws
Exception
;
}
src/main/java/com/jz/dmp/modules/model/DmpDevelopTask.java
View file @
bdf7e405
...
...
@@ -29,9 +29,9 @@ public class DmpDevelopTask implements Serializable {
@ApiModelProperty
(
value
=
"数据源ID"
)
private
Integer
datasourceId
;
/**
* 任务类型(1,开发任务;2,离线任务)
* 任务类型(1,开发任务;2,离线任务
;3,SHELL任务;4,SQL任务
)
*/
@ApiModelProperty
(
value
=
"任务类型(1,开发任务;2,离线任务)"
)
@ApiModelProperty
(
value
=
"任务类型(1,开发任务;2,离线任务
;3,SHELL任务;4,SQL任务
)"
)
private
String
taskType
;
/**
* 类型
...
...
@@ -75,9 +75,9 @@ public class DmpDevelopTask implements Serializable {
@JsonFormat
(
pattern
=
"yyyy-MM-dd HH:mm:ss"
)
private
Date
createTime
;
/**
*
创建
用户ID
*
修改
用户ID
*/
@ApiModelProperty
(
value
=
"
创建
用户ID"
)
@ApiModelProperty
(
value
=
"
修改
用户ID"
)
private
String
updateUserId
;
/**
* 数据更新时间
...
...
@@ -122,6 +122,12 @@ public class DmpDevelopTask implements Serializable {
@ApiModelProperty
(
value
=
"版本"
)
private
String
version
;
/**
* 发布版本
*/
@ApiModelProperty
(
value
=
"发布版本"
)
private
String
publishVersion
;
@ApiModelProperty
(
value
=
"是否压缩"
)
private
Integer
isGziped
;
...
...
@@ -343,6 +349,14 @@ public class DmpDevelopTask implements Serializable {
this
.
version
=
version
;
}
public
String
getPublishVersion
()
{
return
publishVersion
;
}
public
void
setPublishVersion
(
String
publishVersion
)
{
this
.
publishVersion
=
publishVersion
;
}
public
Integer
getIsGziped
()
{
return
isGziped
;
}
...
...
src/main/java/com/jz/dmp/modules/model/DmpNavigationTree.java
View file @
bdf7e405
...
...
@@ -28,9 +28,9 @@ public class DmpNavigationTree implements Serializable {
@ApiModelProperty
(
value
=
"树类别(2:开发任务,3:脚本任务)"
)
private
String
category
;
/**
* 树类型(01:离线同步,02:实时同步,03:数据开发)
* 树类型(01:离线同步,02:实时同步,03:数据开发
,04:脚本开发,05:SQL开发
)
*/
@ApiModelProperty
(
value
=
"树类型(01:离线同步,02:实时同步,03:数据开发)"
)
@ApiModelProperty
(
value
=
"树类型(01:离线同步,02:实时同步,03:数据开发
,04:脚本开发,05:SQL开发
)"
)
private
String
type
;
/**
* 名称
...
...
src/main/java/com/jz/dmp/modules/service/DmpDevelopTaskService.java
View file @
bdf7e405
...
...
@@ -52,48 +52,108 @@ public interface DmpDevelopTaskService {
*/
PageInfoResponse
<
DataDevExamplesListDto
>
queryDevExamplesListPage
(
DataDevExamplesListReq
req
)
throws
Exception
;
/**
* 条件分頁查询所有任务开发
*
/**条件分頁查询所有任务开发
* @param dmpDevelopTaskRequest
* @param httpRequest
* @return
* @throws Exception
*/
public
com
.
jz
.
common
.
bean
.
PageInfoResponse
<
DmpDevelopTaskDto
>
findListWithPage
(
DmpDevelopTaskRequest
dmpDevelopTaskRequest
,
HttpServletRequest
httpRequest
)
throws
Exception
;
public
com
.
jz
.
common
.
bean
.
PageInfoResponse
<
DmpDevelopTaskDto
>
findListWithPage
(
DmpDevelopTaskRequest
dmpDevelopTaskRequest
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**
* @Title: getConfigFileNameNotSuffix4Published
* @Description: TODO(根据treeId取得最新版提交的同步脚本文件名(不含后缀)及版本信息)
* @param @param treeId
* @param @return
* @param @throws Exception 参数
* @return String 返回类型
* @throws
* @Title: getConfigFileNameNotSuffix4Published
* @Description: TODO(根据treeId取得最新版提交的同步脚本文件名 ( 不含后缀 ) 及版本信息)
*/
public
String
getConfigFileNameNotSuffix4Published
(
Long
treeId
)
throws
Exception
;
public
String
getConfigFileNameNotSuffix4Published
(
Long
treeId
)
throws
Exception
;
/**
* 新增开发任务
*
/**新增开发任务
* @param dmpDevelopTask
* @param httpRequest
* @return
* @throws Exception
*/
public
BaseBeanResponse
<
DmpDevelopTask
>
add
(
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
)
throws
Exception
;
public
BaseBeanResponse
<
DmpDevelopTask
>
add
(
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**树ID查询任务开发
* @param treeId
* @param httpRequest
* @return
* @throws Exception
*/
public
BaseBeanResponse
<
DmpDevelopTaskDto
>
findByTreeId
(
Integer
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**修改任务开发
* @param dmpDevelopTask
* @param httpRequest
* @return
* @throws Exception
*/
public
BaseBeanResponse
<
DmpDevelopTask
>
edit
(
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**
* @param @param flowPro
* @Title: flowSubmit
* @Description: TODO(任务流程发布到azkaban)
* @param @param treeId
* @param @param httpRequest
* @param @return
* @param @throws Exception 参数
* @return BaseResponse 返回类型
* @throws
*/
public
BaseResponse
flowSubmit
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**
* @Title: taskAzkabanRun
* @Description: TODO(运行任务)
* @param @param treeId
* @param @param httpRequest
* @param @return
* @param @throws Exception 参数
* @return BaseResponse 返回类型
* @throws
*/
public
BaseResponse
taskAzkabanRun
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**
* @Title: taskAzkabanStop
* @Description: TODO(停止任务)
* @param @param treeId
* @param @param httpRequest
* @param @return 参数
* @return BaseBeanResponse<String> 返回类型
* @throws
*/
public
BaseBeanResponse
<
String
>
taskAzkabanStop
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**
* @Title: taskPublish
* @Description: TODO(SHELL/SQL发布接口)
* @param @param treeId
* @param @param httpRequest
* @param @return
* @param @throws Exception 参数
* @return BaseResponse 返回类型
* @throws
*/
public
BaseResponse
taskPublish
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**
* @Title: softDeleteByTreeId
* @Description: TODO(软删除任务)
* @param @param treeId
* @param @param httpRequest
* @param @return
* @param @throws Exception 参数
* @return BaseResponse 返回类型
* @throws
* @Title: flowSubmit
* @Description: TODO(工作任务流保存提交)
*/
public
BaseResponse
flowSubmit
(
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
)
throws
Exception
;
public
BaseResponse
softDeleteByTreeId
(
Integer
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**
* @param @param syncTaskTreeId
...
...
src/main/java/com/jz/dmp/modules/service/FlowService.java
View file @
bdf7e405
...
...
@@ -2,6 +2,7 @@ package com.jz.dmp.modules.service;
import
com.jz.common.bean.BaseResponse
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowPro
;
import
com.jz.dmp.modules.model.DmpDevelopTask
;
/**
* @ClassName: FlowService
...
...
@@ -23,4 +24,15 @@ public interface FlowService {
*/
public
BaseResponse
publishFlow
(
FlowPro
flowPro
)
throws
Exception
;
/**
* @Title: deleteAzkabanFlow
* @Description: TODO(删除azkanba的任务)
* @param @param developTask
* @param @return
* @param @throws Exception 参数
* @return BaseResponse 返回类型
* @throws
*/
public
BaseResponse
deleteAzkabanFlow
(
DmpDevelopTask
developTask
)
throws
Exception
;
}
src/main/java/com/jz/dmp/modules/service/impl/DmpDevelopTaskHistoryServiceImpl.java
View file @
bdf7e405
...
...
@@ -354,12 +354,6 @@ public class DmpDevelopTaskHistoryServiceImpl extends BaseService implements Dmp
// 创建时间
dmpDevelopTaskHistory
.
setCreateTime
(
new
Date
());
//版本设置
//获取已存在版本
String
maxVersion
=
dmpDevelopTaskHistoryMapper
.
getMaxVersionByTaskId
(
dmpDevelopTaskHistory
.
getTaskId
());
String
version
=
CodeGeneratorUtils
.
generatorNextTaskVesion
(
maxVersion
);
dmpDevelopTaskHistory
.
setVersion
(
version
);
dmpDevelopTaskHistoryMapper
.
insertSelective
(
dmpDevelopTaskHistory
);
baseBeanResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
...
...
src/main/java/com/jz/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
View file @
bdf7e405
package
com
.
jz
.
dmp
.
modules
.
service
.
impl
;
import
java.io.File
;
import
java.io.IOException
;
import
java.io.UnsupportedEncodingException
;
import
java.util.ArrayList
;
...
...
@@ -19,8 +20,10 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
import
org.springframework.aop.ThrowsAdvice
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.core.RedisTemplate
;
import
org.springframework.stereotype.Service
;
import
org.springframework.transaction.annotation.Transactional
;
import
org.springframework.util.CollectionUtils
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
...
...
@@ -36,10 +39,14 @@ import com.jz.common.constant.StatuConstant;
import
com.jz.common.enums.ModuleLogEnum
;
import
com.jz.common.page.PageInfoResponse
;
import
com.jz.common.persistence.BaseService
;
import
com.jz.common.utils.AzkabanApiUtils2
;
import
com.jz.common.utils.CodeGeneratorUtils
;
import
com.jz.common.utils.DateUtils
;
import
com.jz.common.utils.FileUtils
;
import
com.jz.common.utils.GZIPUtils
;
import
com.jz.common.utils.JsonMapper
;
import
com.jz.common.utils.StringUtils
;
import
com.jz.common.utils.ZipUtils
;
import
com.jz.common.utils.web.XmlUtils
;
import
com.jz.dmp.agent.DmpAgentResult
;
import
com.jz.dmp.azkaban.dao.ExecutionFlowsMapper
;
...
...
@@ -52,15 +59,18 @@ import com.jz.dmp.modules.controller.dataOperation.bean.DataDevExamplesListDto;
import
com.jz.dmp.modules.controller.dataOperation.bean.DataDevExamplesListReq
;
import
com.jz.dmp.modules.controller.dataOperation.bean.DataDevTaskListDto
;
import
com.jz.dmp.modules.controller.dataOperation.bean.DataDevTaskListReq
;
import
com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoDto
;
import
com.jz.dmp.modules.dao.DmpDevelopTaskDao
;
import
com.jz.dmp.modules.dao.DmpNavigationTreeDao
;
import
com.jz.dmp.modules.dao.DmpProjectDao
;
import
com.jz.dmp.modules.dao.DmpSyncingDatasourceTypeDao
;
import
com.jz.dmp.modules.dao.projconfig.DmpProjectConfigInfoMapper
;
import
com.jz.dmp.modules.model.DmpAgentDatasourceInfo
;
import
com.jz.dmp.modules.model.DmpDevelopTask
;
import
com.jz.dmp.modules.model.DmpDevelopTaskHistory
;
import
com.jz.dmp.modules.model.DmpModuleOperateLog
;
import
com.jz.dmp.modules.model.DmpNavigationTree
;
import
com.jz.dmp.modules.model.DmpProjectConfigInfo
;
import
com.jz.dmp.modules.model.DmpProjectSystemInfo
;
import
com.jz.dmp.modules.model.DmpSyncingDatasource
;
import
com.jz.dmp.modules.model.DmpSyncingDatasourceType
;
...
...
@@ -70,6 +80,7 @@ import com.jz.dmp.modules.service.DmpModuleOperateLogService;
import
com.jz.dmp.modules.service.DmpSyncingDatasourceService
;
import
com.jz.dmp.modules.service.FlowService
;
import
com.jz.dmp.modules.service.OfflineSynchService
;
import
com.jz.dmp.modules.service.projconfig.DmpProjectConfigInfoService
;
/**
* 任务开发(DmpDevelopTask)表服务实现类
...
...
@@ -119,6 +130,12 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
@Autowired
private
DmpDevelopTaskHistoryService
dmpDevelopTaskHistoryService
;
@Autowired
private
DmpProjectConfigInfoMapper
dmpProjectConfigInfoMapper
;
@Autowired
private
RedisTemplate
redisTemplate
;
/**
* 添加保存dmp数据(包含校验数据)
*
...
...
@@ -1099,7 +1116,26 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
String
taskName
=
dmpDevelopTask
.
getName
();
if
(
StringUtils
.
isEmpty
(
taskName
))
{
baseBeanResponse
.
setCode
(
StatuConstant
.
CODE_ERROR_PARAMETER
);
baseBeanResponse
.
setMessage
(
"工作流名称不能为空"
);
baseBeanResponse
.
setMessage
(
"任务名称不能为空"
);
return
baseBeanResponse
;
}
//任务类型校验
String
taskType
=
dmpDevelopTask
.
getTaskType
();
String
treeType
=
""
;
if
(
StringUtils
.
isEmpty
(
taskType
))
{
baseBeanResponse
.
setCode
(
StatuConstant
.
CODE_ERROR_PARAMETER
);
baseBeanResponse
.
setMessage
(
"任务类型不能为空"
);
return
baseBeanResponse
;
}
else
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVELOP
))
{
treeType
=
CommConstant
.
TREE_TYPE_DEVELOP
;
}
else
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVSHELL
))
{
treeType
=
CommConstant
.
TREE_TYPE_DEVSHELL
;
}
else
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVSQL
))
{
treeType
=
CommConstant
.
TREE_TYPE_DEVSQL
;
}
else
{
baseBeanResponse
.
setCode
(
StatuConstant
.
CODE_ERROR_PARAMETER
);
baseBeanResponse
.
setMessage
(
"任务类型错误"
);
return
baseBeanResponse
;
}
...
...
@@ -1109,7 +1145,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
tree
.
setParentId
(
dmpDevelopTask
.
getParentId
());
tree
.
setProjectId
(
dmpDevelopTask
.
getProjectId
());
tree
.
setCategory
(
CommConstant
.
TREE_DEVELOP_TASK
);
tree
.
setType
(
"W"
);
tree
.
setType
(
treeType
);
tree
.
setIsLevel
(
"1"
);
tree
.
setIsEnable
(
"1"
);
tree
.
setDataStatus
(
"1"
);
...
...
@@ -1118,13 +1154,13 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
int
cnt
=
dmpNavigationTreeDao
.
countTreeByName
(
tree
);
if
(
cnt
>
0
)
{
baseBeanResponse
.
setCode
(
StatuConstant
.
CODE_DATA_EXISTED
);
baseBeanResponse
.
setMessage
(
"当前项目已存在同名的
工作流
名称"
);
baseBeanResponse
.
setMessage
(
"当前项目已存在同名的
任务
名称"
);
return
baseBeanResponse
;
}
dmpNavigationTreeDao
.
insert
(
tree
);
//将新增任务保存到数据库
dmpDevelopTask
.
setTaskType
(
"1"
);
dmpDevelopTask
.
setTaskType
(
taskType
);
dmpDevelopTask
.
setType
(
"W"
);
dmpDevelopTask
.
setIsSubmit
(
"0"
);
dmpDevelopTask
.
setScheduleType
(
"2"
);
...
...
@@ -1157,6 +1193,11 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
// 创建时间
dmpDevelopTask
.
setCreateTime
(
new
Date
());
//版本设置
//获取已存在版本
String
version
=
CodeGeneratorUtils
.
generatorNextTaskVesion
(
null
);
dmpDevelopTask
.
setVersion
(
version
);
dmpDevelopTaskDao
.
insert
(
dmpDevelopTask
);
baseBeanResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
...
...
@@ -1166,25 +1207,86 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
return
baseBeanResponse
;
}
/*
* (non-Javadoc)
*
* @see
* com.ycxc.vmts.service.DmpDevelopTaskService#findByTreeId(com.ycxc.vmts.controller.
* bean.DmpDevelopTaskRequest, javax.servlet.http.HttpServletRequest)
*/
@Override
public
BaseBeanResponse
<
DmpDevelopTaskDto
>
findByTreeId
(
Integer
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
{
BaseBeanResponse
<
DmpDevelopTaskDto
>
baseBeanResponse
=
new
BaseBeanResponse
<>();
DmpDevelopTask
dmpDevelopTask
=
dmpDevelopTaskDao
.
selectTaskInfoByParam
(
treeId
);
DmpDevelopTaskDto
dmpDevelopTaskDto
=
MyDmpDevelopTaskConverter
.
INSTANCE
().
domain2dto
(
dmpDevelopTask
);
baseBeanResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
baseBeanResponse
.
setMessage
(
"查询成功"
);
baseBeanResponse
.
setData
(
dmpDevelopTaskDto
);
return
baseBeanResponse
;
}
/**
*任务
流保存及提交
*任务
修改
*/
@Override
public
BaseResponse
flowSubmit
(
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
)
@Transactional
(
rollbackFor
=
Exception
.
class
)
public
BaseBeanResponse
<
DmpDevelopTask
>
edit
(
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
)
throws
Exception
{
Base
Response
baseResponse
=
new
BaseResponse
();
Base
BeanResponse
<
DmpDevelopTask
>
baseBeanResponse
=
new
BaseBeanResponse
<
DmpDevelopTask
>
();
FlowPro
flowPro
=
MyDmpDevelopTaskConverter
.
INSTANCE
().
task2flowpro
(
dmpDevelopTask
);
// 修改人
dmpDevelopTask
.
setUpdateUserId
(
getHttpRequestUserId
(
httpRequest
).
toString
());
// 修改时间
dmpDevelopTask
.
setUpdateTime
(
new
Date
());
//参数校验
Long
treeId
=
flowPro
.
getTreeId
();
if
(
treeId
==
null
)
{
// 版本设置
// 获取已存在版本
DmpDevelopTask
dmpDevelopTaskDb
=
dmpDevelopTaskDao
.
selectTaskById
(
dmpDevelopTask
.
getId
().
toString
());
String
version
=
CodeGeneratorUtils
.
generatorNextTaskVesion
(
dmpDevelopTaskDb
.
getVersion
());
dmpDevelopTask
.
setVersion
(
version
);
dmpDevelopTaskDao
.
update
(
dmpDevelopTask
);
DmpDevelopTask
dmpDevelopTaskDb2
=
dmpDevelopTaskDao
.
get
(
dmpDevelopTask
.
getTreeId
().
longValue
());
DmpDevelopTaskHistory
dmpDevelopTaskHistory
=
MyDmpDevelopTaskHistoryConverter
.
INSTANCE
().
task2history
(
dmpDevelopTaskDb2
);
// 保存版本为版本数据
dmpDevelopTaskHistoryService
.
add
(
dmpDevelopTaskHistory
,
httpRequest
);
baseBeanResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
baseBeanResponse
.
setMessage
(
"修改成功"
);
baseBeanResponse
.
setData
(
dmpDevelopTaskDb
);
return
baseBeanResponse
;
}
/**
*任务流程发布
*/
@Override
public
BaseResponse
flowSubmit
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
{
BaseResponse
baseResponse
=
new
BaseResponse
();
// 参数校验
if
(
treeId
==
null
)
{
baseResponse
.
setCode
(
StatuConstant
.
CODE_ERROR_PARAMETER
);
baseResponse
.
setMessage
(
"treeId不能为空"
);
return
baseResponse
;
}
DmpDevelopTask
dmpDevelopTask
=
dmpDevelopTaskDao
.
get
(
treeId
);
FlowPro
flowPro
=
MyDmpDevelopTaskConverter
.
INSTANCE
().
task2flowpro
(
dmpDevelopTask
);
DmpNavigationTree
dmpNavigationTree
=
dmpNavigationTreeDao
.
queryById
(
treeId
.
intValue
());
Long
projectId
=
Long
.
parseLong
(
dmpNavigationTree
.
getProjectId
().
toString
());
flowPro
.
setPublishedToProjectId
(
projectId
);
...
...
@@ -1206,12 +1308,11 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
}
updateDevelopTask
.
setIsSubmit
(
flowPro
.
getIsSubmit
());
updateDevelopTask
.
setIsGziped
(
1
);
dmpDevelopTaskDao
.
update
(
updateDevelopTask
);
DmpDevelopTaskHistory
dmpDevelopTaskHistory
=
MyDmpDevelopTaskHistoryConverter
.
INSTANCE
().
task2history
(
updateDevelopTask
);
//设置发布版本
updateDevelopTask
.
setPublishVersion
(
queryDmpDevelopTask
.
getVersion
());
dmpDevelopTaskDao
.
update
(
updateDevelopTask
);
//保存版本为版本数据
dmpDevelopTaskHistoryService
.
add
(
dmpDevelopTaskHistory
,
httpRequest
);
}
baseResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
...
...
@@ -1236,6 +1337,235 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
return
xmlFileName
;
}
/**
*运行任务
*/
@Override
public
BaseResponse
taskAzkabanRun
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
{
BaseResponse
baseResponse
=
new
BaseResponse
();
DmpDevelopTask
developTask
=
dmpDevelopTaskDao
.
get
(
treeId
);
//先发布任务
boolean
flag
=
publishAndExecute
(
developTask
);
if
(
flag
)
{
baseResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
baseResponse
.
setMessage
(
"立即运行成功"
);
}
else
{
baseResponse
.
setCode
(
StatuConstant
.
FAILURE_CODE
);
baseResponse
.
setMessage
(
"立即运行失败"
);
}
return
baseResponse
;
}
/**
* 发布流程
*/
private
boolean
publishAndExecute
(
DmpDevelopTask
dmpDevelopTask
)
throws
Exception
{
Integer
taskId
=
dmpDevelopTask
.
getId
();
//任务id
Integer
projectId
=
dmpDevelopTask
.
getProjectId
();
//项目id
String
treeName
=
dmpDevelopTask
.
getName
();
//任务流程名称
//获取项目配置信息
Map
<
String
,
Object
>
param
=
new
HashMap
<
String
,
Object
>();
param
.
put
(
"projectId"
,
projectId
);
List
<
DmpProjectConfigInfoDto
>
list
=
dmpProjectConfigInfoMapper
.
findList
(
param
);
if
(
CollectionUtils
.
isEmpty
(
list
))
{
throw
new
RuntimeException
(
"项目没有设置配置信息,请联系管理员!"
);
}
DmpProjectConfigInfoDto
dmpProjectConfigInfoDto
=
list
.
get
(
0
);
//校验任务类型并配置参数
String
taskType
=
dmpDevelopTask
.
getTaskType
();
String
azkabanExectorTaskExec
=
""
;
String
taskAlias
=
""
;
if
(
StringUtils
.
isEmpty
(
taskType
))
{
throw
new
RuntimeException
(
"任务的任务类型为空!"
);
}
else
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVSHELL
))
{
azkabanExectorTaskExec
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
().
getAzkabanExectorShellExec
();
//执行shell任务命令
taskAlias
=
"shell"
;
}
else
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVSQL
))
{
azkabanExectorTaskExec
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
().
getAzkabanExectorSqlExec
();
//执行sql任务命令
taskAlias
=
"sql"
;
}
else
{
throw
new
RuntimeException
(
"该任务类型不能调用此方法发布,请检查任务类型!"
);
}
String
azkabanLocalTaskFilePath
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
().
getAzkabanLocalTaskFilePath
();
//文件路径
String
azkabanMonitorUrl
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
().
getAzkabanMonitorUrl
();
//AZKABAN WEB服务地址
String
azkabanJobCommand
=
"command="
+
azkabanExectorTaskExec
+
" "
+
projectId
+
" ${azkaban.flow.flowid} ${azkaban.job.id} "
+
treeName
;
/**
* 当前任务生成文件存放根路径
*/
String
localTaskPath
=
azkabanLocalTaskFilePath
+
"/"
+
projectId
+
"/local/"
+
taskId
;
File
localTaskFile
=
new
File
(
localTaskPath
);
if
(!
localTaskFile
.
exists
())
{
localTaskFile
.
mkdirs
();
}
String
localTaskSourcePath
=
localTaskPath
+
"/source/"
;
File
localTaskSourceFile
=
new
File
(
localTaskSourcePath
);
if
(!
localTaskSourceFile
.
exists
())
{
localTaskSourceFile
.
mkdirs
();
}
String
localTaskExecArgsPath
=
localTaskPath
+
"/execArgs/"
;
File
localTaskExecArgsFile
=
new
File
(
localTaskExecArgsPath
);
if
(!
localTaskExecArgsFile
.
exists
())
{
localTaskExecArgsFile
.
mkdirs
();
}
String
localTaskZipPath
=
localTaskPath
+
"/target/"
;
File
localTaskZipFile
=
new
File
(
localTaskZipPath
);
if
(!
localTaskZipFile
.
exists
())
{
localTaskZipFile
.
mkdirs
();
}
//先删除上次创建的文件 ---每次都会新建
FileUtils
.
deleteDirectory
(
localTaskPath
);
List
<
String
>
contents
=
new
ArrayList
<>();
// 子流程类型
contents
.
add
(
"type=command"
);
contents
.
add
(
azkabanJobCommand
);
// 生成job文件
String
jobFileAbsolutePath
=
localTaskSourcePath
+
treeName
+
".job"
;
FileUtils
.
write
(
jobFileAbsolutePath
,
contents
);
String
localZipTargetFileName
=
treeName
+
".zip"
;
ZipUtils
.
zip
(
localTaskSourcePath
,
localTaskZipPath
,
localZipTargetFileName
);
//上传到azkaban todo
//上次zip包到azkaban
String
localTaskZipAbsolutePath
=
localTaskZipPath
+
"/"
+
localZipTargetFileName
;
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanMonitorUrl
,
redisTemplate
);
return
azkabanApiUtils
.
loginCreateProjectuploadZipAndExecute
(
"jz_localflow_"
+
taskAlias
+
"_"
+
projectId
,
"local_"
+
taskAlias
+
"_project"
,
localTaskZipAbsolutePath
,
treeName
);
}
/**
*停止任务
*/
@Override
public
BaseBeanResponse
<
String
>
taskAzkabanStop
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
{
// TODO Auto-generated method stub
BaseBeanResponse
<
String
>
baseBeanResponse
=
new
BaseBeanResponse
<
String
>();
DmpDevelopTask
dmpDevelopTask
=
dmpDevelopTaskDao
.
get
(
treeId
);
if
(
dmpDevelopTask
==
null
)
{
baseBeanResponse
.
setCode
(
StatuConstant
.
CODE_ERROR_PARAMETER
);
baseBeanResponse
.
setMessage
(
"任务不存在"
);
return
baseBeanResponse
;
}
Integer
projectId
=
dmpDevelopTask
.
getProjectId
();
String
treeName
=
dmpDevelopTask
.
getName
();
//获取项目配置信息
Map
<
String
,
Object
>
param
=
new
HashMap
<
String
,
Object
>();
List
<
DmpProjectConfigInfoDto
>
list
=
dmpProjectConfigInfoMapper
.
findList
(
param
);
if
(
CollectionUtils
.
isEmpty
(
list
))
{
throw
new
RuntimeException
(
"项目没有设置配置信息,请联系管理员!"
);
}
DmpProjectConfigInfoDto
dmpProjectConfigInfoDto
=
list
.
get
(
0
);
//校验任务类型并配置参数
String
taskType
=
dmpDevelopTask
.
getTaskType
();
String
taskAlias
=
""
;
if
(
StringUtils
.
isEmpty
(
taskType
))
{
throw
new
RuntimeException
(
"任务的任务类型为空!"
);
}
else
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVSHELL
))
{
taskAlias
=
"shell"
;
}
else
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVSQL
))
{
taskAlias
=
"sql"
;
}
else
{
throw
new
RuntimeException
(
"该任务类型不能调用此方法停止,请检查任务类型!"
);
}
String
azkabanMonitorUrl
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
().
getAzkabanMonitorUrl
();
//AZKABAN WEB服务地址
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanMonitorUrl
,
redisTemplate
);
String
exeIdsStr
=
azkabanApiUtils
.
stopFlow
(
"jz_localflow_"
+
taskAlias
+
"_"
+
projectId
,
treeName
);
baseBeanResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
baseBeanResponse
.
setMessage
(
"停止成功"
);
baseBeanResponse
.
setData
(
exeIdsStr
);
return
baseBeanResponse
;
}
/**
*SHELL/SQL版本发布
*/
@Override
public
BaseResponse
taskPublish
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
{
BaseResponse
baseResponse
=
new
BaseResponse
();
DmpDevelopTask
dmpDevelopTask
=
dmpDevelopTaskDao
.
get
(
treeId
);
// 修改人
dmpDevelopTask
.
setUpdateUserId
(
getHttpRequestUserId
(
httpRequest
).
toString
());
// 修改时间
dmpDevelopTask
.
setUpdateTime
(
new
Date
());
//设置发布版本
dmpDevelopTask
.
setPublishVersion
(
dmpDevelopTask
.
getVersion
());
dmpDevelopTaskDao
.
update
(
dmpDevelopTask
);
baseResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
baseResponse
.
setMessage
(
"发布成功"
);
return
baseResponse
;
}
/**
*软删除任务
*/
@Override
@Transactional
(
rollbackFor
=
Exception
.
class
)
public
BaseResponse
softDeleteByTreeId
(
Integer
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
{
BaseResponse
baseResponse
=
new
BaseResponse
();
DmpDevelopTask
developTask
=
dmpDevelopTaskDao
.
get
(
treeId
.
longValue
());
String
taskType
=
developTask
.
getTaskType
();
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVELOP
))
{
//取消发布的任务
BaseResponse
baseResponseAzkaban
=
flowService
.
deleteAzkabanFlow
(
developTask
);
if
(
baseResponseAzkaban
.
getCode
().
equals
(
StatuConstant
.
FAILURE_CODE
))
{
throw
new
RuntimeException
(
"azkaban取消发布任务失败"
);
}
}
else
if
(
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVSHELL
)
||
taskType
.
equals
(
CommConstant
.
TASK_TYPE_DEVSQL
))
{
//什么也不做
}
else
{
baseResponse
.
setCode
(
StatuConstant
.
CODE_DATA_NOTMEET
);
baseResponse
.
setMessage
(
"任务类型不适合调用该方法"
);
return
baseResponse
;
}
//软删除树
DmpNavigationTree
dmpNavigationTree
=
new
DmpNavigationTree
();
dmpNavigationTree
.
setId
(
treeId
);
dmpNavigationTree
.
setDataStatus
(
"0"
);
dmpNavigationTreeDao
.
update
(
dmpNavigationTree
);
//软删除任务
developTask
.
setDataStatus
(
"0"
);
dmpDevelopTaskDao
.
update
(
developTask
);
baseResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
baseResponse
.
setMessage
(
"软删除成功"
);
return
baseResponse
;
}
/**
* 获取执行实例的日志详情
*
...
...
src/main/java/com/jz/dmp/modules/service/impl/DmpNavigationTreeServiceImpl.java
View file @
bdf7e405
...
...
@@ -75,7 +75,14 @@ public class DmpNavigationTreeServiceImpl extends BaseService implements DmpNavi
dmpNavigationTree
.
setCreateUserId
(
getHttpRequestUserId
(
null
).
toString
());
// 创建时间
dmpNavigationTree
.
setCreateTime
(
new
Date
());
dmpNavigationTree
.
setIsLevel
(
"0"
);
//dmpNavigationTree.setIsLevel("0");
DmpNavigationTree
dmpNavigationTreeParam
=
new
DmpNavigationTree
();
//查询未删除数据
dmpNavigationTreeParam
.
setDataStatus
(
"1"
);
List
<
DmpNavigationTree
>
list
=
dmpNavigationTreeDao
.
queryAll
(
dmpNavigationTreeParam
);
this
.
dmpNavigationTreeDao
.
insertSelective
(
dmpNavigationTree
);
return
dmpNavigationTree
;
}
...
...
src/main/java/com/jz/dmp/modules/service/impl/FlowServiceImpl.java
View file @
bdf7e405
package
com
.
jz
.
dmp
.
modules
.
service
.
impl
;
import
java.util.Date
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.core.RedisTemplate
;
import
org.springframework.stereotype.Service
;
import
org.springframework.transaction.annotation.Transactional
;
import
org.springframework.util.CollectionUtils
;
import
com.google.common.io.BaseEncoding
;
import
com.jz.common.bean.BaseBeanResponse
;
import
com.jz.common.bean.BaseResponse
;
import
com.jz.common.constant.CommConstant
;
import
com.jz.common.constant.StatuConstant
;
import
com.jz.common.utils.AzkabanApiUtils2
;
import
com.jz.common.utils.FlowParseTool
;
import
com.jz.common.utils.GZIPUtils
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowNodeChangeInfo
;
...
...
@@ -19,6 +25,7 @@ import com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoDto;
import
com.jz.dmp.modules.dao.DmpProjectDao
;
import
com.jz.dmp.modules.dao.DmpWorkFlowSubmitDetailsDao
;
import
com.jz.dmp.modules.dao.projconfig.DmpProjectConfigInfoMapper
;
import
com.jz.dmp.modules.model.DmpDevelopTask
;
import
com.jz.dmp.modules.model.DmpProject
;
import
com.jz.dmp.modules.model.DmpProjectSystemInfo
;
import
com.jz.dmp.modules.model.DmpWorkFlowSubmitDetails
;
...
...
@@ -59,6 +66,9 @@ public class FlowServiceImpl implements FlowService {
@Autowired
private
DmpProjectConfigInfoService
dmpProjectConfigInfoService
;
@Autowired
private
DmpProjectConfigInfoMapper
dmpProjectConfigInfoMapper
;
@Autowired
private
RedisTemplate
redisTemplate
;
...
...
@@ -141,4 +151,33 @@ public class FlowServiceImpl implements FlowService {
return
baseResponse
;
}
/**
*删除azkaban任务
*/
@Override
public
BaseResponse
deleteAzkabanFlow
(
DmpDevelopTask
developTask
)
throws
Exception
{
BaseResponse
baseResponse
=
new
BaseResponse
();
Integer
projectId
=
developTask
.
getProjectId
();
//获取项目配置信息
Map
<
String
,
Object
>
param
=
new
HashMap
<
String
,
Object
>();
param
.
put
(
"projectId"
,
projectId
);
List
<
DmpProjectConfigInfoDto
>
list
=
dmpProjectConfigInfoMapper
.
findList
(
param
);
if
(
CollectionUtils
.
isEmpty
(
list
))
{
baseResponse
.
setCode
(
StatuConstant
.
FAILURE_CODE
);
baseResponse
.
setMessage
(
"项目没有设置配置信息,请联系管理员!"
);
return
baseResponse
;
}
DmpProjectConfigInfoDto
dmpProjectConfigInfoDto
=
list
.
get
(
0
);
String
azkabanMonitorUrl
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
().
getAzkabanMonitorUrl
();
//AZKABAN WEB服务地址
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanMonitorUrl
,
redisTemplate
);
azkabanApiUtils
.
deleteAzkabanFlow
(
CommConstant
.
AZKABAN_PROJECTNAME_PREFIX
+
projectId
,
developTask
.
getName
());
baseResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
baseResponse
.
setMessage
(
"删除成功"
);
return
baseResponse
;
}
}
src/main/resources/mapper/dmp/DmpDevelopTaskHistoryMapper.xml
View file @
bdf7e405
...
...
@@ -706,10 +706,8 @@
<!-- 主键查询任务历史版本 -->
<select
id=
"getMaxVersionByTaskId"
resultType=
"java.lang.String"
>
select t.version from (
select MAX(CONVERT(REPLACE(RIGHT(version,LENGTH(version)-1),'.',''), DECIMAL) ), version
from dmp_develop_task_history where task_id = 0
) t
select MAX(CONVERT(version, DECIMAL)) AS version
from dmp_develop_task_history where task_id = #{taskId, jdbcType=INTEGER}
</select>
</mapper>
src/main/resources/mapper/dmp/DmpDevelopTaskMapper.xml
View file @
bdf7e405
...
...
@@ -18,17 +18,20 @@
<result
column=
"flow_header"
property=
"flowHeader"
jdbcType=
"VARCHAR"
/>
<result
column=
"version"
property=
"version"
jdbcType=
"VARCHAR"
/>
<result
column=
"publish_version"
property=
"publishVersion"
jdbcType=
"VARCHAR"
/>
<result
column=
"data_status"
property=
"dataStatus"
jdbcType=
"CHAR"
/>
<result
column=
"tree_id"
property=
"treeId"
jdbcType=
"INTEGER"
/>
<result
column=
"is_gziped"
property=
"isGziped"
jdbcType=
"INTEGER"
/>
<result
column=
"name"
property=
"name"
jdbcType=
"VARCHAR"
/>
<result
column=
"project_id"
property=
"projectId"
jdbcType=
"VARCHAR"
/>
<!-- <result column="chk_result" property="chkResult" javaType="VARCHAR"/> -->
<!-- <result column="sync_result" property="syncResult" javaType="VARCHAR"/> -->
</resultMap>
<sql
id=
"FIND_ALL_COLUMN"
>
id,task_type,type,schedule_type,is_submit,task_desc,script,data_status,create_user_id,create_time,update_user_id,update_time,tree_id,flow_header,
flow_json, version,is_gziped
flow_json, version,
publish_version,
is_gziped
</sql>
<select
id=
"getDmpTaskAndTreeInfo"
parameterType=
"string"
resultType=
"map"
>
...
...
@@ -71,9 +74,9 @@
<!--新增所有列-->
<insert
id=
"insert"
keyProperty=
"id"
useGeneratedKeys=
"true"
>
insert into dmp_develop_task(datasource_id, TASK_TYPE, TYPE, SCHEDULE_TYPE, IS_SUBMIT, TASK_DESC, SCRIPT, DATA_STATUS, CREATE_USER_ID, CREATE_TIME,
UPDATE_USER_ID, UPDATE_TIME, TREE_ID, CHK_RESULT, SYNC_RESULT, CHK_TIME, SYNC_TIME, FLOW_HEADER, FLOW_JSON, VERSION, IS_GZIPED,SOURCE_DB_NAME,SOURCE_TABLE_NAME,TARGET_DB_NAME,TARGET_TABLE_NAME)
UPDATE_USER_ID, UPDATE_TIME, TREE_ID, CHK_RESULT, SYNC_RESULT, CHK_TIME, SYNC_TIME, FLOW_HEADER, FLOW_JSON, VERSION,
publish_version,
IS_GZIPED,SOURCE_DB_NAME,SOURCE_TABLE_NAME,TARGET_DB_NAME,TARGET_TABLE_NAME)
values (#{datasourceId}, #{taskType}, #{type}, #{scheduleType}, #{isSubmit}, #{taskDesc}, #{data}, #{dataStatus}, #{createUserId}, #{createTime}, #{updateUserId},
#{updateTime}, #{treeId}, #{chkResult}, #{syncResult}, #{chkTime}, #{syncTime}, #{flowHeader}, #{flowJson}, #{version}, #{isGziped}, #{sourceDbName}, #{sourceTableName}, #{targetDbName}, #{targetTableName})
#{updateTime}, #{treeId}, #{chkResult}, #{syncResult}, #{chkTime}, #{syncTime}, #{flowHeader}, #{flowJson}, #{version}, #{
publishVersion,jdbcType=VARCHAR}, #{
isGziped}, #{sourceDbName}, #{sourceTableName}, #{targetDbName}, #{targetTableName})
</insert>
<update
id=
"update"
>
...
...
@@ -98,7 +101,7 @@
TASK_DESC = #{taskDesc},
</if>
<if
test=
"script != null"
>
SCRIPT = #{
data
},
SCRIPT = #{
script, jdbcType=BLOB
},
</if>
<if
test=
"dataStatus != null and dataStatus != ''"
>
DATA_STATUS = #{dataStatus},
...
...
@@ -139,6 +142,9 @@
<if
test=
"version != null and version != ''"
>
VERSION = #{version},
</if>
<if
test=
"publishVersion != null and publishVersion != ''"
>
publish_version = #{publishVersion},,
</if>
<if
test=
"isGziped != null"
>
IS_GZIPED = #{isGziped},
</if>
...
...
@@ -147,11 +153,12 @@
</update>
<select
id=
"selectTaskInfoByParam"
parameterType=
"map"
resultType=
"com.jz.dmp.modules.model.DmpDevelopTask"
>
select
ID, datasource_id, TASK_TYPE, TYPE, SCHEDULE_TYPE, IS_SUBMIT, TASK_DESC, SCRIPT, DATA_STATUS
, CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, TREE_ID, CHK_RESULT, SYNC_RESULT, CHK_TIME, SYNC_TIME, FLOW_HEADER, FLOW_JSON, VERSION, IS_GZIPED
from dmp_develop_task
where data_status ='1' and TREE_ID = #{treeId}
SELECT
task.id,task.task_type,task.type,task.schedule_type,task.is_submit,task.task_desc,task.script,task.data_status,task.create_user_id,task.create_time,
task.update_user_id,task.update_time,task.tree_id,task.flow_header,
task.flow_json, task.version, task.publish_version, task.is_gziped, tree.name
FROM dmp_develop_task task left join dmp_navigation_tree tree ON task.tree_id=tree.id
where task.data_status ='1' and task.TREE_ID = #{treeId}
</select>
<!--数据运维-数据开发任务列表分页查询-->
...
...
@@ -181,9 +188,9 @@
SELECT
task.id,task.task_type,task.type,task.schedule_type,task.is_submit,task.task_desc,task.script,task.data_status,task.create_user_id,task.create_time,
task.update_user_id,task.update_time,task.tree_id,task.flow_header,
task.flow_json, task.version,
task.is_gziped
task.flow_json, task.version,
task.publish_version, task.is_gziped, tree.name
FROM dmp_develop_task task left join dmp_navigation_tree tree ON task.tree_id=tree.id
WHERE 1=1
WHERE 1=1
AND tree.data_status = '1'
<if
test=
"taskType != null"
>
AND task.task_type = #{taskType}
</if>
<if
test=
"type != null"
>
AND task.type = #{type}
</if>
<if
test=
"scheduleType != null"
>
AND task.schedule_type = #{scheduleType}
</if>
...
...
@@ -196,6 +203,7 @@
<if
test=
"flowHeader != null"
>
AND task.flow_header = #{flowHeader}
</if>
<if
test=
"flowJson != null"
>
AND task.flow_json = #{flowJson}
</if>
<if
test=
"version != null"
>
AND task.version = #{version}
</if>
<if
test=
"publishVersion != null"
>
AND task.publish_version = #{publishVersion}
</if>
<if
test=
"gziped != null"
>
AND task.is_gziped = #{gziped}
</if>
<if
test=
"projectId != null"
>
AND tree.project_id = #{projectId}
</if>
</select>
...
...
@@ -218,7 +226,12 @@
<!-- 主键获取对象 -->
<select
id=
"get"
parameterType=
"java.lang.Long"
resultMap=
"DmpDevelopTaskResultMap"
>
SELECT
<include
refid=
"FIND_ALL_COLUMN"
/>
FROM dmp_develop_task WHERE tree_id = #{id}
SELECT
task.id,task.task_type,task.type,task.schedule_type,task.is_submit,task.task_desc,task.script,task.data_status,task.create_user_id,task.create_time,
task.update_user_id,task.update_time,task.tree_id,task.flow_header,
task.flow_json, task.version, task.publish_version, task.is_gziped, tree.name, tree.project_id
FROM dmp_develop_task task left join dmp_navigation_tree tree ON task.tree_id=tree.id
WHERE task.tree_id = #{id}
</select>
<select
id=
"selectTaskById"
resultType=
"com.jz.dmp.modules.model.DmpDevelopTask"
>
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment