Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
c87590f0
Commit
c87590f0
authored
Mar 08, 2021
by
mcb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
commit
parent
50594f0b
Changes
9
Hide whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
441 additions
and
160 deletions
+441
-160
FlowInstanceEnum.java
src/main/java/com/jz/common/enums/FlowInstanceEnum.java
+4
-1
AzkabanApiUtils2.java
src/main/java/com/jz/common/utils/AzkabanApiUtils2.java
+35
-35
HttpClientUtils.java
src/main/java/com/jz/common/utils/web/HttpClientUtils.java
+1
-0
ExecutionFlowsMapper.java
...ain/java/com/jz/dmp/azkaban/dao/ExecutionFlowsMapper.java
+25
-0
DmpDevTaskController.java
...odules/controller/dataOperation/DmpDevTaskController.java
+55
-2
DataDevTaskListDto.java
...les/controller/dataOperation/bean/DataDevTaskListDto.java
+11
-0
DmpDevelopTaskService.java
...ava/com/jz/dmp/modules/service/DmpDevelopTaskService.java
+145
-118
DmpDevelopTaskServiceImpl.java
...z/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
+108
-3
ExecutionFlowsMapper.xml
src/main/resources/azkabanmapper/ExecutionFlowsMapper.xml
+57
-1
No files found.
src/main/java/com/jz/common/enums/FlowInstanceEnum.java
View file @
c87590f0
...
@@ -8,7 +8,7 @@ package com.jz.common.enums;
...
@@ -8,7 +8,7 @@ package com.jz.common.enums;
*/
*/
public
enum
FlowInstanceEnum
{
public
enum
FlowInstanceEnum
{
/**
/**
* 运行
* 运行
中
*/
*/
running
(
"running"
,
"30"
),
running
(
"running"
,
"30"
),
...
@@ -19,6 +19,9 @@ public enum FlowInstanceEnum {
...
@@ -19,6 +19,9 @@ public enum FlowInstanceEnum {
kill
(
"kill"
,
"60"
),
kill
(
"kill"
,
"60"
),
/**
* 失败
*/
failed
(
"failed"
,
"70"
),
failed
(
"failed"
,
"70"
),
;
;
...
...
src/main/java/com/jz/common/utils/AzkabanApiUtils2.java
View file @
c87590f0
...
@@ -2,11 +2,14 @@ package com.jz.common.utils;
...
@@ -2,11 +2,14 @@ package com.jz.common.utils;
import
java.io.File
;
import
java.io.File
;
import
java.io.UnsupportedEncodingException
;
import
java.net.URLEncoder
;
import
java.util.ArrayList
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Map
;
import
com.jz.common.constant.JsonResult
;
import
com.jz.dmp.modules.controller.dataOperation.bean.SetSlaReq
;
import
com.jz.dmp.modules.controller.dataOperation.bean.SetSlaReq
;
import
com.jz.dmp.modules.controller.dataOperation.bean.SetSlaRulesReq
;
import
com.jz.dmp.modules.controller.dataOperation.bean.SetSlaRulesReq
;
import
org.slf4j.Logger
;
import
org.slf4j.Logger
;
...
@@ -730,51 +733,48 @@ public class AzkabanApiUtils2 {
...
@@ -730,51 +733,48 @@ public class AzkabanApiUtils2 {
/**
/**
* Set SLA
* Set SLA
* @author Bellamy
* @author Bellamy
* @return 调用成功后,返回execid
* @since 2021-03-08
* @return
*/
*/
@SuppressWarnings
(
"unchecked"
)
@SuppressWarnings
(
"unchecked"
)
public
String
saveSla
(
SetSlaReq
req
)
{
public
String
saveSla
(
SetSlaReq
req
)
{
String
sessionId
=
login
();
String
sessionId
=
login
();
String
executeFlowUrl
=
azkabanServerUrl
+
"/schedule?token="
+
sessionId
;
String
executeFlowUrl
=
azkabanServerUrl
+
"/schedule?ajax=setSla&session.id="
+
sessionId
;
LinkedMultiValueMap
<
String
,
Object
>
linkedMultiValueMap
=
new
LinkedMultiValueMap
<
String
,
Object
>();
executeFlowUrl
+=
"&scheduleId="
+
req
.
getScheduleId
()
+
"&slaEmails="
+
req
.
getSlaEmails
();
linkedMultiValueMap
.
add
(
"session.id"
,
sessionId
);
String
settings
=
"&"
;
linkedMultiValueMap
.
add
(
"ajax"
,
"setSla"
);
linkedMultiValueMap
.
add
(
"scheduleId"
,
req
.
getScheduleId
());
linkedMultiValueMap
.
add
(
"slaEmails"
,
req
.
getSlaEmails
());
List
<
SetSlaRulesReq
>
slaRule
=
req
.
getSlaRule
();
List
<
SetSlaRulesReq
>
slaRule
=
req
.
getSlaRule
();
for
(
int
i
=
0
;
i
<
slaRule
.
size
();
i
++)
{
for
(
int
i
=
0
;
i
<
slaRule
.
size
();
i
++)
{
SetSlaRulesReq
sla
=
slaRule
.
get
(
i
);
SetSlaRulesReq
sla
=
slaRule
.
get
(
i
);
linkedMultiValueMap
.
add
(
"settings["
+
i
+
"]"
,
sla
.
getFlow
()
+
","
+
sla
.
getSlaRule
()
+
","
+
sla
.
getDuration
()+
","
+
sla
.
getEmailNotification
()+
","
+
sla
.
getKill
());
settings
+=
"settings%5B"
+
i
+
"%5D="
+
sla
.
getFlow
()
+
","
+
sla
.
getSlaRule
()
+
","
+
sla
.
getDuration
()+
","
+
sla
.
getEmailNotification
()+
","
+
sla
.
getKill
();
}
Map
<
String
,
Object
>
postForObject
=
null
;
try
{
postForObject
=
bulidRestTemplate
().
postForObject
(
executeFlowUrl
,
linkedMultiValueMap
,
Map
.
class
);
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
executeFlowUrl
+
"----"
+
linkedMultiValueMap
+
"----设置SLA接口异常"
);
e
.
printStackTrace
();
throw
new
RuntimeException
(
"设置SLA接口异常"
);
}
if
(
postForObject
.
containsKey
(
"error"
)){
throw
new
RuntimeException
(
postForObject
.
get
(
"error"
).
toString
());
}
String
status
=
(
String
)
postForObject
.
get
(
"status"
);
if
(
"error"
.
equals
(
status
))
{
String
message
=
(
String
)
postForObject
.
get
(
"message"
);
throw
new
RuntimeException
(
message
);
}
/*Integer[] execIds = new Integer[] {};
if (postForObject.get("execIds")!=null) {
execIds = (Integer[])postForObject.get("execIds");
}
}
String
result
=
HttpClientUtils
.
post
(
executeFlowUrl
+
settings
,
""
);
return
result
;
}
LOGGER.info("获取所有任务("+projectName+")正在调度execids"+execIds.toString());*/
/**
* 获取 SLA
* @author Bellamy
* @since 2021-03-08
* @return
*/
public
String
getSlaInfo
(
String
scheduleId
)
{
String
sessionId
=
login
();
String
executeFlowUrl
=
azkabanServerUrl
+
"/schedule?ajax=slaInfo&session.id="
+
sessionId
+
"&scheduleId="
+
scheduleId
;
String
result
=
HttpClientUtils
.
getJsonForParam
(
executeFlowUrl
,
new
HashMap
<>());
return
result
;
}
return
null
;
/**
* 获取 业务流程 调度周期
* @author Bellamy
* @since 2021-03-08
* @return
*/
public
String
getSchedule
(
String
projectId
,
String
flowId
)
{
String
sessionId
=
login
();
String
executeFlowUrl
=
azkabanServerUrl
+
"/schedule?ajax=fetchSchedule&session.id="
+
sessionId
+
"&projectId="
+
projectId
+
"&flowId="
+
flowId
;
String
result
=
HttpClientUtils
.
getJsonForParam
(
executeFlowUrl
,
new
HashMap
<>());
return
result
;
}
}
}
}
src/main/java/com/jz/common/utils/web/HttpClientUtils.java
View file @
c87590f0
...
@@ -172,6 +172,7 @@ public class HttpClientUtils {
...
@@ -172,6 +172,7 @@ public class HttpClientUtils {
HttpEntity
resEntity
=
response
.
getEntity
();
HttpEntity
resEntity
=
response
.
getEntity
();
if
(
resEntity
!=
null
)
{
if
(
resEntity
!=
null
)
{
result
=
EntityUtils
.
toString
(
resEntity
,
charset
);
result
=
EntityUtils
.
toString
(
resEntity
,
charset
);
LOGGER
.
info
(
"response results{}"
+
result
);
}
}
}
}
}
catch
(
Exception
ex
)
{
}
catch
(
Exception
ex
)
{
...
...
src/main/java/com/jz/dmp/azkaban/dao/ExecutionFlowsMapper.java
View file @
c87590f0
...
@@ -40,5 +40,30 @@ public interface ExecutionFlowsMapper {
...
@@ -40,5 +40,30 @@ public interface ExecutionFlowsMapper {
Map
<
String
,
Object
>
queryTaskInstanceStatus
()
throws
Exception
;
Map
<
String
,
Object
>
queryTaskInstanceStatus
()
throws
Exception
;
/**
* 查询业务流程任务,最后一个执行实例的状态
*
* @return
* @author Bellamy
* @since 2021-03-08
*/
List
<
Map
>
queryLastStatus
(
@Param
(
"taskName"
)
String
[]
taskName
,
@Param
(
"status"
)
String
status
)
throws
Exception
;
List
<
Map
>
queryLastStatus
(
@Param
(
"taskName"
)
String
[]
taskName
,
@Param
(
"status"
)
String
status
)
throws
Exception
;
/**
* 根据24小时查询运行成功或未运行的实例数量
*
* @return
* @author Bellamy
* @since 2021-03-08
*/
Map
<
String
,
Object
>
getTaskStatus
(
@Param
(
"timeParam"
)
String
timeParam
,
@Param
(
"taskStatus"
)
String
taskStatus
,
@Param
(
"taskName"
)
String
[]
taskName
)
throws
Exception
;
/**
* 查询实例存在的天数
*
* @return
* @author Bellamy
* @since 2021-03-08
*/
Long
queryDaysNumber
(
@Param
(
"taskStatus"
)
String
taskStatus
,
@Param
(
"taskName"
)
String
[]
taskName
)
throws
Exception
;
}
}
src/main/java/com/jz/dmp/modules/controller/dataOperation/DmpDevTaskController.java
View file @
c87590f0
...
@@ -11,6 +11,7 @@ import com.jz.dmp.modules.controller.dataOperation.bean.DataDevTaskListReq;
...
@@ -11,6 +11,7 @@ import com.jz.dmp.modules.controller.dataOperation.bean.DataDevTaskListReq;
import
com.jz.dmp.modules.service.DmpDevelopTaskService
;
import
com.jz.dmp.modules.service.DmpDevelopTaskService
;
import
io.swagger.annotations.Api
;
import
io.swagger.annotations.Api
;
import
io.swagger.annotations.ApiImplicitParam
;
import
io.swagger.annotations.ApiImplicitParam
;
import
io.swagger.annotations.ApiImplicitParams
;
import
io.swagger.annotations.ApiOperation
;
import
io.swagger.annotations.ApiOperation
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
@@ -70,7 +71,7 @@ public class DmpDevTaskController {
...
@@ -70,7 +71,7 @@ public class DmpDevTaskController {
@ApiImplicitParam
(
name
=
"taskId"
,
value
=
"任务id"
)
@ApiImplicitParam
(
name
=
"taskId"
,
value
=
"任务id"
)
public
JsonResult
runTaskByTaskId
(
@RequestParam
(
value
=
"taskId"
)
String
taskId
)
throws
Exception
{
public
JsonResult
runTaskByTaskId
(
@RequestParam
(
value
=
"taskId"
)
String
taskId
)
throws
Exception
{
if
(
StringUtils
.
isEmpty
(
taskId
))
{
if
(
StringUtils
.
isEmpty
(
taskId
))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"任务id不能为空!"
);
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"任务id不能为空!"
);
}
}
JsonResult
list
=
dmpDevelopTaskService
.
runTaskByTaskId
(
taskId
);
JsonResult
list
=
dmpDevelopTaskService
.
runTaskByTaskId
(
taskId
);
return
list
;
return
list
;
...
@@ -93,7 +94,59 @@ public class DmpDevTaskController {
...
@@ -93,7 +94,59 @@ public class DmpDevTaskController {
try
{
try
{
result
=
dmpDevelopTaskService
.
getTaskStatus
(
projectId
);
result
=
dmpDevelopTaskService
.
getTaskStatus
(
projectId
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
result
.
setMessage
(
e
.
getMessage
());
result
.
setMessage
(
"failed!"
);
result
.
setCode
(
ResultCode
.
INTERNAL_SERVER_ERROR
);
e
.
printStackTrace
();
}
return
result
;
}
/**
* 运维大屏--任务完成情况
*
* @return
* @author Bellamy
* @since 2021-03-08
*/
@ApiOperation
(
value
=
"运维大屏--任务完成情况"
,
notes
=
"运维大屏--任务完成情况"
)
@GetMapping
(
value
=
"/getTaskStatusByHours"
)
@ApiImplicitParams
({
@ApiImplicitParam
(
name
=
"projectId"
,
value
=
"projectId"
,
required
=
true
),
@ApiImplicitParam
(
name
=
"taskStatus"
,
value
=
"50:运行成功,70:未运行"
,
required
=
true
)})
public
JsonResult
getTaskStatusByHours
(
@RequestParam
String
projectId
,
@RequestParam
String
taskStatus
)
throws
Exception
{
if
(
StringUtils
.
isEmpty
(
projectId
))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"projectId不能为空!"
);
}
JsonResult
result
=
new
JsonResult
();
try
{
result
=
dmpDevelopTaskService
.
getTaskStatusByHours
(
projectId
,
taskStatus
);
}
catch
(
Exception
e
)
{
result
.
setMessage
(
"failed"
);
result
.
setCode
(
ResultCode
.
INTERNAL_SERVER_ERROR
);
e
.
printStackTrace
();
}
return
result
;
}
/**
* 获取 SLA
*
* @return
* @author Bellamy
* @since 2021-03-08
*/
@ApiOperation
(
value
=
"获取 SLA"
,
notes
=
"获取 SLA"
)
@GetMapping
(
value
=
"/getSlaInfo"
)
@ApiImplicitParams
({
@ApiImplicitParam
(
name
=
"scheduleId"
,
value
=
"scheduleId"
,
required
=
true
),
@ApiImplicitParam
(
name
=
"projectId"
,
value
=
"projectId"
,
required
=
true
)})
public
JsonResult
getSlaInfo
(
@RequestParam
String
scheduleId
,
@RequestParam
String
projectId
)
throws
Exception
{
if
(
StringUtils
.
isEmpty
(
scheduleId
))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"scheduleId不能为空!"
);
}
JsonResult
result
=
new
JsonResult
();
try
{
result
=
dmpDevelopTaskService
.
getSlaInfo
(
scheduleId
,
projectId
);
}
catch
(
Exception
e
)
{
result
.
setMessage
(
"failed"
);
result
.
setCode
(
ResultCode
.
INTERNAL_SERVER_ERROR
);
result
.
setCode
(
ResultCode
.
INTERNAL_SERVER_ERROR
);
e
.
printStackTrace
();
e
.
printStackTrace
();
}
}
...
...
src/main/java/com/jz/dmp/modules/controller/dataOperation/bean/DataDevTaskListDto.java
View file @
c87590f0
...
@@ -87,6 +87,9 @@ public class DataDevTaskListDto {
...
@@ -87,6 +87,9 @@ public class DataDevTaskListDto {
@ApiModelProperty
(
value
=
"任务类型"
)
@ApiModelProperty
(
value
=
"任务类型"
)
private
String
type
;
private
String
type
;
@ApiModelProperty
(
value
=
"scheduleId"
)
private
String
scheduleId
;
public
String
getStatus
()
{
public
String
getStatus
()
{
return
status
;
return
status
;
}
}
...
@@ -182,4 +185,12 @@ public class DataDevTaskListDto {
...
@@ -182,4 +185,12 @@ public class DataDevTaskListDto {
public
void
setTreeId
(
String
treeId
)
{
public
void
setTreeId
(
String
treeId
)
{
this
.
treeId
=
treeId
;
this
.
treeId
=
treeId
;
}
}
public
String
getScheduleId
()
{
return
scheduleId
;
}
public
void
setScheduleId
(
String
scheduleId
)
{
this
.
scheduleId
=
scheduleId
;
}
}
}
src/main/java/com/jz/dmp/modules/service/DmpDevelopTaskService.java
View file @
c87590f0
...
@@ -49,108 +49,116 @@ public interface DmpDevelopTaskService {
...
@@ -49,108 +49,116 @@ public interface DmpDevelopTaskService {
*/
*/
PageInfoResponse
<
DataDevExamplesListDto
>
queryDevExamplesListPage
(
DataDevExamplesListReq
req
)
throws
Exception
;
PageInfoResponse
<
DataDevExamplesListDto
>
queryDevExamplesListPage
(
DataDevExamplesListReq
req
)
throws
Exception
;
/**条件分頁查询所有任务开发
/**
* @param dmpDevelopTaskRequest
* 条件分頁查询所有任务开发
* @param httpRequest
*
* @return
* @param dmpDevelopTaskRequest
* @throws Exception
* @param httpRequest
*/
* @return
public
com
.
jz
.
common
.
bean
.
PageInfoResponse
<
DmpDevelopTaskDto
>
findListWithPage
(
DmpDevelopTaskRequest
dmpDevelopTaskRequest
,
HttpServletRequest
httpRequest
)
throws
Exception
;
* @throws Exception
*/
/**
public
com
.
jz
.
common
.
bean
.
PageInfoResponse
<
DmpDevelopTaskDto
>
findListWithPage
(
DmpDevelopTaskRequest
dmpDevelopTaskRequest
,
HttpServletRequest
httpRequest
)
throws
Exception
;
* @Title: getConfigFileNameNotSuffix4Published
* @Description: TODO(根据treeId取得最新版提交的同步脚本文件名(不含后缀)及版本信息)
/**
* @param @param treeId
* @param @param treeId
* @param @return
* @param @return
* @param @throws Exception 参数
* @param @throws Exception 参数
* @return String 返回类型
* @return String 返回类型
* @throws
* @throws
*/
* @Title: getConfigFileNameNotSuffix4Published
public
String
getConfigFileNameNotSuffix4Published
(
Long
treeId
)
throws
Exception
;
* @Description: TODO(根据treeId取得最新版提交的同步脚本文件名 ( 不含后缀 ) 及版本信息)
*/
/**新增开发任务
public
String
getConfigFileNameNotSuffix4Published
(
Long
treeId
)
throws
Exception
;
* @param dmpDevelopTask
* @param httpRequest
/**
* @return
* 新增开发任务
* @throws Exception
*
*/
* @param dmpDevelopTask
public
BaseBeanResponse
<
DmpDevelopTask
>
add
(
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
)
throws
Exception
;
* @param httpRequest
* @return
/**树ID查询任务开发
* @throws Exception
* @param treeId
*/
* @param httpRequest
public
BaseBeanResponse
<
DmpDevelopTask
>
add
(
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
)
throws
Exception
;
* @return
* @throws Exception
/**
*/
* 树ID查询任务开发
public
BaseBeanResponse
<
DmpDevelopTaskDto
>
findByTreeId
(
Integer
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
*
* @param treeId
/**修改任务开发
* @param httpRequest
* @param dmpDevelopTask
* @return
* @param httpRequest
* @throws Exception
* @return
*/
* @throws Exception
public
BaseBeanResponse
<
DmpDevelopTaskDto
>
findByTreeId
(
Integer
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
*/
public
BaseBeanResponse
<
DmpDevelopTask
>
edit
(
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**
* 修改任务开发
*
/**
* @param dmpDevelopTask
* @Title: flowSubmit
* @param httpRequest
* @Description: TODO(任务流程发布到azkaban)
* @return
* @param @param treeId
* @throws Exception
* @param @param httpRequest
*/
* @param @return
public
BaseBeanResponse
<
DmpDevelopTask
>
edit
(
DmpDevelopTask
dmpDevelopTask
,
HttpServletRequest
httpRequest
)
throws
Exception
;
* @param @throws Exception 参数
* @return BaseResponse 返回类型
* @throws
/**
*/
* @param @param treeId
public
BaseResponse
flowSubmit
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
* @param @param httpRequest
* @param @return
/**
* @param @throws Exception 参数
* @Title: taskAzkabanRun
* @return BaseResponse 返回类型
* @Description: TODO(运行任务)
* @throws
* @param @param treeId
* @Title: flowSubmit
* @param @param httpRequest
* @Description: TODO(任务流程发布到azkaban)
* @param @return
*/
* @param @throws Exception 参数
public
BaseResponse
flowSubmit
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
* @return BaseResponse 返回类型
* @throws
/**
*/
* @param @param treeId
public
BaseResponse
taskAzkabanRun
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
* @param @param httpRequest
* @param @return
/**
* @param @throws Exception 参数
* @Title: taskAzkabanStop
* @return BaseResponse 返回类型
* @Description: TODO(停止任务)
* @throws
* @param @param treeId
* @Title: taskAzkabanRun
* @param @param httpRequest
* @Description: TODO(运行任务)
* @param @return 参数
*/
* @return BaseBeanResponse<String> 返回类型
public
BaseResponse
taskAzkabanRun
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
* @throws
*/
/**
public
BaseBeanResponse
<
String
>
taskAzkabanStop
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
* @param @param treeId
* @param @param httpRequest
/**
* @param @return 参数
* @Title: taskPublish
* @return BaseBeanResponse<String> 返回类型
* @Description: TODO(SHELL/SQL发布接口)
* @throws
* @param @param treeId
* @Title: taskAzkabanStop
* @param @param httpRequest
* @Description: TODO(停止任务)
* @param @return
*/
* @param @throws Exception 参数
public
BaseBeanResponse
<
String
>
taskAzkabanStop
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
* @return BaseResponse 返回类型
* @throws
/**
*/
* @param @param treeId
public
BaseResponse
taskPublish
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
* @param @param httpRequest
* @param @return
/**
* @param @throws Exception 参数
* @Title: softDeleteByTreeId
* @return BaseResponse 返回类型
* @Description: TODO(软删除任务)
* @throws
* @param @param treeId
* @Title: taskPublish
* @param @param httpRequest
* @Description: TODO(SHELL / SQL发布接口)
* @param @return
*/
* @param @throws Exception 参数
public
BaseResponse
taskPublish
(
Long
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
* @return BaseResponse 返回类型
* @throws
/**
*/
* @param @param treeId
public
BaseResponse
softDeleteByTreeId
(
Integer
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
* @param @param httpRequest
* @param @return
* @param @throws Exception 参数
* @return BaseResponse 返回类型
* @throws
* @Title: softDeleteByTreeId
* @Description: TODO(软删除任务)
*/
public
BaseResponse
softDeleteByTreeId
(
Integer
treeId
,
HttpServletRequest
httpRequest
)
throws
Exception
;
/**
/**
* @param @param syncTaskTreeId
* @param @param syncTaskTreeId
...
@@ -172,20 +180,39 @@ public interface DmpDevelopTaskService {
...
@@ -172,20 +180,39 @@ public interface DmpDevelopTaskService {
*/
*/
JsonResult
queryExamplesLogByExecId
(
String
execId
)
throws
Exception
;
JsonResult
queryExamplesLogByExecId
(
String
execId
)
throws
Exception
;
/**
/**
* 运维大屏--获取任务状态
* 运维大屏--获取任务状态
*
*
* @return
* @return
* @author Bellamy
* @author Bellamy
* @since 2021-02-22
* @since 2021-02-22
*/
*/
JsonResult
getTaskStatus
(
String
projectId
)
throws
Exception
;
JsonResult
getTaskStatus
(
String
projectId
)
throws
Exception
;
/**
* 设置SLA
/**
*
* 设置SLA
* @return
*
* @author Bellamy
* @return
* @since 2021-02-03
* @author Bellamy
*/
* @since 2021-02-03
JsonResult
setSla
(
SetSlaReq
req
)
throws
Exception
;
*/
JsonResult
setSla
(
SetSlaReq
req
)
throws
Exception
;
/**
* 运维大屏--任务完成情况
*
* @return
* @author Bellamy
* @since 2021-03-08
*/
JsonResult
getTaskStatusByHours
(
String
projectId
,
String
taskStatus
)
throws
Exception
;
/**
* 获取 SLA
*
* @return
* @author Bellamy
* @since 2021-03-08
*/
JsonResult
getSlaInfo
(
String
scheduleId
,
String
projectId
)
throws
Exception
;
}
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
View file @
c87590f0
package
com
.
jz
.
dmp
.
modules
.
service
.
impl
;
package
com
.
jz
.
dmp
.
modules
.
service
.
impl
;
import
com.alibaba.fastjson.JSONObject
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
import
com.fasterxml.jackson.databind.node.ObjectNode
;
import
com.github.pagehelper.Page
;
import
com.github.pagehelper.Page
;
...
@@ -17,6 +18,7 @@ import com.jz.common.enums.ModuleLogEnum;
...
@@ -17,6 +18,7 @@ import com.jz.common.enums.ModuleLogEnum;
import
com.jz.common.page.PageInfoResponse
;
import
com.jz.common.page.PageInfoResponse
;
import
com.jz.common.persistence.BaseService
;
import
com.jz.common.persistence.BaseService
;
import
com.jz.common.utils.*
;
import
com.jz.common.utils.*
;
import
com.jz.common.utils.web.HttpClientUtils
;
import
com.jz.common.utils.web.XmlUtils
;
import
com.jz.common.utils.web.XmlUtils
;
import
com.jz.dmp.agent.DmpAgentResult
;
import
com.jz.dmp.agent.DmpAgentResult
;
import
com.jz.dmp.azkaban.dao.ExecutionFlowsMapper
;
import
com.jz.dmp.azkaban.dao.ExecutionFlowsMapper
;
...
@@ -44,6 +46,7 @@ import javax.servlet.http.HttpServletRequest;
...
@@ -44,6 +46,7 @@ import javax.servlet.http.HttpServletRequest;
import
java.io.File
;
import
java.io.File
;
import
java.io.IOException
;
import
java.io.IOException
;
import
java.io.UnsupportedEncodingException
;
import
java.io.UnsupportedEncodingException
;
import
java.math.BigDecimal
;
import
java.util.*
;
import
java.util.*
;
import
java.util.regex.Pattern
;
import
java.util.regex.Pattern
;
...
@@ -774,8 +777,12 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
...
@@ -774,8 +777,12 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
*/
*/
@Override
@Override
public
PageInfoResponse
<
DataDevTaskListDto
>
queryDevTaskListPage
(
DataDevTaskListReq
req
)
throws
Exception
{
public
PageInfoResponse
<
DataDevTaskListDto
>
queryDevTaskListPage
(
DataDevTaskListReq
req
)
throws
Exception
{
PageInfoResponse
<
DataDevTaskListDto
>
pageInfoResponse
=
new
PageInfoResponse
<>();
DmpProjectSystemInfo
publishToProjectSystemInfo
=
dmpProjectDao
.
queryProjectSystemInfo
(
Long
.
valueOf
(
req
.
getProjectId
()));
//调用azkaban服务
String
azkabanApiUrl
=
publishToProjectSystemInfo
.
getAzkabanMonitorUrl
();
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanApiUrl
,
redisTemplate
);
PageInfoResponse
<
DataDevTaskListDto
>
pageInfoResponse
=
new
PageInfoResponse
<>();
if
(
StringUtils
.
isNotBlank
(
req
.
getTreeIdOrName
()))
{
if
(
StringUtils
.
isNotBlank
(
req
.
getTreeIdOrName
()))
{
//判断是否为整数 是整数返回true,否则返回false
//判断是否为整数 是整数返回true,否则返回false
...
@@ -784,7 +791,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
...
@@ -784,7 +791,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
req
.
setTaskId
(
req
.
getTreeIdOrName
());
//id
req
.
setTaskId
(
req
.
getTreeIdOrName
());
//id
req
.
setTreeIdOrName
(
null
);
req
.
setTreeIdOrName
(
null
);
}
else
{
}
else
{
req
.
setTreeIdOrName
(
req
.
getTreeIdOrName
().
trim
());
//
节点
名称
req
.
setTreeIdOrName
(
req
.
getTreeIdOrName
().
trim
());
//
任务
名称
}
}
}
}
if
(
StringUtils
.
isNotBlank
(
req
.
getStatus
()))
{
if
(
StringUtils
.
isNotBlank
(
req
.
getStatus
()))
{
...
@@ -812,11 +819,27 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
...
@@ -812,11 +819,27 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
}
}
//查询实例最后执行状态
//查询实例最后执行状态
List
<
Map
>
flowList
=
executionFlowsMapper
.
queryLastStatus
(
taskName
.
substring
(
1
).
split
(
","
),
req
.
getStatus
());
List
<
Map
>
flowList
=
executionFlowsMapper
.
queryLastStatus
(
taskName
.
substring
(
1
).
split
(
","
),
req
.
getStatus
());
for
(
Map
strFlow
:
flowList
)
{
String
projectId
=
String
.
valueOf
(
strFlow
.
get
(
"projectId"
));
String
flowId
=
String
.
valueOf
(
strFlow
.
get
(
"taskName"
));
Map
responseData
=
(
Map
)
JSONObject
.
parse
(
azkabanApiUtils
.
getSchedule
(
projectId
,
flowId
));
if
(
responseData
!=
null
&&
responseData
.
size
()
>
0
)
{
Map
schedule
=
(
Map
)
responseData
.
get
(
"schedule"
);
strFlow
.
put
(
"scheduleId"
,
schedule
.
get
(
"scheduleId"
));
strFlow
.
put
(
"cronExpression"
,
schedule
.
get
(
"cronExpression"
));
}
}
if
(
flowList
.
size
()
>
0
&&
flowList
!=
null
)
{
if
(
flowList
.
size
()
>
0
&&
flowList
!=
null
)
{
for
(
DataDevTaskListDto
str
:
listObj
)
{
for
(
DataDevTaskListDto
str
:
listObj
)
{
for
(
Map
strFlow
:
flowList
)
{
for
(
Map
strFlow
:
flowList
)
{
if
(
str
.
getTaskName
().
equals
(
strFlow
.
get
(
"taskName"
)))
{
if
(
str
.
getTaskName
().
equals
(
strFlow
.
get
(
"taskName"
)))
{
str
.
setStatus
(
String
.
valueOf
(
strFlow
.
get
(
"status"
)));
str
.
setStatus
(
String
.
valueOf
(
strFlow
.
get
(
"status"
)));
if
(
null
!=
strFlow
.
get
(
"cronExpression"
)){
str
.
setSchedulinCycle
(
String
.
valueOf
(
strFlow
.
get
(
"cronExpression"
)));
}
if
(
null
!=
strFlow
.
get
(
"scheduleId"
)){
str
.
setScheduleId
(
String
.
valueOf
(
strFlow
.
get
(
"scheduleId"
)));
}
}
}
}
}
}
}
...
@@ -1646,7 +1669,89 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
...
@@ -1646,7 +1669,89 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
String
azkabanApiUrl
=
publishToProjectSystemInfo
.
getAzkabanMonitorUrl
();
String
azkabanApiUrl
=
publishToProjectSystemInfo
.
getAzkabanMonitorUrl
();
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanApiUrl
,
redisTemplate
);
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanApiUrl
,
redisTemplate
);
azkabanApiUtils
.
saveSla
(
req
);
azkabanApiUtils
.
saveSla
(
req
);
return
null
;
return
JsonResult
.
ok
();
}
/**
* 运维大屏--任务完成情况
*
* @return
* @author Bellamy
* @since 2021-03-08
*/
@Override
public
JsonResult
getTaskStatusByHours
(
String
projectId
,
String
taskStatus
)
throws
Exception
{
Map
param
=
new
HashMap
();
param
.
put
(
"projectId"
,
projectId
);
String
taskName
=
""
;
List
<
DataDevTaskListDto
>
list
=
dmpDevelopTaskDao
.
queryTaskTreeInfo
(
param
);
for
(
DataDevTaskListDto
str
:
list
)
{
taskName
+=
","
+
str
.
getTaskName
();
}
String
[]
taskNames
=
taskName
.
substring
(
1
).
split
(
","
);
String
nowTime
=
DateUtils
.
currentDate
();
//今天
String
yesterdayTime
=
DateUtils
.
getYesterdayStr
();
//昨天
List
<
String
>
nowList
=
new
ArrayList
<>();
Map
<
String
,
Object
>
nowData
=
executionFlowsMapper
.
getTaskStatus
(
nowTime
,
taskStatus
,
taskNames
);
if
(
nowData
==
null
||
nowData
.
size
()
==
0
)
{
for
(
int
i
=
0
;
i
<
24
;
i
++)
{
nowList
.
add
(
"0"
);
}
}
else
{
for
(
Map
.
Entry
<
String
,
Object
>
entry
:
nowData
.
entrySet
())
{
nowList
.
add
(
String
.
valueOf
(
entry
.
getValue
()));
}
}
List
<
String
>
yesterdayList
=
new
ArrayList
<>();
Map
<
String
,
Object
>
yesterdayData
=
executionFlowsMapper
.
getTaskStatus
(
yesterdayTime
,
taskStatus
,
taskNames
);
if
(
yesterdayData
==
null
||
yesterdayData
.
size
()
==
0
)
{
for
(
int
i
=
0
;
i
<
24
;
i
++)
{
yesterdayList
.
add
(
"0"
);
}
}
else
{
for
(
Map
.
Entry
<
String
,
Object
>
entry
:
yesterdayData
.
entrySet
())
{
yesterdayList
.
add
(
String
.
valueOf
(
entry
.
getValue
()));
}
}
List
<
String
>
historyAvgList
=
new
ArrayList
<>();
Map
<
String
,
Object
>
historyAvgData
=
executionFlowsMapper
.
getTaskStatus
(
""
,
taskStatus
,
taskNames
);
Long
dayNum
=
executionFlowsMapper
.
queryDaysNumber
(
taskStatus
,
taskNames
);
if
(
historyAvgData
==
null
||
historyAvgData
.
size
()
==
0
)
{
for
(
int
i
=
0
;
i
<
24
;
i
++)
{
historyAvgList
.
add
(
"0"
);
}
}
else
{
for
(
Map
.
Entry
<
String
,
Object
>
entry
:
historyAvgData
.
entrySet
())
{
BigDecimal
taskNum
=
new
BigDecimal
(
String
.
valueOf
(
entry
.
getValue
()));
BigDecimal
dayNumStr
=
new
BigDecimal
(
dayNum
);
historyAvgList
.
add
(
String
.
valueOf
(
taskNum
.
divide
(
dayNumStr
,
2
,
BigDecimal
.
ROUND_HALF_UP
)));
}
}
Map
returnData
=
new
HashMap
();
returnData
.
put
(
"nowData"
,
nowList
);
returnData
.
put
(
"yesterdayData"
,
yesterdayList
);
returnData
.
put
(
"historyAvgData"
,
historyAvgList
);
return
JsonResult
.
ok
(
returnData
);
}
/**
* 获取 SLA
*
* @return
* @author Bellamy
* @since 2021-03-08
*/
@Override
public
JsonResult
getSlaInfo
(
String
scheduleId
,
String
projectId
)
throws
Exception
{
DmpProjectSystemInfo
publishToProjectSystemInfo
=
dmpProjectDao
.
queryProjectSystemInfo
(
Long
.
valueOf
(
projectId
));
//调用azkaban服务
String
azkabanApiUrl
=
publishToProjectSystemInfo
.
getAzkabanMonitorUrl
();
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanApiUrl
,
redisTemplate
);
return
JsonResult
.
ok
(
JSONObject
.
parse
(
azkabanApiUtils
.
getSlaInfo
(
scheduleId
)));
}
}
}
}
\ No newline at end of file
src/main/resources/azkabanmapper/ExecutionFlowsMapper.xml
View file @
c87590f0
...
@@ -221,9 +221,12 @@
...
@@ -221,9 +221,12 @@
FROM execution_flows
FROM execution_flows
)t
)t
</select>
</select>
<!--查询业务流程任务,最后一个执行实例的状态-->
<select
id=
"queryLastStatus"
resultType=
"java.util.Map"
>
<select
id=
"queryLastStatus"
resultType=
"java.util.Map"
>
SELECT
SELECT
exec_id AS execId,
exec_id AS execId,
project_id as projectId,
flow_id AS taskName,
flow_id AS taskName,
max(from_unixtime( submit_time / 1000, '%Y-%m-%d %H:%i:%s' )) AS startTime,
max(from_unixtime( submit_time / 1000, '%Y-%m-%d %H:%i:%s' )) AS startTime,
(case when status='30' then '运行中' when status='50' then '成功' when status='70' then '失败' end) status
(case when status='30' then '运行中' when status='50' then '成功' when status='70' then '失败' end) status
...
@@ -231,11 +234,64 @@
...
@@ -231,11 +234,64 @@
execution_flows
execution_flows
WHERE 1 = 1
WHERE 1 = 1
<if
test=
"status != null and status !='' "
>
and status=#{status}
</if>
<if
test=
"status != null and status !='' "
>
and status=#{status}
</if>
<include
refid=
"flowIdSql"
/>
group by flow_id
</select>
<!--根据24小时查询运行成功或未运行的实例数量-->
<select
id=
"getTaskStatus"
resultType=
"java.util.LinkedHashMap"
>
select
sum(case when t.timeHour='0' then 1 else 0 end) zero,
sum(case when t.timeHour='1' then 1 else 0 end) one,
sum(case when t.timeHour='2' then 1 else 0 end) Two,
sum(case when t.timeHour='3' then 1 else 0 end) three,
sum(case when t.timeHour='4' then 1 else 0 end) four,
sum(case when t.timeHour='5' then 1 else 0 end) five,
sum(case when t.timeHour='6' then 1 else 0 end) six,
sum(case when t.timeHour='7' then 1 else 0 end) seven,
sum(case when t.timeHour='8' then 1 else 0 end) eight,
sum(case when t.timeHour='9' then 1 else 0 end) nine,
sum(case when t.timeHour='10' then 1 else 0 end) ten,
sum(case when t.timeHour='11' then 1 else 0 end) eleven,
sum(case when t.timeHour='12' then 1 else 0 end) twelve,
sum(case when t.timeHour='13' then 1 else 0 end) thirteen,
sum(case when t.timeHour='14' then 1 else 0 end) fourteen,
sum(case when t.timeHour='15' then 1 else 0 end) fifteen,
sum(case when t.timeHour='16' then 1 else 0 end) sixteen,
sum(case when t.timeHour='17' then 1 else 0 end) seventeen,
sum(case when t.timeHour='18' then 1 else 0 end) eighteen,
sum(case when t.timeHour='19' then 1 else 0 end) nineteen,
sum(case when t.timeHour='20' then 1 else 0 end) twenty,
sum(case when t.timeHour='21' then 1 else 0 end) twentyOne,
sum(case when t.timeHour='22' then 1 else 0 end) twentyTwo,
sum(case when t.timeHour='23' then 1 else 0 end) twentyThree
from(
select
hour(from_unixtime( submit_time / 1000, '%Y-%m-%d %H:%i:%s' )) as timeHour,
from_unixtime( submit_time / 1000, '%Y-%m-%d %H:%i:%s' )
from
execution_flows
where 1=1 and status=#{taskStatus}
<if
test=
"timeParam !=null and timeParam !='' "
>
and from_unixtime( submit_time / 1000, '%Y-%m-%d' ) = #{timeParam}
</if>
<include
refid=
"flowIdSql"
/>
) t
</select>
<sql
id=
"flowIdSql"
>
and flow_id in
and flow_id in
<foreach
collection=
"taskName"
item=
"item"
open=
"("
separator=
","
close=
")"
>
<foreach
collection=
"taskName"
item=
"item"
open=
"("
separator=
","
close=
")"
>
#{item}
#{item}
</foreach>
</foreach>
group by flow_id
</sql>
<!--查询实例存在的天数-->
<select
id=
"queryDaysNumber"
resultType=
"java.lang.Long"
>
select
count(distinct from_unixtime( submit_time / 1000, '%Y-%m-%d' )) dayNum
from
execution_flows
where 1=1 and status=#{taskStatus}
<include
refid=
"flowIdSql"
/>
</select>
</select>
</mapper>
</mapper>
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment