Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
4f645937
Commit
4f645937
authored
Mar 05, 2021
by
sml
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
冲突解决
parent
3f961945
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
92 additions
and
7 deletions
+92
-7
OfflineSynchServiceImpl.java
.../jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
+92
-7
No files found.
src/main/java/com/jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
View file @
4f645937
...
@@ -372,9 +372,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -372,9 +372,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
dmpTableDao
.
insert
(
table
);
dmpTableDao
.
insert
(
table
);
DmpTableFieldSchema
schema
=
new
DmpTableFieldSchema
();
//项目表字段类型
DmpTableFieldSchema
schema
=
new
DmpTableFieldSchema
();
//项目表字段类型
schema
.
setDsId
(
Integer
.
valueOf
(
dsInfo
.
getDatasourceTypeId
()));
schema
.
setDs
t
Id
(
Integer
.
valueOf
(
dsInfo
.
getDatasourceTypeId
()));
List
<
DmpTableFieldSchema
>
sourceFieldSchema
=
dmpTableFieldSchemaDao
.
queryAll
(
schema
);
//源字段架构
List
<
DmpTableFieldSchema
>
sourceFieldSchema
=
dmpTableFieldSchemaDao
.
queryAll
(
schema
);
//源字段架构
schema
.
setDsId
(
Integer
.
valueOf
(
dsInfo
.
getDatasourceTypeId
()));
schema
.
setDs
t
Id
(
Integer
.
valueOf
(
dsInfo
.
getDatasourceTypeId
()));
List
<
DmpTableFieldSchema
>
targetFieldSchema
=
dmpTableFieldSchemaDao
.
queryAll
(
schema
);
//目标字段架构
List
<
DmpTableFieldSchema
>
targetFieldSchema
=
dmpTableFieldSchemaDao
.
queryAll
(
schema
);
//目标字段架构
if
(
sourceFieldSchema
!=
null
&&
sourceFieldSchema
.
size
()
>
0
)
{
if
(
sourceFieldSchema
!=
null
&&
sourceFieldSchema
.
size
()
>
0
)
{
Map
<
String
,
Integer
>
ftm
=
new
HashMap
<
String
,
Integer
>();
Map
<
String
,
Integer
>
ftm
=
new
HashMap
<
String
,
Integer
>();
...
@@ -553,7 +553,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -553,7 +553,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
task
.
setId
(
taskId
);
task
.
setId
(
taskId
);
dmpDevelopTaskDao
.
update
(
task
);
//更新任务
dmpDevelopTaskDao
.
update
(
task
);
//更新任务
logger
.
info
(
"
################################## 更新任务数据结束 ############################################
"
);
logger
.
info
(
"
======== edit sync task end ========
"
);
//更新规则信息
//更新规则信息
List
<
DvTaskRuleT
>
list
=
new
ArrayList
<>();
List
<
DvTaskRuleT
>
list
=
new
ArrayList
<>();
...
@@ -649,7 +649,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -649,7 +649,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
task
.
setCreateTime
(
new
Date
());
task
.
setCreateTime
(
new
Date
());
dmpDevelopTaskDao
.
insert
(
task
);
//新增任务数据
dmpDevelopTaskDao
.
insert
(
task
);
//新增任务数据
this
.
saveTaskHistory
(
task
);
//保存任务历史版本
this
.
saveTaskHistory
(
task
);
//保存任务历史版本
logger
.
info
(
"
################################## 新增离线任务数据结束 ############################################
"
);
logger
.
info
(
"
======== save sync task end ========
"
);
//保存dmp数据校验规则信息
//保存dmp数据校验规则信息
settRuleInfo
(
taskId
,
taskRules
,
list
);
settRuleInfo
(
taskId
,
taskRules
,
list
);
}
else
{
}
else
{
...
@@ -661,8 +661,8 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -661,8 +661,8 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
task
.
setUpdateUserId
(
SessionUtils
.
getCurrentUserId
());
task
.
setUpdateUserId
(
SessionUtils
.
getCurrentUserId
());
task
.
setId
(
Integer
.
valueOf
(
taskId
));
task
.
setId
(
Integer
.
valueOf
(
taskId
));
dmpDevelopTaskDao
.
update
(
task
);
dmpDevelopTaskDao
.
update
(
task
);
logger
.
info
(
"======== edit sync task end ========"
);
this
.
saveTaskHistory
(
task
);
//保存任务历史版本
this
.
saveTaskHistory
(
task
);
//保存任务历史版本
logger
.
info
(
"################################## 编辑离线任务数据结束 ############################################"
);
//更新规则信息
//更新规则信息
//查询TaskRuleID 集合
//查询TaskRuleID 集合
List
<
Long
>
listRules
=
dvTaskRuleTService
.
getTaskRuleIdsList
(
Integer
.
valueOf
(
taskId
));
List
<
Long
>
listRules
=
dvTaskRuleTService
.
getTaskRuleIdsList
(
Integer
.
valueOf
(
taskId
));
...
@@ -702,7 +702,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -702,7 +702,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
taskHistory
.
setCreateUserId
(
Integer
.
valueOf
(
SessionUtils
.
getCurrentUserId
()));
taskHistory
.
setCreateUserId
(
Integer
.
valueOf
(
SessionUtils
.
getCurrentUserId
()));
taskHistory
.
setDataStatus
(
DelFlagEnum
.
NO
.
getValue
());
taskHistory
.
setDataStatus
(
DelFlagEnum
.
NO
.
getValue
());
dmpDevelopTaskHistoryMapper
.
insert
(
taskHistory
);
dmpDevelopTaskHistoryMapper
.
insert
(
taskHistory
);
logger
.
info
(
"
################################## 新增离线任务版本记录结束 ############################################
"
);
logger
.
info
(
"
======== save sync task history end ========
"
);
return
JsonResult
.
ok
();
return
JsonResult
.
ok
();
}
}
...
@@ -771,7 +771,8 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -771,7 +771,8 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
if
(
req
.
size
()
==
0
||
req
==
null
)
{
if
(
req
.
size
()
==
0
||
req
==
null
)
{
throw
new
RuntimeException
(
"请求参数不能为空!"
);
throw
new
RuntimeException
(
"请求参数不能为空!"
);
}
}
for
(
SynchTableColumnsReq
str
:
req
.
get
(
"params"
))
{
List
<
SynchTableColumnsReq
>
list
=
req
.
get
(
"params"
);
for
(
SynchTableColumnsReq
str
:
list
)
{
//通过源数据库id ,查询数据源配置
//通过源数据库id ,查询数据源配置
DmpAgentDatasourceInfo
dsInfos
=
offlineSynchDao
.
querySourceDbInfoBySourceId
(
str
.
getSourceDbId
());
DmpAgentDatasourceInfo
dsInfos
=
offlineSynchDao
.
querySourceDbInfoBySourceId
(
str
.
getSourceDbId
());
DmpAgentDatasourceInfo
dsInfo
=
new
DmpAgentDatasourceInfo
();
DmpAgentDatasourceInfo
dsInfo
=
new
DmpAgentDatasourceInfo
();
...
@@ -806,9 +807,34 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -806,9 +807,34 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
}
}
}
}
}
}
if
(
StringUtils
.
isNotEmpty
(
list
.
get
(
0
).
getFieldName
()))
{
List
<
Map
>
searchFieldList
=
getFieldName
(
returnData
,
list
.
get
(
0
).
getFieldName
());
return
JsonResult
.
ok
(
searchFieldList
);
}
return
JsonResult
.
ok
(
returnData
);
return
JsonResult
.
ok
(
returnData
);
}
}
/**
* 根据参数搜索表字段
*
* @return
* @author Bellamy
* @since 2021-03-02
*/
public
List
<
Map
>
getFieldName
(
List
<
Map
>
list
,
String
fieldName
)
throws
Exception
{
List
<
Map
>
returnList
=
new
ArrayList
<>();
if
(
list
.
size
()
>
0
&&
null
!=
list
)
{
for
(
Map
str
:
list
)
{
String
name
=
(
String
)
str
.
get
(
"name"
);
if
(
name
.
contains
(
fieldName
))
{
returnList
.
add
(
str
);
}
}
}
return
returnList
;
}
/**
/**
* 任务停止运行
* 任务停止运行
*
*
...
@@ -945,4 +971,63 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -945,4 +971,63 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
return
JsonResult
.
ok
(
returnData
);
return
JsonResult
.
ok
(
returnData
);
}
}
/**
* sync-获取数据源表字段
*
* @param req
* @return
* @author Bellamy
*/
@Override
public
JsonResult
getSyncSoureAndTargetColumns
(
Map
<
String
,
List
<
SynchTableColumnsReq
>>
req
)
throws
Exception
{
List
<
Map
>
returnData
=
new
ArrayList
<>();
List
<
Map
>
syncData
=
new
ArrayList
<>();
int
len
=
0
;
if
(
req
.
size
()
==
0
||
req
==
null
)
{
throw
new
RuntimeException
(
"请求参数不能为空!"
);
}
List
<
SynchTableColumnsReq
>
list
=
req
.
get
(
"params"
);
Long
id
=
list
.
get
(
0
).
getSourceDbId
();
for
(
SynchTableColumnsReq
str
:
list
)
{
//通过源数据库id ,查询数据源配置
DmpAgentDatasourceInfo
dsInfos
=
offlineSynchDao
.
querySourceDbInfoBySourceId
(
str
.
getSourceDbId
());
DmpAgentDatasourceInfo
dsInfo
=
new
DmpAgentDatasourceInfo
();
BeanUtils
.
copyProperties
(
dsInfos
,
dsInfo
);
if
(
dsInfo
==
null
)
{
throw
new
RuntimeException
(
"数据源配置信息不存在!"
);
}
//解码源数据库密码
if
(
StringUtils
.
isNotBlank
(
dsInfo
.
getPassword
()))
{
dsInfo
.
setPassword
(
new
BaseService
().
decode
(
dsInfo
.
getPassword
(),
publicKey
));
}
//创建jdbc,获取数据源表字段
DmpAgentResult
rst
=
dmpDsAgentServiceImp
.
getTableColumnList
(
dsInfo
,
str
.
getTargetTableName
());
if
(!
rst
.
getCode
().
val
().
equals
(
"200"
))
{
return
new
JsonResult
(
rst
.
getCode
(),
rst
.
getMessage
());
}
else
{
//成功
List
<
Map
>
returnList
=
(
List
<
Map
>)
JsonMapper
.
fromJsonString
(
rst
.
getMessage
(),
List
.
class
);
if
(
returnList
!=
null
&&
returnList
.
size
()
>
0
)
{
for
(
int
i
=
0
;
i
<
returnList
.
size
();
i
++)
{
Map
map
=
returnList
.
get
(
i
);
//离线同步
map
.
put
(
"id"
,
++
len
);
map
.
put
(
"fieldAlias"
,
map
.
get
(
"name"
));
//字段别名
map
.
put
(
"isPk"
,
0
);
//主键
map
.
put
(
"isPt"
,
0
);
//分区
returnData
.
add
(
map
);
if
(
id
==
str
.
getSourceDbId
()){
syncData
.
add
(
map
);
}
}
}
}
}
Map
data
=
new
HashMap
();
data
.
put
(
"allTable"
,
returnData
);
data
.
put
(
"firstTable"
,
returnData
);
return
JsonResult
.
ok
(
data
);
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment