Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
c6268406
Commit
c6268406
authored
Jan 11, 2021
by
mcb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
330d5784
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
188 additions
and
53 deletions
+188
-53
RealTimeSyncController.java
...es/controller/DataIntegration/RealTimeSyncController.java
+43
-0
DmpRealtimeSyncInfoService.java
...om/jz/dmp/modules/service/DmpRealtimeSyncInfoService.java
+10
-1
DmpRealtimeSyncInfoServiceImpl.java
.../modules/service/impl/DmpRealtimeSyncInfoServiceImpl.java
+134
-52
realtime.json
src/main/resources/realtime.json
+1
-0
No files found.
src/main/java/com/jz/dmp/modules/controller/DataIntegration/RealTimeSyncController.java
View file @
c6268406
...
...
@@ -228,6 +228,10 @@ public class RealTimeSyncController {
if
(
StringUtils
.
isEmpty
(
params
.
get
(
"targetDataSourceId"
).
toString
()))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"目标数据源id不能为空!"
);
}
if
(
StringUtils
.
isEmpty
(
params
.
get
(
"treeId"
).
toString
()))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"业务节点id不能为空!"
);
}
//异步提交
Thread
thread
=
new
Thread
(
new
Runnable
()
{
@Override
...
...
@@ -243,4 +247,43 @@ public class RealTimeSyncController {
return
new
JsonResult
();
}
/**
* 编辑实时同步任务
*
* @return
* @author Bellamy
* @since 2021-01-08
*/
@ApiOperation
(
value
=
"编辑实时同步任务"
,
notes
=
"编辑实时同步任务"
)
@PostMapping
(
value
=
"/updateTask"
)
public
JsonResult
updateTask
(
@RequestBody
Map
<
String
,
Object
>
params
,
HttpServletRequest
httpRequest
)
throws
Exception
{
logger
.
info
(
"################请求参数{}"
+
params
.
toString
()
+
"############"
);
if
(
StringUtils
.
isEmpty
(
params
.
get
(
"projectId"
).
toString
()))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"项目id不能为空!"
);
}
if
(
StringUtils
.
isEmpty
(
params
.
get
(
"srcDataSourceId"
).
toString
()))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"来源数据源id不能为空!"
);
}
if
(
StringUtils
.
isEmpty
(
params
.
get
(
"targetDataSourceId"
).
toString
()))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"目标数据源id不能为空!"
);
}
if
(
StringUtils
.
isEmpty
(
params
.
get
(
"taskId"
).
toString
()))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"任务id不能为空!"
);
}
//异步提交
Thread
thread
=
new
Thread
(
new
Runnable
()
{
@Override
public
void
run
()
{
try
{
dmpRealtimeSyncInfoService
.
updateRealTimeTask
(
params
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
});
thread
.
start
();
return
new
JsonResult
();
}
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/DmpRealtimeSyncInfoService.java
View file @
c6268406
...
...
@@ -111,5 +111,14 @@ public interface DmpRealtimeSyncInfoService {
* @author Bellamy
* @since 2021-01-08
*/
boolean
addRealTimeTask
(
Map
<
String
,
Object
>
params
)
throws
Exception
;
JsonResult
addRealTimeTask
(
Map
<
String
,
Object
>
params
)
throws
Exception
;
/**
* 编辑实时同步任务
*
* @return
* @author Bellamy
* @since 2021-01-08
*/
JsonResult
updateRealTimeTask
(
Map
<
String
,
Object
>
params
)
throws
Exception
;
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/impl/DmpRealtimeSyncInfoServiceImpl.java
View file @
c6268406
package
com
.
jz
.
dmp
.
modules
.
service
.
impl
;
import
com.amazonaws.services.dynamodbv2.xspec.M
;
import
com.github.pagehelper.PageHelper
;
import
com.github.pagehelper.PageInfo
;
import
com.jz.common.constant.JsonResult
;
...
...
@@ -282,7 +283,8 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
* @since 2021-01-08
*/
@Override
public
boolean
addRealTimeTask
(
Map
<
String
,
Object
>
params
)
throws
Exception
{
@Transactional
(
rollbackFor
=
Exception
.
class
,
propagation
=
Propagation
.
REQUIRES_NEW
)
public
JsonResult
addRealTimeTask
(
Map
<
String
,
Object
>
params
)
throws
Exception
{
String
srcDataSourceId
=
params
.
get
(
"srcDataSourceId"
).
toString
();
// 来源id
String
targetDataSourceId
=
params
.
get
(
"targetDataSourceId"
).
toString
();
// 目标源id
Long
projectId
=
Long
.
valueOf
(
params
.
get
(
"projectId"
).
toString
());
//项目id
...
...
@@ -331,7 +333,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
//处理已选择的表信息
submitNoSelectTable
(
realtimeId
,
projectId
,
sourceDbInfo
,
targetDbInfo
,
dmpProjectSystemInfo
,
connectorUrl
,
params
);
}
return
false
;
return
new
JsonResult
()
;
}
private
Long
submitDatasource2DatasourceToConnector
(
Long
projectId
,
RealTimeSyncDataSourceModel
sourceDbInfo
,
RealTimeSyncDataSourceModel
targetDbInfo
,
DmpProjectSystemInfo
dmpProjectSystemInfo
,
String
connectorUrl
,
Map
<
String
,
Object
>
params
)
throws
Exception
{
...
...
@@ -356,64 +358,23 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
logger.info("###################修改实时同步任务处理数--结束{}" + handleCount.toString() + "############");
}*/
logger
.
info
(
"###################
开始--同步数据源到数据源任务################### "
);
logger
.
info
(
"###################开始--同步数据源到数据源任务################### "
);
Long
realtiemId
=
null
;
//同步任务id
//源数据源到数据源同步connector信息
//黑名单表
String
blacklistTables
=
""
;
String
connectorBlacklistTables
=
""
;
int
blacklistTableCount
=
0
;
if
(
params
.
containsKey
(
"blacklistTables"
))
{
blacklistTables
=
(
String
)
params
.
get
(
"blacklistTables"
);
if
(
StringUtils
.
isNotEmpty
(
blacklistTables
))
{
StringBuffer
sb
=
new
StringBuffer
();
String
[]
blacklistTableArr
=
blacklistTables
.
split
(
","
);
blacklistTableCount
=
blacklistTableArr
.
length
;
if
(
blacklistTableArr
!=
null
&&
blacklistTableCount
>=
1
)
{
for
(
String
string
:
blacklistTableArr
)
{
sb
.
append
(
sourceDbInfo
.
getDbName
());
sb
.
append
(
"."
);
sb
.
append
(
string
);
sb
.
append
(
","
);
}
}
if
(
sb
.
toString
().
contains
(
","
))
{
connectorBlacklistTables
=
sb
.
toString
().
substring
(
0
,
sb
.
toString
().
length
()
-
1
);
}
}
}
logger
.
info
(
"################### 解析黑名单表--结束{}"
+
connectorBlacklistTables
+
" ################"
);
//解析黑名单表
Map
blacklistTablesInfo
=
getBlackListTableInfo
(
sourceDbInfo
,
params
);
//解析已选择表
String
connectorWhitelistTables
=
""
;
int
whitelistTablesConut
=
0
;
if
(
params
.
containsKey
(
"tables"
))
{
List
<
Map
<
String
,
String
>>
tables
=
(
List
<
Map
<
String
,
String
>>)
params
.
get
(
"tables"
);
if
(!
CollectionUtils
.
isEmpty
(
tables
))
{
whitelistTablesConut
=
tables
.
size
();
StringBuffer
sb
=
new
StringBuffer
();
for
(
Map
<
String
,
String
>
temp
:
tables
)
{
sb
.
append
(
sourceDbInfo
.
getDbName
());
sb
.
append
(
"."
);
sb
.
append
(
temp
.
get
(
"sourceTableName"
));
sb
.
append
(
","
);
}
if
(
sb
.
toString
().
contains
(
","
))
{
connectorWhitelistTables
=
sb
.
toString
().
substring
(
0
,
sb
.
toString
().
length
()
-
1
);
}
}
}
logger
.
info
(
"################### 解析已选择表--结束{} "
+
connectorWhitelistTables
+
" ################"
);
Map
selectlistTablesInfo
=
getSelectListTableInfo
(
sourceDbInfo
,
params
);
Map
<
String
,
String
>
dataModelMap
=
new
HashMap
<>();
dataModelMap
.
put
(
"kafkaBootstrapServers"
,
dmpProjectSystemInfo
.
getKafkaBootstrapServers
());
//kafka 地址
dataModelMap
.
put
(
"registerUrl"
,
dmpProjectSystemInfo
.
getKafkaSchemaRegisterUrl
());
//kafka connector schema 注册地址
dataModelMap
.
put
(
"blacklistTables"
,
connectorBlacklistTables
);
//设置的黑名单表
dataModelMap
.
put
(
"blacklistTableCount"
,
String
.
valueOf
(
blacklistTableCount
));
//黑名单表数量
dataModelMap
.
put
(
"blacklistTables"
,
blacklistTablesInfo
.
get
(
"connectorBlacklistTables"
).
toString
()
);
//设置的黑名单表
dataModelMap
.
put
(
"blacklistTableCount"
,
blacklistTablesInfo
.
get
(
"blacklistTableCount"
).
toString
(
));
//黑名单表数量
//设置的白名单表 在模板里进行判比较黑名单表和白名单表的数量,谁小就用谁
dataModelMap
.
put
(
"whitelistTablesConut"
,
String
.
valueOf
(
whitelistTablesConut
));
//已选择表数量
dataModelMap
.
put
(
"connectorWhitelistTables"
,
connectorWhitelistTables
);
//设置的已选择表
dataModelMap
.
put
(
"whitelistTablesConut"
,
selectlistTablesInfo
.
get
(
"whitelistTablesConut"
).
toString
(
));
//已选择表数量
dataModelMap
.
put
(
"connectorWhitelistTables"
,
selectlistTablesInfo
.
get
(
"connectorWhitelistTables"
).
toString
()
);
//设置的已选择表
//选择的来源数据信息
dataModelMap
.
put
(
"dbHost"
,
sourceDbInfo
.
getHost
());
dataModelMap
.
put
(
"dbPort"
,
sourceDbInfo
.
getPort
());
...
...
@@ -475,7 +436,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
blacklist
.
put
(
"creTime"
,
new
Date
());
blacklist
.
put
(
"realtimeId"
,
saveBody
.
getId
());
blacklist
.
put
(
"datasourceId"
,
srcDataSourceId
);
blacklist
.
put
(
"blacklistTable"
,
blacklistTables
);
blacklist
.
put
(
"blacklistTable"
,
blacklistTables
Info
.
get
(
"blacklistTables"
).
toString
()
);
dmpRealtimeSyncInfoDao
.
insertRealtimeBlackList
(
blacklist
);
logger
.
info
(
"###################保存实时同步黑名单数据--结束 ################"
);
}
else
{
...
...
@@ -487,6 +448,78 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
return
realtiemId
;
}
/**
* 解析黑名单表
*
* @return
* @author Bellamy
* @since 2021-01-11
*/
public
Map
getBlackListTableInfo
(
RealTimeSyncDataSourceModel
sourceDbInfo
,
Map
<
String
,
Object
>
params
)
{
Map
returnMap
=
new
HashMap
();
String
blacklistTables
=
""
;
String
connectorBlacklistTables
=
""
;
int
blacklistTableCount
=
0
;
if
(
params
.
containsKey
(
"blacklistTables"
))
{
blacklistTables
=
(
String
)
params
.
get
(
"blacklistTables"
);
if
(
StringUtils
.
isNotEmpty
(
blacklistTables
))
{
StringBuffer
sb
=
new
StringBuffer
();
String
[]
blacklistTableArr
=
blacklistTables
.
split
(
","
);
blacklistTableCount
=
blacklistTableArr
.
length
;
if
(
blacklistTableArr
!=
null
&&
blacklistTableCount
>=
1
)
{
for
(
String
string
:
blacklistTableArr
)
{
sb
.
append
(
sourceDbInfo
.
getDbName
());
sb
.
append
(
"."
);
sb
.
append
(
string
);
sb
.
append
(
","
);
}
}
if
(
sb
.
toString
().
contains
(
","
))
{
connectorBlacklistTables
=
sb
.
toString
().
substring
(
0
,
sb
.
toString
().
length
()
-
1
);
}
}
}
returnMap
.
put
(
"blacklistTables"
,
blacklistTables
);
returnMap
.
put
(
"connectorBlacklistTables"
,
connectorBlacklistTables
);
returnMap
.
put
(
"blacklistTableCount"
,
blacklistTableCount
);
logger
.
info
(
"###################解析黑名单表--结束{}"
+
connectorBlacklistTables
+
" ################"
);
return
returnMap
;
}
/**
* 解析已选择表
*
* @return
* @author Bellamy
* @since 2021-01-11
*/
public
Map
getSelectListTableInfo
(
RealTimeSyncDataSourceModel
sourceDbInfo
,
Map
<
String
,
Object
>
params
)
{
Map
returnMap
=
new
HashMap
();
String
connectorWhitelistTables
=
""
;
int
whitelistTablesConut
=
0
;
if
(
params
.
containsKey
(
"tables"
))
{
List
<
Map
<
String
,
String
>>
tables
=
(
List
<
Map
<
String
,
String
>>)
params
.
get
(
"tables"
);
if
(!
CollectionUtils
.
isEmpty
(
tables
))
{
whitelistTablesConut
=
tables
.
size
();
StringBuffer
sb
=
new
StringBuffer
();
for
(
Map
<
String
,
String
>
temp
:
tables
)
{
sb
.
append
(
sourceDbInfo
.
getDbName
());
sb
.
append
(
"."
);
sb
.
append
(
temp
.
get
(
"sourceTableName"
));
sb
.
append
(
","
);
}
if
(
sb
.
toString
().
contains
(
","
))
{
connectorWhitelistTables
=
sb
.
toString
().
substring
(
0
,
sb
.
toString
().
length
()
-
1
);
}
}
}
returnMap
.
put
(
"connectorWhitelistTables"
,
connectorWhitelistTables
);
returnMap
.
put
(
"whitelistTablesConut"
,
whitelistTablesConut
);
logger
.
info
(
"###################解析已选择表--结束{} "
+
connectorWhitelistTables
+
"################"
);
return
returnMap
;
}
/**
* 使用freemaker模板生成 kafka connector 请求参数
*
...
...
@@ -598,4 +631,53 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
logger
.
info
(
"###################处理已选择的表信息--结束################"
);
return
new
JsonResult
();
}
/**
* 编辑实时同步任务
*
* @return
* @author Bellamy
* @since 2021-01-08
*/
@Override
public
JsonResult
updateRealTimeTask
(
Map
<
String
,
Object
>
params
)
throws
Exception
{
String
srcDataSourceId
=
params
.
get
(
"srcDataSourceId"
).
toString
();
//来源id
String
targetDataSourceId
=
params
.
get
(
"targetDataSourceId"
).
toString
();
//目标源id
Long
projectId
=
Long
.
valueOf
(
params
.
get
(
"projectId"
).
toString
());
//项目id
String
connectorUrl
=
(
String
)
params
.
get
(
"connectorUrl"
);
//connectorUrl连接信息
//根据来源数据源id获取数据信息
RealTimeSyncDataSourceModel
sourceDbInfo
=
dmpRealtimeSyncInfoDao
.
querygSourceDbInfoById
(
srcDataSourceId
);
if
(
sourceDbInfo
==
null
)
{
throw
new
RuntimeException
(
"来源数据设置信息不存在!"
);
}
sourceDbInfo
.
setPassword
(
new
BaseService
().
decode
(
sourceDbInfo
.
getPassword
(),
publicKey
));
//根据目标数据源id获取数据信息
RealTimeSyncDataSourceModel
targetDbInfo
=
dmpRealtimeSyncInfoDao
.
querygSourceDbInfoById
(
targetDataSourceId
);
if
(
targetDbInfo
==
null
)
{
throw
new
RuntimeException
(
"目标数据源设置信息不存在!"
);
}
targetDbInfo
.
setPassword
(
new
BaseService
().
decode
(
targetDbInfo
.
getPassword
(),
publicKey
));
//根据projectId查询项目系统配置信息
DmpProjectSystemInfo
dmpProjectSystemInfo
=
dmpProjectDao
.
queryProjectSystemInfo
(
projectId
);
params
.
put
(
"connectorSecurityFlag"
,
dmpProjectSystemInfo
.
getKerberosIsenable
());
//安全验证开关,是否启用KERBEROS
//connect1@http://172.18.104.130:9993/connectors
if
(
StringUtils
.
isNotEmpty
(
connectorUrl
))
{
if
(
connectorUrl
.
contains
(
"@"
))
{
connectorUrl
=
connectorUrl
.
split
(
"@"
)[
1
];
}
}
//提交源到源的connector
Long
realtimeId
=
updateDatasource2DatasourceToConnector
(
projectId
,
sourceDbInfo
,
targetDbInfo
,
dmpProjectSystemInfo
,
connectorUrl
,
params
);
return
new
JsonResult
();
}
private
Long
updateDatasource2DatasourceToConnector
(
Long
projectId
,
RealTimeSyncDataSourceModel
sourceDbInfo
,
RealTimeSyncDataSourceModel
targetDbInfo
,
DmpProjectSystemInfo
dmpProjectSystemInfo
,
String
connectorUrl
,
Map
<
String
,
Object
>
params
)
throws
Exception
{
return
null
;
}
}
\ No newline at end of file
src/main/resources/realtime.json
View file @
c6268406
{
"projectId"
:
31
,
"treeId"
:
1
,
"srcDataSourceId"
:
205
,
"targetDataSourceId"
:
202
,
"sourceName"
:
"test34"
,
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment