Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
5682aa10
Commit
5682aa10
authored
Mar 05, 2021
by
mcb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
离线修改
parent
e2a8d910
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
103 additions
and
10 deletions
+103
-10
OfflineSynchController.java
...es/controller/DataIntegration/OfflineSynchController.java
+20
-0
OfflineSynchService.java
.../java/com/jz/dmp/modules/service/OfflineSynchService.java
+7
-0
DmpRealtimeSyncInfoServiceImpl.java
.../modules/service/impl/DmpRealtimeSyncInfoServiceImpl.java
+17
-10
OfflineSynchServiceImpl.java
.../jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
+59
-0
No files found.
src/main/java/com/jz/dmp/modules/controller/DataIntegration/OfflineSynchController.java
View file @
5682aa10
...
...
@@ -218,6 +218,26 @@ public class OfflineSynchController {
return
jsonResult
;
}
/**
* sync-获取数据源表字段
*
* @return
* @author Bellamy
*/
@ApiOperation
(
value
=
"sync-获取数据源表字段"
,
notes
=
"sync-获取数据源表字段"
)
@PostMapping
(
value
=
"/getSyncColumns"
)
public
JsonResult
getSyncSoureAndTargetColumns
(
@RequestBody
@Validated
Map
<
String
,
List
<
SynchTableColumnsReq
>>
req
)
throws
Exception
{
JsonResult
jsonResult
=
new
JsonResult
();
try
{
jsonResult
=
offlineSynchService
.
getSyncSoureAndTargetColumns
(
req
);
}
catch
(
Exception
e
)
{
jsonResult
.
setMessage
(
e
.
getMessage
());
jsonResult
.
setCode
(
ResultCode
.
INTERNAL_SERVER_ERROR
);
e
.
printStackTrace
();
}
return
jsonResult
;
}
/**
* 校验规则
*
...
...
src/main/java/com/jz/dmp/modules/service/OfflineSynchService.java
View file @
5682aa10
...
...
@@ -131,4 +131,11 @@ public interface OfflineSynchService {
*/
JsonResult
getDataPreview
(
OfflineDataPreview
req
)
throws
Exception
;
/**
* sync-获取数据源表字段
*
* @return
* @author Bellamy
*/
JsonResult
getSyncSoureAndTargetColumns
(
Map
<
String
,
List
<
SynchTableColumnsReq
>>
req
)
throws
Exception
;
}
src/main/java/com/jz/dmp/modules/service/impl/DmpRealtimeSyncInfoServiceImpl.java
View file @
5682aa10
...
...
@@ -348,7 +348,6 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
Integer
srcDataSourceId
=
sourceDbInfo
.
getId
();
Integer
targetDataSourceId
=
targetDbInfo
.
getId
();
logger
.
info
(
"###################开始--同步数据源到数据源任务################### "
);
Long
realtiemId
=
null
;
//同步任务id
//源数据源到数据源同步connector信息
//解析黑名单表
...
...
@@ -428,13 +427,14 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
saveBody
.
setConnectorJobId
(
respData
.
get
(
"connectorJobId"
));
saveBody
.
setStatus
(
respData
.
get
(
"status"
));
dmpRealtimeSyncInfoDao
.
insert
(
saveBody
);
//异步获取任务状态
Thread
thread
=
new
Thread
(
new
Runnable
()
{
@Override
public
void
run
()
{
Map
map
=
new
HashMap
();
map
.
put
(
"id"
,
saveBody
.
getId
());
getKafkaTaskStatus
(
connectorUrl
,
map
);
getKafkaTaskStatus
(
connectorUrl
+
respData
.
get
(
"connectorJobId"
)
+
"/status"
,
map
);
}
});
thread
.
start
();
...
...
@@ -460,7 +460,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
version
=
version
.
add
(
new
BigDecimal
(
1.0
));
taskHistory
.
setVersion
(
String
.
valueOf
(
version
));
}
logger
.
info
(
"###################
保存实时同步任务--结束
################"
);
logger
.
info
(
"###################
save task end
################"
);
taskHistory
.
setRealtimeSyncId
(
saveBody
.
getId
());
dmpRealtimeSyncInfoDao
.
insertRealtimeHistory
(
taskHistory
);
...
...
@@ -1040,19 +1040,19 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
Map
map
=
JSONObject
.
parseObject
(
respData
);
String
tasksStatus
=
""
;
List
<
Map
>
tasks
=
(
List
<
Map
>)
map
.
get
(
"tasks"
);
/*List<Map> tasks = (List<Map>) map.get("tasks");
if (tasks.size() > 0 && null != tasks) {
tasksStatus
=
(
String
)
tasks
.
get
(
0
).
get
(
"state"
);
String
tasksStatus = (String) tasks.get(0).get("state");
logger.info("response tasks status{}" + tasksStatus);
return tasksStatus;
}
}
*/
Map
connector
=
(
Map
)
map
.
get
(
"connector"
);
if
(!
connector
.
containsKey
(
"state"
))
{
throw
new
RuntimeException
(
"get status failed!"
);
}
String
status
=
(
String
)
connector
.
get
(
"state"
);
logger
.
info
(
"response connector status{}"
+
status
);
if
(
StringUtils
.
isNotEmpty
((
String
)
params
.
get
(
"id"
)))
{
Thread
thread
=
new
Thread
(
new
Runnable
()
{
@Override
...
...
@@ -1065,15 +1065,21 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
return
status
;
}
public
void
getKafkaTaskStatus
(
String
statusUrl
,
Map
params
)
{
String
respData
=
HttpClientUtils
.
getJsonForParam
(
statusUrl
,
params
);
/*
* kafka rest api 获取任务状态
* */
public
String
getKafkaTaskStatus
(
String
statusUrl
,
Map
params
)
{
String
respData
=
HttpClientUtils
.
getJsonForParam
(
statusUrl
,
null
);
if
(
StringUtils
.
isEmpty
(
respData
))
{
throw
new
RuntimeException
(
"get status failed!"
);
}
Map
map
=
JSONObject
.
parseObject
(
respData
);
List
<
Map
>
tasks
=
(
List
<
Map
>)
map
.
get
(
"tasks"
);
if
(
tasks
.
size
()
==
0
)
{
getKafkaTaskStatus
(
statusUrl
,
params
);
String
s
=
getKafkaTaskStatus
(
statusUrl
,
params
);
if
(
StringUtils
.
isNotEmpty
(
s
)){
return
s
;
}
}
String
status
=
(
String
)
tasks
.
get
(
0
).
get
(
"state"
);
logger
.
info
(
"response tasks status{}"
+
status
);
...
...
@@ -1081,5 +1087,6 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
saveBody
.
setStatus
(
status
);
saveBody
.
setId
(
Integer
.
valueOf
(
params
.
get
(
"id"
).
toString
()));
dmpRealtimeSyncInfoDao
.
update
(
saveBody
);
return
status
;
}
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
View file @
5682aa10
...
...
@@ -971,4 +971,63 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
return
JsonResult
.
ok
(
returnData
);
}
/**
* sync-获取数据源表字段
*
* @param req
* @return
* @author Bellamy
*/
@Override
public
JsonResult
getSyncSoureAndTargetColumns
(
Map
<
String
,
List
<
SynchTableColumnsReq
>>
req
)
throws
Exception
{
List
<
Map
>
returnData
=
new
ArrayList
<>();
List
<
Map
>
syncData
=
new
ArrayList
<>();
int
len
=
0
;
if
(
req
.
size
()
==
0
||
req
==
null
)
{
throw
new
RuntimeException
(
"请求参数不能为空!"
);
}
List
<
SynchTableColumnsReq
>
list
=
req
.
get
(
"params"
);
Long
id
=
list
.
get
(
0
).
getSourceDbId
();
for
(
SynchTableColumnsReq
str
:
list
)
{
//通过源数据库id ,查询数据源配置
DmpAgentDatasourceInfo
dsInfos
=
offlineSynchDao
.
querySourceDbInfoBySourceId
(
str
.
getSourceDbId
());
DmpAgentDatasourceInfo
dsInfo
=
new
DmpAgentDatasourceInfo
();
BeanUtils
.
copyProperties
(
dsInfos
,
dsInfo
);
if
(
dsInfo
==
null
)
{
throw
new
RuntimeException
(
"数据源配置信息不存在!"
);
}
//解码源数据库密码
if
(
StringUtils
.
isNotBlank
(
dsInfo
.
getPassword
()))
{
dsInfo
.
setPassword
(
new
BaseService
().
decode
(
dsInfo
.
getPassword
(),
publicKey
));
}
//创建jdbc,获取数据源表字段
DmpAgentResult
rst
=
dmpDsAgentServiceImp
.
getTableColumnList
(
dsInfo
,
str
.
getTargetTableName
());
if
(!
rst
.
getCode
().
val
().
equals
(
"200"
))
{
return
new
JsonResult
(
rst
.
getCode
(),
rst
.
getMessage
());
}
else
{
//成功
List
<
Map
>
returnList
=
(
List
<
Map
>)
JsonMapper
.
fromJsonString
(
rst
.
getMessage
(),
List
.
class
);
if
(
returnList
!=
null
&&
returnList
.
size
()
>
0
)
{
for
(
int
i
=
0
;
i
<
returnList
.
size
();
i
++)
{
Map
map
=
returnList
.
get
(
i
);
//离线同步
map
.
put
(
"id"
,
++
len
);
map
.
put
(
"fieldAlias"
,
map
.
get
(
"name"
));
//字段别名
map
.
put
(
"isPk"
,
0
);
//主键
map
.
put
(
"isPt"
,
0
);
//分区
returnData
.
add
(
map
);
if
(
id
==
str
.
getSourceDbId
()){
syncData
.
add
(
map
);
}
}
}
}
}
Map
data
=
new
HashMap
();
data
.
put
(
"allTable"
,
returnData
);
data
.
put
(
"firstTable"
,
returnData
);
return
JsonResult
.
ok
(
data
);
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment