Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
e2a8d910
Commit
e2a8d910
authored
Mar 04, 2021
by
mcb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
commit
parent
fe8b550d
Changes
11
Hide whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
133 additions
and
35 deletions
+133
-35
GatewayApiConstant.java
src/main/java/com/jz/common/constant/GatewayApiConstant.java
+2
-0
HttpClientUtils.java
src/main/java/com/jz/common/utils/web/HttpClientUtils.java
+1
-1
RealTimeSyncController.java
...es/controller/DataIntegration/RealTimeSyncController.java
+1
-1
DmpApiServiceMangeController.java
.../controller/dataService/DmpApiServiceMangeController.java
+24
-3
OrganizationManageListQueryReq.java
...ller/dataService/bean/OrganizationManageListQueryReq.java
+3
-0
DmpApiServiceMangeService.java
...com/jz/dmp/modules/service/DmpApiServiceMangeService.java
+10
-1
DmpApiServiceMangeServiceImpl.java
...p/modules/service/impl/DmpApiServiceMangeServiceImpl.java
+17
-2
DmpDevelopTaskServiceImpl.java
...z/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
+2
-0
DmpRealtimeSyncInfoServiceImpl.java
.../modules/service/impl/DmpRealtimeSyncInfoServiceImpl.java
+65
-17
ExecutionFlowsMapper.xml
src/main/resources/azkabanmapper/ExecutionFlowsMapper.xml
+4
-6
DmpDevelopTaskMapper.xml
src/main/resources/mapper/dmp/DmpDevelopTaskMapper.xml
+4
-4
No files found.
src/main/java/com/jz/common/constant/GatewayApiConstant.java
View file @
e2a8d910
...
...
@@ -84,6 +84,8 @@ public class GatewayApiConstant {
public
static
final
String
listGetApiKey
=
"/api/interface/listGetApiKey"
;
//删除文件夹
public
static
final
String
delFolder
=
"/api/producer/delProjectFolder"
;
//删除文件夹
public
static
final
String
functionTemplate
=
"/api/producer/getFunctionTemplateList"
;
private
static
Logger
logger
=
LoggerFactory
.
getLogger
(
GatewayApiConstant
.
class
);
...
...
src/main/java/com/jz/common/utils/web/HttpClientUtils.java
View file @
e2a8d910
...
...
@@ -574,7 +574,7 @@ public class HttpClientUtils {
System
.
out
.
println
(
response
.
getStatusLine
());
if
(
entity
!=
null
)
{
result
=
EntityUtils
.
toString
(
entity
);
LOGGER
.
info
(
result
);
LOGGER
.
info
(
"response results{}"
+
result
);
}
}
catch
(
ClientProtocolException
e
)
{
...
...
src/main/java/com/jz/dmp/modules/controller/DataIntegration/RealTimeSyncController.java
View file @
e2a8d910
...
...
@@ -70,7 +70,7 @@ public class RealTimeSyncController {
}
/**
* 批量启动实时同步任务
* 批量启动
/停止启动
实时同步任务
*
* @return
* @author Bellamy
...
...
src/main/java/com/jz/dmp/modules/controller/dataService/DmpApiServiceMangeController.java
View file @
e2a8d910
...
...
@@ -298,11 +298,11 @@ public class DmpApiServiceMangeController {
@ApiOperation
(
value
=
"获取文件夹列表"
,
notes
=
"获取文件夹列表"
)
@GetMapping
(
value
=
"/folderTree"
)
@ApiImplicitParams
({
@ApiImplicitParam
(
name
=
"projectId"
,
value
=
"项目id"
),
@ApiImplicitParam
(
name
=
"org
Code
"
,
value
=
"组织编码"
)})
public
JsonResult
getFolderTree
(
@RequestParam
(
name
=
"projectId"
,
required
=
false
)
String
projectId
,
@RequestParam
(
name
=
"org
Code"
,
required
=
false
)
String
orgCode
)
{
@ApiImplicitParam
(
name
=
"org
Folder
"
,
value
=
"组织编码"
)})
public
JsonResult
getFolderTree
(
@RequestParam
(
name
=
"projectId"
,
required
=
false
)
String
projectId
,
@RequestParam
(
name
=
"org
Folder"
,
required
=
false
)
String
orgFolder
)
{
JsonResult
jsonResult
=
new
JsonResult
();
try
{
jsonResult
=
dmpApiServiceMangeService
.
getFolderTree
(
projectId
,
org
Code
);
jsonResult
=
dmpApiServiceMangeService
.
getFolderTree
(
projectId
,
org
Folder
);
}
catch
(
Exception
e
)
{
jsonResult
.
setMessage
(
e
.
getMessage
());
jsonResult
.
setCode
(
ResultCode
.
INTERNAL_SERVER_ERROR
);
...
...
@@ -373,6 +373,27 @@ public class DmpApiServiceMangeController {
return
jsonResult
;
}
/**
* 获取function模板列表
*
* @return
* @author Bellamy
* @since 2021-03-3
*/
@ApiOperation
(
value
=
"获取function模板列表"
,
notes
=
"获取function模板列表"
)
@GetMapping
(
value
=
"/functionTemplate"
)
public
JsonResult
getFunctionTemplateList
()
throws
Exception
{
JsonResult
jsonResult
=
new
JsonResult
();
try
{
jsonResult
=
dmpApiServiceMangeService
.
getFunctionTemplateList
();
}
catch
(
Exception
e
)
{
jsonResult
.
setMessage
(
e
.
getMessage
());
jsonResult
.
setCode
(
ResultCode
.
INTERNAL_SERVER_ERROR
);
e
.
printStackTrace
();
}
return
jsonResult
;
}
/**
* 获取数据源表字段
*
...
...
src/main/java/com/jz/dmp/modules/controller/dataService/bean/OrganizationManageListQueryReq.java
View file @
e2a8d910
...
...
@@ -27,4 +27,7 @@ public class OrganizationManageListQueryReq extends BasePageBean implements Seri
@ApiModelProperty
(
value
=
"联系人"
)
private
String
linkman
;
@ApiModelProperty
(
value
=
"文件夹id"
)
private
String
fileId
;
}
src/main/java/com/jz/dmp/modules/service/DmpApiServiceMangeService.java
View file @
e2a8d910
...
...
@@ -97,7 +97,7 @@ public interface DmpApiServiceMangeService {
* @author Bellamy
* @since 2021-02-24
*/
JsonResult
getFolderTree
(
String
projectId
,
String
org
Code
)
throws
Exception
;
JsonResult
getFolderTree
(
String
projectId
,
String
org
Folder
)
throws
Exception
;
/**
* API计量--API已调用列表
...
...
@@ -131,4 +131,13 @@ public interface DmpApiServiceMangeService {
* @since 2021-03-3
*/
JsonResult
delFolderById
(
String
id
)
throws
Exception
;
/**
* 获取function模板列表
*
* @return
* @author Bellamy
* @since 2021-03-3
*/
JsonResult
getFunctionTemplateList
()
throws
Exception
;
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/impl/DmpApiServiceMangeServiceImpl.java
View file @
e2a8d910
...
...
@@ -265,6 +265,21 @@ public class DmpApiServiceMangeServiceImpl implements DmpApiServiceMangeService
return
result
;
}
/**
* 获取function模板列表
*
* @return
* @author Bellamy
* @since 2021-03-3
*/
@Override
public
JsonResult
getFunctionTemplateList
()
throws
Exception
{
String
url
=
gatewayUrl
+
GatewayApiConstant
.
functionTemplate
;
Map
params
=
new
HashMap
();
JsonResult
result
=
GatewayApiConstant
.
getRequest2GetData
(
url
,
params
);
return
result
;
}
/**
* 服务开发API列表
*
...
...
@@ -289,14 +304,14 @@ public class DmpApiServiceMangeServiceImpl implements DmpApiServiceMangeService
* @since 2021-02-24
*/
@Override
public
JsonResult
getFolderTree
(
String
projectId
,
String
org
Code
)
throws
Exception
{
public
JsonResult
getFolderTree
(
String
projectId
,
String
org
Folder
)
throws
Exception
{
String
url
=
gatewayUrl
+
GatewayApiConstant
.
folderTree
;
Map
params
=
new
HashMap
();
if
(
StringUtils
.
isNotEmpty
(
projectId
))
{
params
.
put
(
"projectId"
,
projectId
);
}
params
.
put
(
"org
Code"
,
orgCode
);
params
.
put
(
"org
Folder"
,
orgFolder
);
JsonResult
result
=
GatewayApiConstant
.
getRequest2GetData
(
url
,
params
);
return
result
;
}
...
...
src/main/java/com/jz/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
View file @
e2a8d910
...
...
@@ -14,6 +14,7 @@ import java.util.regex.Pattern;
import
javax.servlet.http.HttpServletRequest
;
import
com.alibaba.fastjson.JSONObject
;
import
com.jz.common.utils.web.HttpClientUtils
;
import
com.mysql.jdbc.Blob
;
import
org.apache.tomcat.jni.Mmap
;
import
org.slf4j.Logger
;
...
...
@@ -1603,4 +1604,5 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
}
return
JsonResult
.
ok
(
list
);
}
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/impl/DmpRealtimeSyncInfoServiceImpl.java
View file @
e2a8d910
...
...
@@ -2,6 +2,7 @@ package com.jz.dmp.modules.service.impl;
import
com.alibaba.fastjson.JSONObject
;
import
com.amazonaws.services.dynamodbv2.xspec.M
;
import
com.github.pagehelper.PageHelper
;
import
com.github.pagehelper.PageInfo
;
import
com.jz.agent.service.DmpDsAgentService
;
...
...
@@ -423,10 +424,20 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
DmpRealtimeTaskHistory
taskHistory
=
new
DmpRealtimeTaskHistory
();
BeanUtils
.
copyProperties
(
saveBody
,
taskHistory
);
if
(
StringUtils
.
isEmpty
(
String
.
valueOf
(
params
.
get
(
"taskId"
))))
{
Map
<
String
,
String
>
respData
=
publishTask2Kafka
(
connectorUrl
,
jsonStr
);
Map
<
String
,
String
>
respData
=
publishTask2Kafka
(
connectorUrl
,
jsonStr
,
""
);
saveBody
.
setConnectorJobId
(
respData
.
get
(
"connectorJobId"
));
saveBody
.
setStatus
(
respData
.
get
(
"status"
));
dmpRealtimeSyncInfoDao
.
insert
(
saveBody
);
//异步获取任务状态
Thread
thread
=
new
Thread
(
new
Runnable
()
{
@Override
public
void
run
()
{
Map
map
=
new
HashMap
();
map
.
put
(
"id"
,
saveBody
.
getId
());
getKafkaTaskStatus
(
connectorUrl
,
map
);
}
});
thread
.
start
();
}
else
{
DmpRealtimeSyncInfo
realtimeTask
=
dmpRealtimeSyncInfoDao
.
queryById
(
Integer
.
valueOf
(
params
.
get
(
"taskId"
).
toString
()));
if
(
realtimeTask
==
null
)
...
...
@@ -436,7 +447,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
//编辑时,先删除任务,再发布任务
HttpClientUtils
.
httpDelete
(
connectorUrl
+
"/"
+
realtimeTask
.
getConnectorJobId
()
+
"/"
);
Map
<
String
,
String
>
respData
=
publishTask2Kafka
(
connectorUrl
,
jsonStr
);
Map
<
String
,
String
>
respData
=
publishTask2Kafka
(
connectorUrl
,
jsonStr
,
params
.
get
(
"taskId"
).
toString
()
);
saveBody
.
setConnectorJobId
(
respData
.
get
(
"connectorJobId"
));
saveBody
.
setStatus
(
respData
.
get
(
"status"
));
saveBody
.
setUpdateTime
(
new
Date
());
...
...
@@ -468,7 +479,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
/*
* 发布到kafak 并开始运行 ,请求接口正常则保存数据,否则失败
* */
public
Map
<
String
,
String
>
publishTask2Kafka
(
String
connectorUrl
,
String
jsonStr
)
throws
Exception
{
public
Map
<
String
,
String
>
publishTask2Kafka
(
String
connectorUrl
,
String
jsonStr
,
String
id
)
throws
Exception
{
Map
<
String
,
String
>
returnMap
=
new
HashMap
();
Map
<
String
,
Object
>
result
=
RestClient
.
post
(
connectorUrl
,
jsonStr
);
logger
.
info
(
"=======response data {}"
+
JSONObject
.
toJSONString
(
result
));
...
...
@@ -476,7 +487,10 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
if
(
StringUtils
.
isEmpty
(
connectorJobId
))
{
throw
new
RuntimeException
(
"提交失败!"
);
}
String
status
=
getExecuteShellStatus
(
connectorUrl
+
"/"
+
connectorJobId
+
"/status"
,
new
HashMap
());
Map
params
=
new
HashMap
();
params
.
put
(
"id"
,
id
);
params
.
put
(
"flag"
,
"publish"
);
String
status
=
getExecuteKafkaStatus
(
connectorUrl
+
"/"
+
connectorJobId
+
"/status"
,
params
);
returnMap
.
put
(
"status"
,
status
);
returnMap
.
put
(
"connectorJobId"
,
connectorJobId
);
return
returnMap
;
...
...
@@ -990,7 +1004,6 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
String
url
=
connectorUrl
+
"/"
+
connectorJobId
;
String
statusUrl
=
url
;
//resume 恢复,删除 delete,pause 暂停
String
param
=
""
;
if
(
"01"
.
equals
(
type
))
{
url
+=
"/resume"
;
}
else
if
(
"02"
.
equals
(
type
))
{
...
...
@@ -998,11 +1011,14 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
logger
.
info
(
"######执行表数据id{}"
+
ids
[
i
]);
//
执行 shell
//
请求 kafka api
OKHttpUtil
.
httpPut
(
url
,
""
);
//获取任务状态
String
status
=
getExecuteShellStatus
(
statusUrl
+
"/status"
,
new
HashMap
<>());
logger
.
info
(
"response status{}"
+
status
);
Map
params
=
new
HashMap
<>();
params
.
put
(
"id"
,
realTaskId
);
params
.
put
(
"flag"
,
url
.
substring
(
url
.
lastIndexOf
(
"/"
)
+
1
));
String
status
=
getExecuteKafkaStatus
(
statusUrl
+
"/status"
,
params
);
//执行后任务状态 :PAUSED 暂停 , RUNNING 运行中
saveBaby
.
setStatus
(
status
);
saveBaby
.
setUptPerson
(
SessionUtils
.
getCurrentUserId
());
...
...
@@ -1015,23 +1031,55 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
/*
* 获取任务状态
*
kafka rest api
获取任务状态
* */
public
String
getExecute
Shell
Status
(
String
statusUrl
,
Map
params
)
throws
Exception
{
String
respData
=
HttpClientUtils
.
getJsonForParam
(
statusUrl
,
params
);
public
String
getExecute
Kafka
Status
(
String
statusUrl
,
Map
params
)
throws
Exception
{
String
respData
=
HttpClientUtils
.
getJsonForParam
(
statusUrl
,
new
HashMap
<>()
);
if
(
StringUtils
.
isEmpty
(
respData
))
{
throw
new
RuntimeException
(
"
执行失败
!"
);
throw
new
RuntimeException
(
"
get status failed
!"
);
}
logger
.
info
(
"#################响应结果{}"
+
respData
);
Map
map
=
JSONObject
.
parseObject
(
respData
);
if
(!
map
.
containsKey
(
"connector"
))
{
throw
new
RuntimeException
(
"执行失败!"
);
String
tasksStatus
=
""
;
List
<
Map
>
tasks
=
(
List
<
Map
>)
map
.
get
(
"tasks"
);
if
(
tasks
.
size
()
>
0
&&
null
!=
tasks
)
{
tasksStatus
=
(
String
)
tasks
.
get
(
0
).
get
(
"state"
);
logger
.
info
(
"response tasks status{}"
+
tasksStatus
);
return
tasksStatus
;
}
Map
connector
=
(
Map
)
map
.
get
(
"connector"
);
if
(!
connector
.
containsKey
(
"state"
))
{
throw
new
RuntimeException
(
"get status failed!"
);
}
String
status
=
(
String
)
connector
.
get
(
"state"
);
if
(
StringUtils
.
isEmpty
(
status
))
throw
new
RuntimeException
(
"执行失败!"
);
logger
.
info
(
"response connector status{}"
+
status
);
if
(
StringUtils
.
isNotEmpty
((
String
)
params
.
get
(
"id"
)))
{
Thread
thread
=
new
Thread
(
new
Runnable
()
{
@Override
public
void
run
()
{
getKafkaTaskStatus
(
statusUrl
,
params
);
}
});
thread
.
start
();
}
return
status
;
}
public
void
getKafkaTaskStatus
(
String
statusUrl
,
Map
params
)
{
String
respData
=
HttpClientUtils
.
getJsonForParam
(
statusUrl
,
params
);
if
(
StringUtils
.
isEmpty
(
respData
))
{
throw
new
RuntimeException
(
"get status failed!"
);
}
Map
map
=
JSONObject
.
parseObject
(
respData
);
List
<
Map
>
tasks
=
(
List
<
Map
>)
map
.
get
(
"tasks"
);
if
(
tasks
.
size
()
==
0
)
{
getKafkaTaskStatus
(
statusUrl
,
params
);
}
String
status
=
(
String
)
tasks
.
get
(
0
).
get
(
"state"
);
logger
.
info
(
"response tasks status{}"
+
status
);
DmpRealtimeSyncInfo
saveBody
=
new
DmpRealtimeSyncInfo
();
saveBody
.
setStatus
(
status
);
saveBody
.
setId
(
Integer
.
valueOf
(
params
.
get
(
"id"
).
toString
()));
dmpRealtimeSyncInfoDao
.
update
(
saveBody
);
}
}
\ No newline at end of file
src/main/resources/azkabanmapper/ExecutionFlowsMapper.xml
View file @
e2a8d910
...
...
@@ -182,12 +182,10 @@
execution_flows
where 1=1
<if
test=
"businessTime != null and businessTime != ''"
>
and from_unixtime(submit_time/1000,'%Y-%m-%d') =#{businessTime}
</if>
<if
test=
"taskName != null and taskName != ''"
>
and flow_id in
<foreach
collection=
"taskName"
item=
"item"
open=
"("
separator=
","
close=
")"
>
#{item}
</foreach>
</if>
and flow_id in
<foreach
collection=
"taskName"
item=
"item"
open=
"("
separator=
","
close=
")"
>
#{item}
</foreach>
<if
test=
"startTime != null and startTime != ''"
>
and from_unixtime(submit_time/1000,'%Y-%m-%d %H:%i:%s') >=#{startTime}
</if>
<if
test=
"endTime != null and endTime != ''"
>
#{endTime} >= and from_unixtime(submit_time/1000,'%Y-%m-%d %H:%i:%s')
</if>
</select>
...
...
src/main/resources/mapper/dmp/DmpDevelopTaskMapper.xml
View file @
e2a8d910
...
...
@@ -175,9 +175,9 @@
t3.real_name as userName
from
dmp_navigation_tree t1
left
join dmp_develop_task t2 on t2.TREE_ID=t1.ID and t2.data_status ='1'
inner
join dmp_develop_task t2 on t2.TREE_ID=t1.ID and t2.data_status ='1'
left join dmp_member t3 on t1.create_user_id=t3.user_id
where 1=1 and
t1.type='0
1'
where 1=1 and
IS_LEVEL ='
1'
and t1.project_id = #{projectId}
<if
test=
"taskId != null and taskId != ''"
>
and t2.id =#{taskId}
</if>
<if
test=
"treeIdOrName != null and treeIdOrName != ''"
>
and t1.name like concat('%',#{treeIdOrName},'%')
</if>
...
...
@@ -218,9 +218,9 @@
(case when t1.type='01' then '离线同步' when t1.type='02' then '实时同步' when t1.type='03' then '数据开发' end) as type
from
dmp_navigation_tree t1
left
join dmp_develop_task t2 on t2.TREE_ID=t1.ID and t2.data_status ='1'
inner
join dmp_develop_task t2 on t2.TREE_ID=t1.ID and t2.data_status ='1'
left join dmp_member t3 on t1.create_user_id=t3.user_id
where 1=1 and
t1.type='0
1'
where 1=1 and
is_level = '1' and t1.data_status ='
1'
and t1.project_id = #{projectId}
<if
test=
"treeId != null and treeId != ''"
>
and t1.id =#{treeId}
</if>
<if
test=
"taskName != null and taskName != ''"
>
and t1.name like concat('%',#{taskName},'%')
</if>
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment