Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
c6fb901b
Commit
c6fb901b
authored
Mar 13, 2021
by
mcb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
commit
parent
697ed7a7
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
91 additions
and
17 deletions
+91
-17
RealTimeSyncController.java
...es/controller/DataIntegration/RealTimeSyncController.java
+20
-1
DmpRealtimeSyncInfoDao.java
...n/java/com/jz/dmp/modules/dao/DmpRealtimeSyncInfoDao.java
+9
-0
DmpRealtimeSyncInfoService.java
...om/jz/dmp/modules/service/DmpRealtimeSyncInfoService.java
+9
-0
DmpRealtimeSyncInfoServiceImpl.java
.../modules/service/impl/DmpRealtimeSyncInfoServiceImpl.java
+29
-14
OfflineSynchServiceImpl.java
.../jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
+4
-0
application-test.yml
src/main/resources/application-test.yml
+6
-1
DmpRealtimeSyncInfoMapper.xml
src/main/resources/mapper/dmp/DmpRealtimeSyncInfoMapper.xml
+14
-1
No files found.
src/main/java/com/jz/dmp/modules/controller/DataIntegration/RealTimeSyncController.java
View file @
c6fb901b
...
@@ -286,7 +286,7 @@ public class RealTimeSyncController {
...
@@ -286,7 +286,7 @@ public class RealTimeSyncController {
@GetMapping
(
value
=
"/editDataEcho"
)
@GetMapping
(
value
=
"/editDataEcho"
)
@ApiImplicitParam
(
name
=
"taskId"
,
value
=
"实时任务id"
,
required
=
true
)
@ApiImplicitParam
(
name
=
"taskId"
,
value
=
"实时任务id"
,
required
=
true
)
public
JsonResult
<
RealTimeEditDataEchoDto
>
getRealtimeTaskInfo
(
@RequestParam
String
taskId
)
throws
Exception
{
public
JsonResult
<
RealTimeEditDataEchoDto
>
getRealtimeTaskInfo
(
@RequestParam
String
taskId
)
throws
Exception
{
logger
.
info
(
"########
###########请求参数{}taskId="
+
taskId
+
"############
#######"
);
logger
.
info
(
"########
请求参数{}taskId="
+
taskId
+
"
#######"
);
if
(
StringUtils
.
isEmpty
(
taskId
))
{
if
(
StringUtils
.
isEmpty
(
taskId
))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"实时任务id不能为空!"
);
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"实时任务id不能为空!"
);
}
}
...
@@ -294,6 +294,25 @@ public class RealTimeSyncController {
...
@@ -294,6 +294,25 @@ public class RealTimeSyncController {
return
jsonResult
;
return
jsonResult
;
}
}
/**
* 通过treeId 查询实时任务
*
* @return
* @author Bellamy
* @since 2021-01-12
*/
@ApiOperation
(
value
=
"通过treeId 查询实时任务"
,
notes
=
"通过treeId 查询实时任务"
)
@GetMapping
(
value
=
"/getRealtimeTaskInfoByTreeId"
)
@ApiImplicitParam
(
name
=
"treeId"
,
value
=
"treeId"
,
required
=
true
)
public
JsonResult
getRealtimeTaskInfoByTreeId
(
@RequestParam
String
treeId
)
throws
Exception
{
logger
.
info
(
"########请求参数{}treeId="
+
treeId
+
"#######"
);
if
(
StringUtils
.
isEmpty
(
treeId
))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"treeId不能为空!"
);
}
JsonResult
jsonResult
=
dmpRealtimeSyncInfoService
.
queryRealtimeTaskInfoByTreeId
(
treeId
);
return
jsonResult
;
}
/**
/**
* 批量上下线
* 批量上下线
*
*
...
...
src/main/java/com/jz/dmp/modules/dao/DmpRealtimeSyncInfoDao.java
View file @
c6fb901b
...
@@ -204,4 +204,13 @@ public interface DmpRealtimeSyncInfoDao {
...
@@ -204,4 +204,13 @@ public interface DmpRealtimeSyncInfoDao {
* @since 2021-02-02
* @since 2021-02-02
*/
*/
Map
<
String
,
Object
>
queryTaskStatus
(
String
projectId
)
throws
Exception
;
Map
<
String
,
Object
>
queryTaskStatus
(
String
projectId
)
throws
Exception
;
/**
* 通过treeId 查询实时任务
*
* @return
* @author Bellamy
* @since 2021-01-12
*/
Map
queryRealtimeTaskInfoByTreeId
(
String
treeId
)
throws
Exception
;
}
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/DmpRealtimeSyncInfoService.java
View file @
c6fb901b
...
@@ -162,4 +162,13 @@ public interface DmpRealtimeSyncInfoService {
...
@@ -162,4 +162,13 @@ public interface DmpRealtimeSyncInfoService {
* @since 2021-01-05
* @since 2021-01-05
*/
*/
JsonResult
executeRealtimeTask
(
String
realTaskId
,
String
type
,
String
projectId
)
throws
Exception
;
JsonResult
executeRealtimeTask
(
String
realTaskId
,
String
type
,
String
projectId
)
throws
Exception
;
/**
* 通过treeId 查询实时任务
*
* @return
* @author Bellamy
* @since 2021-01-12
*/
JsonResult
queryRealtimeTaskInfoByTreeId
(
String
treeId
)
throws
Exception
;
}
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/impl/DmpRealtimeSyncInfoServiceImpl.java
View file @
c6fb901b
...
@@ -442,8 +442,6 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
...
@@ -442,8 +442,6 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
DmpRealtimeSyncInfo
realtimeTask
=
dmpRealtimeSyncInfoDao
.
queryById
(
Integer
.
valueOf
(
params
.
get
(
"taskId"
).
toString
()));
DmpRealtimeSyncInfo
realtimeTask
=
dmpRealtimeSyncInfoDao
.
queryById
(
Integer
.
valueOf
(
params
.
get
(
"taskId"
).
toString
()));
if
(
realtimeTask
==
null
)
if
(
realtimeTask
==
null
)
throw
new
RuntimeException
(
"操作数据不存在!"
);
throw
new
RuntimeException
(
"操作数据不存在!"
);
if
(
StringUtils
.
isEmpty
(
realtimeTask
.
getConnectorJobId
()))
throw
new
RuntimeException
(
"操作数据不存在!"
);
//编辑时,先删除任务,再发布任务
//编辑时,先删除任务,再发布任务
HttpClientUtils
.
httpDelete
(
connectorUrl
+
"/"
+
realtimeTask
.
getConnectorJobId
()
+
"/"
);
HttpClientUtils
.
httpDelete
(
connectorUrl
+
"/"
+
realtimeTask
.
getConnectorJobId
()
+
"/"
);
...
@@ -856,22 +854,24 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
...
@@ -856,22 +854,24 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
List
<
Map
>
list
=
dmpRealtimeSyncInfoDao
.
selectRealtimeTaskById
(
taskId
);
List
<
Map
>
list
=
dmpRealtimeSyncInfoDao
.
selectRealtimeTaskById
(
taskId
);
if
(
list
.
size
()
>
0
&&
list
!=
null
)
{
if
(
list
.
size
()
>
0
&&
list
!=
null
)
{
Map
realtimeMap
=
list
.
get
(
0
);
Map
realtimeMap
=
list
.
get
(
0
);
Map
map
=
(
Map
)
JSONObject
.
parse
((
String
)
realtimeMap
.
get
(
"scriptJson"
));
returnModel
.
setSrcDataSourceId
(
String
.
valueOf
(
realtimeMap
.
get
(
"srcDatasourceId"
)));
returnModel
.
setSrcDataSourceId
(
String
.
valueOf
(
realtimeMap
.
get
(
"srcDatasourceId"
)));
returnModel
.
setTargetDataSourceId
(
String
.
valueOf
(
realtimeMap
.
get
(
"targetDatasourceId"
)));
returnModel
.
setTargetDataSourceId
(
String
.
valueOf
(
realtimeMap
.
get
(
"targetDatasourceId"
)));
returnModel
.
setTreeId
((
String
)
realtimeMap
.
get
(
"treeId"
));
returnModel
.
setTreeId
((
String
)
realtimeMap
.
get
(
"treeId"
));
returnModel
.
setId
(
realtimeMap
.
get
(
"id"
).
toString
());
returnModel
.
setId
(
realtimeMap
.
get
(
"id"
).
toString
());
returnModel
.
setRegularExpression
((
String
)
map
.
get
(
"regularExpression"
));
Map
map
=
(
Map
)
JSONObject
.
parse
((
String
)
realtimeMap
.
get
(
"scriptJson"
));
if
(
map
.
containsKey
(
"srcDatasourceTypeId"
))
{
if
(
map
!=
null
&&
map
.
size
()
>
0
)
{
returnModel
.
setSrcDatasourceTypeId
(
String
.
valueOf
(
map
.
get
(
"srcDatasourceTypeId"
)));
returnModel
.
setRegularExpression
((
String
)
map
.
get
(
"regularExpression"
));
}
if
(
map
.
containsKey
(
"srcDatasourceTypeId"
))
{
if
(
map
.
containsKey
(
"targetDataSourceTypeId"
))
{
returnModel
.
setSrcDatasourceTypeId
(
String
.
valueOf
(
map
.
get
(
"srcDatasourceTypeId"
)));
returnModel
.
setTargetDataSourceTypeId
(
String
.
valueOf
(
map
.
get
(
"targetDataSourceTypeId"
)));
}
}
if
(
map
.
containsKey
(
"targetDataSourceTypeId"
))
{
if
(
map
.
containsKey
(
"projectId"
))
{
returnModel
.
setTargetDataSourceTypeId
(
String
.
valueOf
(
map
.
get
(
"targetDataSourceTypeId"
)));
returnModel
.
setProjectId
(
String
.
valueOf
(
map
.
get
(
"projectId"
)));
}
if
(
map
.
containsKey
(
"projectId"
))
{
returnModel
.
setProjectId
(
String
.
valueOf
(
map
.
get
(
"projectId"
)));
}
returnModel
.
setTables
((
List
<
Map
>)
map
.
get
(
"tables"
));
}
}
returnModel
.
setTables
((
List
<
Map
>)
map
.
get
(
"tables"
));
}
}
return
new
JsonResult
(
returnModel
);
return
new
JsonResult
(
returnModel
);
}
}
...
@@ -963,6 +963,8 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
...
@@ -963,6 +963,8 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
for
(
String
table
:
tablesName
)
{
for
(
String
table
:
tablesName
)
{
Map
map
=
new
HashMap
();
Map
map
=
new
HashMap
();
map
.
put
(
"tableName"
,
table
);
map
.
put
(
"tableName"
,
table
);
map
.
put
(
"sourceTableName"
,
table
);
map
.
put
(
"targetTableName"
,
table
);
if
(
tableList
.
contains
(
table
))
{
if
(
tableList
.
contains
(
table
))
{
map
.
put
(
"conflict"
,
"Y"
);
map
.
put
(
"conflict"
,
"Y"
);
}
else
{
}
else
{
...
@@ -1030,6 +1032,19 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
...
@@ -1030,6 +1032,19 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
return
JsonResult
.
ok
();
return
JsonResult
.
ok
();
}
}
/**
* 通过treeId 查询实时任务
*
* @return
* @author Bellamy
* @since 2021-01-12
*/
@Override
public
JsonResult
queryRealtimeTaskInfoByTreeId
(
String
treeId
)
throws
Exception
{
Map
map
=
dmpRealtimeSyncInfoDao
.
queryRealtimeTaskInfoByTreeId
(
treeId
);
return
JsonResult
.
ok
(
map
);
}
/*
/*
* kafka rest api 获取任务状态
* kafka rest api 获取任务状态
* */
* */
...
@@ -1077,7 +1092,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
...
@@ -1077,7 +1092,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
List
<
Map
>
tasks
=
(
List
<
Map
>)
map
.
get
(
"tasks"
);
List
<
Map
>
tasks
=
(
List
<
Map
>)
map
.
get
(
"tasks"
);
if
(
tasks
.
size
()
==
0
)
{
if
(
tasks
.
size
()
==
0
)
{
String
s
=
getKafkaTaskStatus
(
statusUrl
,
params
);
String
s
=
getKafkaTaskStatus
(
statusUrl
,
params
);
if
(
StringUtils
.
isNotEmpty
(
s
))
{
if
(
StringUtils
.
isNotEmpty
(
s
))
{
return
s
;
return
s
;
}
}
}
}
...
...
src/main/java/com/jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
View file @
c6fb901b
...
@@ -139,6 +139,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -139,6 +139,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
*/
*/
@Override
@Override
public
JsonResult
querygSourceTableList
(
Long
sourceDbId
,
String
targetName
)
throws
Exception
{
public
JsonResult
querygSourceTableList
(
Long
sourceDbId
,
String
targetName
)
throws
Exception
{
if
(
sourceDbId
==
null
)
{
return
JsonResult
.
error
(
ResultCode
.
PARAMS_ERROR
,
"数据源id不能为空"
);
}
//通过源数据库id ,查询数据源配置
//通过源数据库id ,查询数据源配置
DmpAgentDatasourceInfo
dsInfo
=
offlineSynchDao
.
querySourceDbInfoBySourceId
(
sourceDbId
);
DmpAgentDatasourceInfo
dsInfo
=
offlineSynchDao
.
querySourceDbInfoBySourceId
(
sourceDbId
);
if
(
dsInfo
==
null
)
{
if
(
dsInfo
==
null
)
{
...
@@ -929,6 +932,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -929,6 +932,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
dmpDevelopTask
.
setCreateUserId
(
SessionUtils
.
getCurrentUserId
());
dmpDevelopTask
.
setCreateUserId
(
SessionUtils
.
getCurrentUserId
());
dmpDevelopTask
.
setDataStatus
(
DelFlagEnum
.
NO
.
getValue
());
dmpDevelopTask
.
setDataStatus
(
DelFlagEnum
.
NO
.
getValue
());
dmpDevelopTask
.
setName
(
tree
.
getName
());
dmpDevelopTask
.
setName
(
tree
.
getName
());
dmpDevelopTask
.
setTaskType
(
"2"
);
dmpDevelopTaskDao
.
insert
(
dmpDevelopTask
);
dmpDevelopTaskDao
.
insert
(
dmpDevelopTask
);
return
JsonResult
.
ok
(
dmpDevelopTask
);
return
JsonResult
.
ok
(
dmpDevelopTask
);
}
else
if
(
TaskTreeTypeEnum
.
SSTB
.
getValue
().
equals
(
newSynchTaskReq
.
getType
()))
{
}
else
if
(
TaskTreeTypeEnum
.
SSTB
.
getValue
().
equals
(
newSynchTaskReq
.
getType
()))
{
...
...
src/main/resources/application-test.yml
View file @
c6fb901b
# 测试环境配置
# 测试环境配置
server
:
server
:
port
:
718
1
port
:
718
3
#contextPath: /resource
#contextPath: /resource
management
:
management
:
...
@@ -101,3 +101,8 @@ dmp:
...
@@ -101,3 +101,8 @@ dmp:
logging
:
logging
:
level
:
level
:
com.jz.dmp
:
debug
com.jz.dmp
:
debug
ftp
:
url
:
192.168.1.141
port
:
21
username
:
ftpuser
password
:
9zDatacn
\ No newline at end of file
src/main/resources/mapper/dmp/DmpRealtimeSyncInfoMapper.xml
View file @
c6fb901b
...
@@ -377,7 +377,7 @@
...
@@ -377,7 +377,7 @@
</select>
</select>
<!--根据数据源id获取数据信息-->
<!--根据数据源id获取数据信息-->
<select
id=
"querygSourceDbInfoById"
parameterType=
"map"
resultType=
"com.jz.dmp.modules.model.RealTimeSyncDataSourceModel"
>
<select
id=
"querygSourceDbInfoById"
parameterType=
"map"
flushCache=
"true"
resultType=
"com.jz.dmp.modules.model.RealTimeSyncDataSourceModel"
>
select
select
ds.id as id,
ds.id as id,
ds.datasource_name as datasourceName,
ds.datasource_name as datasourceName,
...
@@ -559,4 +559,17 @@
...
@@ -559,4 +559,17 @@
where 1=1 and t1.data_status='1' and t2.data_status='1'
where 1=1 and t1.data_status='1' and t2.data_status='1'
and t1.project_id =#{projectId}
and t1.project_id =#{projectId}
</select>
</select>
<select
id=
"queryRealtimeTaskInfoByTreeId"
resultType=
"java.util.Map"
>
select
t1.id,
t2.id as treeId,
t2.name,
t2.type as treeType
from
dmp_realtime_sync_info t1
inner join dmp_navigation_tree t2 on t1.tree_id=t2.id
where 1=1 and t1.data_status='1' and t2.data_status='1'
and t2.id =#{treeId}
</select>
</mapper>
</mapper>
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment