Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
02687f62
Commit
02687f62
authored
Feb 02, 2021
by
mcb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
commit
parent
778d7deb
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
199 additions
and
33 deletions
+199
-33
RealTimeSyncController.java
...es/controller/DataIntegration/RealTimeSyncController.java
+16
-7
DmpRealtimeSyncInfo.java
...in/java/com/jz/dmp/modules/model/DmpRealtimeSyncInfo.java
+12
-0
DmpRealtimeSyncInfoServiceImpl.java
.../modules/service/impl/DmpRealtimeSyncInfoServiceImpl.java
+2
-1
OfflineSynchServiceImpl.java
.../jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
+17
-22
DmpRealtimeSyncInfoMapper.xml
src/main/resources/mapper/dmp/DmpRealtimeSyncInfoMapper.xml
+6
-2
lxTaskJson.json
src/main/resources/templates/lxTaskJson.json
+146
-1
No files found.
src/main/java/com/jz/dmp/modules/controller/DataIntegration/RealTimeSyncController.java
View file @
02687f62
...
...
@@ -230,8 +230,17 @@ public class RealTimeSyncController {
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"业务节点id不能为空!"
);
}
JsonResult
result
=
new
JsonResult
();
try
{
result
=
dmpRealtimeSyncInfoService
.
addRealTimeTask
(
params
);
}
catch
(
Exception
e
)
{
result
.
setMessage
(
e
.
getMessage
());
result
.
setCode
(
ResultCode
.
INTERNAL_SERVER_ERROR
);
e
.
printStackTrace
();
}
//异步提交
Thread
thread
=
new
Thread
(
new
Runnable
()
{
/*
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
...
...
@@ -241,8 +250,8 @@ public class RealTimeSyncController {
}
}
});
thread
.
start
();
return
new
JsonResult
()
;
thread.start();
*/
return
result
;
}
/**
...
...
@@ -312,9 +321,9 @@ public class RealTimeSyncController {
*/
@ApiOperation
(
value
=
"批量上下线"
,
notes
=
"批量上下线"
)
@GetMapping
(
value
=
"/batchUptOnlineStatus"
)
@ApiImplicitParams
({
@ApiImplicitParam
(
name
=
"realTaskId"
,
value
=
"任务id"
,
required
=
true
),
@ApiImplicitParam
(
name
=
"onlineStatus"
,
value
=
"上下线状态:Y 上线,N 下线"
,
required
=
true
)})
public
JsonResult
batchUptOnlineStatus
(
@RequestParam
String
realTaskId
,
@RequestParam
String
onlineStatus
)
throws
Exception
{
@ApiImplicitParams
({
@ApiImplicitParam
(
name
=
"realTaskId"
,
value
=
"任务id"
,
required
=
true
),
@ApiImplicitParam
(
name
=
"onlineStatus"
,
value
=
"上下线状态:Y 上线,N 下线"
,
required
=
true
)})
public
JsonResult
batchUptOnlineStatus
(
@RequestParam
String
realTaskId
,
@RequestParam
String
onlineStatus
)
throws
Exception
{
logger
.
info
(
"###################请求参数{}taskId="
+
realTaskId
+
"###################"
);
if
(
StringUtils
.
isEmpty
(
realTaskId
))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"任务id不能为空!"
);
...
...
@@ -322,7 +331,7 @@ public class RealTimeSyncController {
if
(
StringUtils
.
isEmpty
(
onlineStatus
))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"状态不能为空!"
);
}
boolean
jsonResult
=
dmpRealtimeSyncInfoService
.
batchUptOnlineStatus
(
realTaskId
,
onlineStatus
);
boolean
jsonResult
=
dmpRealtimeSyncInfoService
.
batchUptOnlineStatus
(
realTaskId
,
onlineStatus
);
if
(
jsonResult
)
{
return
JsonResult
.
ok
();
}
else
{
...
...
src/main/java/com/jz/dmp/modules/model/DmpRealtimeSyncInfo.java
View file @
02687f62
...
...
@@ -151,6 +151,11 @@ public class DmpRealtimeSyncInfo implements Serializable {
@ApiModelProperty
(
value
=
"更新人"
)
private
String
uptPerson
;
/**
* 版本号
*/
@ApiModelProperty
(
value
=
"版本号"
)
private
String
version
;
public
Integer
getId
()
{
return
id
;
...
...
@@ -384,4 +389,11 @@ public class DmpRealtimeSyncInfo implements Serializable {
this
.
uptPerson
=
uptPerson
;
}
public
String
getVersion
()
{
return
version
;
}
public
void
setVersion
(
String
version
)
{
this
.
version
=
version
;
}
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/impl/DmpRealtimeSyncInfoServiceImpl.java
View file @
02687f62
...
...
@@ -389,6 +389,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
//source kafak topic
String
topic
=
sourceDbInfo
.
getDatasourceName
()
+
"_"
+
sourceName
+
"."
+
sourceDbInfo
.
getDbName
()
+
".databasehistory"
;
dataModelMap
.
put
(
"topic"
,
topic
);
//connector_job_id 连接名称
String
sourceConnectorName
=
"debezium-connector-"
+
sourceDbInfo
.
getDatasourceName
()
+
"_"
+
sourceName
+
"-"
+
sourceDbInfo
.
getDbName
();
//查询同步任务信息表是否存在类型是数据源到数据源,srcDataSourceId,targetDataSourceId一样的信息 如果 存在就不发起请求
...
...
@@ -423,9 +424,9 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
saveBody
.
setSourceTypeName
(
sourceDbInfo
.
getDataSourceTypeName
());
saveBody
.
setTargetTypeName
(
"Kafka"
);
saveBody
.
setConnectorJobId
(
connectorJobId
);
saveBody
.
setConnectorJsonData
(
dataSource2DataSourceJsonStr
);
saveBody
.
setCreateTime
(
new
Date
());
saveBody
.
setCrePerson
(
SessionUtils
.
getCurrentUserId
());
saveBody
.
setVersion
(
"1.0"
);
dmpRealtimeSyncInfoDao
.
insert
(
saveBody
);
realtiemId
=
Long
.
valueOf
(
saveBody
.
getId
());
logger
.
info
(
"###################保存实时同步任务--结束 ################"
);
...
...
src/main/java/com/jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
View file @
02687f62
...
...
@@ -474,13 +474,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
@Transactional
(
rollbackFor
=
Exception
.
class
,
propagation
=
Propagation
.
REQUIRES_NEW
)
public
JsonResult
addSyncTask
(
SyncDmpTaskAddReq
syncDmpTaskAddReq
)
throws
Exception
{
JsonResult
jsonResult
=
new
JsonResult
();
List
<
Map
<
String
,
Object
>>
result
=
new
ArrayList
<>();
Map
<
String
,
Object
>
reqParam
=
syncDmpTaskAddReq
.
getParams
();
if
(
reqParam
.
size
()
>
0
&&
reqParam
!=
null
)
{
jsonResult
=
addSyncing
(
reqParam
);
DmpDevelopTask
data
=
(
DmpDevelopTask
)
jsonResult
.
getData
();
//保存规则信息
//saveRuleInfo(result, reqParam, jsonResult, String.valueOf(data.getId()));
}
return
jsonResult
;
}
...
...
@@ -554,11 +550,6 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
dmpDevelopTaskDao
.
update
(
task
);
//更新任务
logger
.
info
(
"################################## 更新任务数据结束 ############################################"
);
/*DmpNavigationTree dmpNavigationTree = new DmpNavigationTree();
dmpNavigationTree.setName(taskName);
dmpNavigationTree.setId(treeId);
dmpNavigationTreeDao.update(dmpNavigationTree); //更新节点 树
logger.info("################################## 更新节点 树 结束 ############################################");*/
//更新规则信息
List
<
DvTaskRuleT
>
list
=
new
ArrayList
<>();
//查询TaskRuleID 集合
...
...
@@ -643,6 +634,10 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
task
.
setTargetTableName
(
targetTable
);
task
.
setSourceTableName
(
sourceTableName
);
task
.
setSourceDbName
(
sourceDbName
);
List
<
DvTaskRuleT
>
list
=
new
ArrayList
<>();
//更新规则信息
List
<
Map
>
taskRules
=
(
List
<
Map
>)
body
.
get
(
"taskRules"
);
//判断taskId是否存在,存在编辑,不存在新增
if
(
StringUtils
.
isEmpty
(
taskId
))
{
task
.
setVersion
(
"1.0"
);
task
.
setCreateUserId
(
SessionUtils
.
getCurrentUserId
());
...
...
@@ -650,9 +645,6 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
dmpDevelopTaskDao
.
insert
(
task
);
//新增任务数据
this
.
saveTaskHistory
(
task
);
//保存任务历史版本
logger
.
info
(
"################################## 新增离线任务数据结束 ############################################"
);
//更新规则信息
List
<
DvTaskRuleT
>
list
=
new
ArrayList
<>();
List
<
Map
>
taskRules
=
(
List
<
Map
>)
body
.
get
(
"taskRules"
);
//保存dmp数据校验规则信息
settRuleInfo
(
taskId
,
taskRules
,
list
);
}
else
{
...
...
@@ -666,13 +658,22 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
dmpDevelopTaskDao
.
update
(
task
);
this
.
saveTaskHistory
(
task
);
//保存任务历史版本
logger
.
info
(
"################################## 编辑离线任务数据结束 ############################################"
);
//更新规则信息
//查询TaskRuleID 集合
List
<
Long
>
listRules
=
dvTaskRuleTService
.
getTaskRuleIdsList
(
Integer
.
valueOf
(
taskId
));
if
(
CollectionUtils
.
isNotEmpty
(
listRules
))
{
//批量删除rule信息
dvTaskRuleTService
.
delRulesByIds
(
listRules
);
//保存dmp数据校验规则信息
settRuleInfo
(
String
.
valueOf
(
taskId
),
taskRules
,
list
);
}
}
//保存时提交XML
return
dmpDevelopTaskService
.
submitSyncing
(
task
);
}
/**
* 保存任务历史版本
* 保存
离线
任务历史版本
*
* @param task
* @return
...
...
@@ -686,22 +687,16 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
DmpDevelopTaskHistory
taskHistory
=
new
DmpDevelopTaskHistory
();
taskHistory
.
setTaskId
(
task
.
getId
());
taskHistory
.
setVersion
(
task
.
getVersion
());
//taskHistory.setData(task.getData());
taskHistory
.
setScript
(
task
.
getScript
());
taskHistory
.
setTaskDesc
(
task
.
getTaskDesc
());
taskHistory
.
setTreeId
(
task
.
getTreeId
());
taskHistory
.
setTaskType
(
task
.
getTaskType
());
taskHistory
.
setDatasourceId
(
task
.
getDatasourceId
());
taskHistory
.
setCreateTime
(
task
.
getCreateTime
());
if
(
StringUtils
.
isNotEmpty
(
task
.
getCreateUserId
()))
{
taskHistory
.
setCreateUserId
(
Integer
.
valueOf
(
task
.
getCreateUserId
()));
}
taskHistory
.
setUpdateTime
(
task
.
getUpdateTime
());
if
(
StringUtils
.
isNotEmpty
(
task
.
getUpdateUserId
()))
{
taskHistory
.
setUpdateUserId
(
Integer
.
valueOf
(
task
.
getUpdateUserId
()));
}
taskHistory
.
setCreateTime
(
new
Date
());
taskHistory
.
setCreateUserId
(
Integer
.
valueOf
(
SessionUtils
.
getCurrentUserId
()));
taskHistory
.
setDataStatus
(
DelFlagEnum
.
NO
.
getValue
());
dmpDevelopTaskHistoryMapper
.
insert
(
taskHistory
);
logger
.
info
(
"################################## 新增离线任务版本记录结束 ############################################"
);
return
JsonResult
.
ok
();
}
...
...
src/main/resources/mapper/dmp/DmpRealtimeSyncInfoMapper.xml
View file @
02687f62
...
...
@@ -155,8 +155,12 @@
<!--新增所有列-->
<insert
id=
"insert"
keyProperty=
"id"
useGeneratedKeys=
"true"
>
insert into dmp_realtime_sync_info(src_datasource_id, target_datasource_id, src_table_name, target_table_name, type, connector_job_id, connector_json_data, src_topic_name, project_id, parent_id, desensitization_field, arithmetic, pk_name, source_type_name, target_type_name, src_database_type, src_database_name, connector_url, target_database_type, target_database_name, src_datasource_name, target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person)
values (#{srcDatasourceId}, #{targetDatasourceId}, #{srcTableName}, #{targetTableName}, #{type}, #{connectorJobId}, #{connectorJsonData}, #{srcTopicName}, #{projectId}, #{parentId}, #{desensitizationField}, #{arithmetic}, #{pkName}, #{sourceTypeName}, #{targetTypeName}, #{srcDatabaseType}, #{srcDatabaseName}, #{connectorUrl}, #{targetDatabaseType}, #{targetDatabaseName}, #{srcDatasourceName}, #{targetDatasourceName}, #{storeType}, #{status}, #{createTime}, #{updateTime}, #{crePerson}, #{uptPerson})
insert into dmp_realtime_sync_info(src_datasource_id, target_datasource_id, src_table_name, target_table_name, type, connector_job_id, connector_json_data, src_topic_name
, project_id, parent_id, desensitization_field, arithmetic, pk_name, source_type_name, target_type_name, src_database_type, src_database_name, connector_url, target_database_type
, target_database_name, src_datasource_name, target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person, version)
values (#{srcDatasourceId}, #{targetDatasourceId}, #{srcTableName}, #{targetTableName}, #{type}, #{connectorJobId}, #{connectorJsonData}, #{srcTopicName}, #{projectId}
, #{parentId}, #{desensitizationField}, #{arithmetic}, #{pkName}, #{sourceTypeName}, #{targetTypeName}, #{srcDatabaseType}, #{srcDatabaseName}, #{connectorUrl}, #{targetDatabaseType}
, #{targetDatabaseName}, #{srcDatasourceName}, #{targetDatasourceName}, #{storeType}, #{status}, #{createTime}, #{updateTime}, #{crePerson}, #{uptPerson} ,#{version})
</insert>
<insert
id=
"insertBatch"
keyProperty=
"id"
useGeneratedKeys=
"true"
>
...
...
src/main/resources/templates/lxTaskJson.json
View file @
02687f62
...
...
@@ -224,4 +224,149 @@
}
]
}
}
\ No newline at end of file
}
/*
测试数据
{
"params"
:
{
"version"
:
"1.0"
,
"treeId"
:
617
,
"mode"
:
"0"
,
"projectId"
:
"31"
,
"taskId"
:
""
,
"taskName"
:
"dmp_demo_dmp_azkaban_exector_server_config"
,
"scripts"
:
{
"setting"
:
{
"extract"
:
"incremental"
,
"extractExpression"
:
"where 1=1"
,
"targetInsertMergeOverwrite"
:
"insert"
,
"ftColumn"
:
"1"
,
"ftCount"
:
"1"
,
"separateMax"
:
"1"
,
"separateMin"
:
""
,
"postImportStatement"
:
"12"
,
"preImportStatement"
:
"12"
,
"errorLimitRecord"
:
"21"
,
"maxConcurrency"
:
"2"
,
"executorMemory"
:
"1"
,
"executorCores"
:
"1"
,
"totalExecutorCores"
:
"1"
,
"fieldMapping"
:
""
},
"reader"
:
{
"dbConnection"
:
"mysql_dmp_demo_test"
,
"fileType"
:
""
,
"sourceHdfsPath"
:
""
,
"sourceHdfsFile"
:
""
,
"sourceFtpDir"
:
""
,
"sourceFtpFile"
:
""
,
"sourceSkipFtpFile"
:
""
,
"sourceCsvDelimiter"
:
""
,
"sourceCsvHeader"
:
""
,
"sourceCsvCharset"
:
""
,
"sourceCsvQuote"
:
""
,
"sourceFtpLoadDate"
:
""
,
"registerTableName"
:
"dmp_azkaban_exector_server_config"
,
"dayByDay"
:
"false"
,
"column"
:
[
{
"name"
:
"host"
,
"type"
:
"VARCHAR"
},
{
"name"
:
"port"
,
"type"
:
"VARCHAR"
},
{
"name"
:
"user_name"
,
"type"
:
"VARCHAR"
},
{
"name"
:
"pass_word"
,
"type"
:
"VARCHAR"
}
]
},
"writer"
:
{
"targetDbConnection"
:
"mysql_dmp_demo_test"
,
"targetTable"
:
"dmp_azkaban_exector_server_config"
,
"targetFtpDir"
:
""
,
"targetFtpFile"
:
""
,
"targetCsvDelimiter"
:
""
,
"targetCsvHeader"
:
""
,
"targetCsvCharset"
:
""
,
"targetInsertMergeOverwrite"
:
"insert"
,
"column"
:
[
{
"name"
:
"host"
,
"type"
:
"VARCHAR"
,
"isPk"
:
"1"
,
"isPt"
:
"0"
,
"rules"
:
[
{
"method"
:
""
,
"type"
:
""
}
]
},
{
"name"
:
"port"
,
"type"
:
"VARCHAR"
,
"isPk"
:
"0"
,
"isPt"
:
"1"
,
"rules"
:
[
{
"method"
:
""
,
"type"
:
""
}
]
},
{
"name"
:
"user_name"
,
"type"
:
"VARCHAR"
,
"isPk"
:
"0"
,
"isPt"
:
"0"
,
"rules"
:
[
{
"method"
:
""
,
"type"
:
""
}
]
},
{
"name"
:
"pass_word"
,
"type"
:
"VARCHAR"
,
"isPk"
:
"0"
,
"isPt"
:
"0"
,
"rules"
:
[
{
"method"
:
""
,
"type"
:
""
}
]
}
]
}
},
"taskRules"
:
[
{
"ruleId"
:
""
,
"ruleValue"
:
{
"dv_fields"
:
[
{
"fieldName"
:
""
}
],
"dvTime"
:
{
"timeField"
:
""
,
"timeValue"
:
{
"startTime"
:
""
,
"endTime"
:
""
}
}
}
}
]
}
}
*/
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment