Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
2c761923
Commit
2c761923
authored
Feb 03, 2021
by
mcb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
commit
parent
52021917
Changes
11
Hide whitespace changes
Inline
Side-by-side
Showing
11 changed files
with
105 additions
and
42 deletions
+105
-42
JsonResult.java
src/main/java/com/jz/common/constant/JsonResult.java
+7
-0
OfflineSynchController.java
...es/controller/DataIntegration/OfflineSynchController.java
+2
-1
RealTimeSyncController.java
...es/controller/DataIntegration/RealTimeSyncController.java
+1
-6
DmpNavigationTreeDao.java
...ain/java/com/jz/dmp/modules/dao/DmpNavigationTreeDao.java
+10
-0
DmpRealtimeSyncInfo.java
...in/java/com/jz/dmp/modules/model/DmpRealtimeSyncInfo.java
+14
-0
DmpRealtimeSyncInfoService.java
...om/jz/dmp/modules/service/DmpRealtimeSyncInfoService.java
+1
-1
OfflineSynchService.java
.../java/com/jz/dmp/modules/service/OfflineSynchService.java
+2
-1
DmpRealtimeSyncInfoServiceImpl.java
.../modules/service/impl/DmpRealtimeSyncInfoServiceImpl.java
+43
-20
OfflineSynchServiceImpl.java
.../jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
+13
-12
DmpNavigationTreeMapper.xml
src/main/resources/mapper/dmp/DmpNavigationTreeMapper.xml
+10
-0
DmpRealtimeSyncInfoMapper.xml
src/main/resources/mapper/dmp/DmpRealtimeSyncInfoMapper.xml
+2
-1
No files found.
src/main/java/com/jz/common/constant/JsonResult.java
View file @
2c761923
...
...
@@ -110,6 +110,13 @@ public class JsonResult<T> implements Serializable {
return
result
;
}
public
static
JsonResult
<
Object
>
error
(
ResultCode
code
)
{
JsonResult
<
Object
>
result
=
new
JsonResult
<>();
result
.
setCode
(
code
);
result
.
setMessage
(
code
.
msg
());
return
result
;
}
public
void
setCodes
(
com
.
jz
.
dmp
.
agent
.
ResultCode
code
)
{
this
.
code
=
code
.
val
();
}
...
...
src/main/java/com/jz/dmp/modules/controller/DataIntegration/OfflineSynchController.java
View file @
2c761923
...
...
@@ -145,10 +145,11 @@ public class OfflineSynchController {
}
/**
* 根据taskId删除离线任务
* 根据taskId删除离线任务
和任务树
*
* @return
* @author Bellamy
* @Date 2021/02/03
*/
@ApiOperation
(
value
=
"删除任务"
,
notes
=
"删除任务"
)
@GetMapping
(
value
=
"/delTaskByTaskId"
)
...
...
src/main/java/com/jz/dmp/modules/controller/DataIntegration/RealTimeSyncController.java
View file @
2c761923
...
...
@@ -113,12 +113,7 @@ public class RealTimeSyncController {
if
(
StringUtils
.
isEmpty
(
realTaskId
))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"任务id不能为空!"
);
}
boolean
jsonResult
=
dmpRealtimeSyncInfoService
.
deleteByrealTaskId
(
realTaskId
);
if
(
jsonResult
)
{
return
new
JsonResult
();
}
else
{
return
new
JsonResult
(
ResultCode
.
INTERNAL_SERVER_ERROR
,
"删除失败!"
);
}
return
dmpRealtimeSyncInfoService
.
deleteByrealTaskId
(
realTaskId
);
}
/**
...
...
src/main/java/com/jz/dmp/modules/dao/DmpNavigationTreeDao.java
View file @
2c761923
...
...
@@ -4,6 +4,7 @@ import com.jz.dmp.modules.model.DmpNavigationTree;
import
org.apache.ibatis.annotations.Param
;
import
java.util.List
;
import
java.util.Map
;
/**
* DMP资源导航树(DmpNavigationTree)表数据库访问层
...
...
@@ -112,4 +113,13 @@ public interface DmpNavigationTreeDao {
* @throws Exception
*/
public
int
insertSelective
(
DmpNavigationTree
dmpNavigationTree
)
throws
Exception
;
/**
* 假删除任务树
*
* @param params 查询起始位置
* @return
* @Date 2021/02/03
*/
int
deleteByTreeId
(
Map
params
)
throws
Exception
;
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/model/DmpRealtimeSyncInfo.java
View file @
2c761923
...
...
@@ -157,6 +157,12 @@ public class DmpRealtimeSyncInfo implements Serializable {
@ApiModelProperty
(
value
=
"版本号"
)
private
String
version
;
/**
* treeId
*/
@ApiModelProperty
(
value
=
"treeId"
)
private
String
treeId
;
public
Integer
getId
()
{
return
id
;
}
...
...
@@ -396,4 +402,12 @@ public class DmpRealtimeSyncInfo implements Serializable {
public
void
setVersion
(
String
version
)
{
this
.
version
=
version
;
}
public
String
getTreeId
()
{
return
treeId
;
}
public
void
setTreeId
(
String
treeId
)
{
this
.
treeId
=
treeId
;
}
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/DmpRealtimeSyncInfoService.java
View file @
2c761923
...
...
@@ -134,7 +134,7 @@ public interface DmpRealtimeSyncInfoService {
* @return
* @author Bellamy
*/
boolean
deleteByrealTaskId
(
String
realTaskId
)
throws
Exception
;
JsonResult
deleteByrealTaskId
(
String
realTaskId
)
throws
Exception
;
/**
* 批量上下线
...
...
src/main/java/com/jz/dmp/modules/service/OfflineSynchService.java
View file @
2c761923
...
...
@@ -51,10 +51,11 @@ public interface OfflineSynchService {
JsonResult
taskRunNowByTaskId
(
String
taskId
)
throws
Exception
;
/**
* 根据taskId删除离线任务
* 根据taskId删除离线任务
和任务树
*
* @return
* @author Bellamy
* @Date 2021/02/03
*/
JsonResult
delTaskByTaskId
(
String
taskId
)
throws
Exception
;
...
...
src/main/java/com/jz/dmp/modules/service/impl/DmpRealtimeSyncInfoServiceImpl.java
View file @
2c761923
package
com
.
jz
.
dmp
.
modules
.
service
.
impl
;
import
com.amazonaws.services.dynamodbv2.xspec.M
;
import
com.github.pagehelper.PageHelper
;
import
com.github.pagehelper.PageInfo
;
import
com.jz.common.constant.JsonResult
;
...
...
@@ -13,6 +12,7 @@ import com.jz.common.utils.realTime.DBUtil;
import
com.jz.common.utils.realTime.RestClient
;
import
com.jz.common.utils.web.SessionUtils
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.*
;
import
com.jz.dmp.modules.dao.DmpNavigationTreeDao
;
import
com.jz.dmp.modules.dao.DmpProjectDao
;
import
com.jz.dmp.modules.dao.DmpRealtimeSyncHandleCountDao
;
import
com.jz.dmp.modules.dao.DmpRealtimeSyncInfoDao
;
...
...
@@ -67,6 +67,9 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
@Autowired
private
DmpRealtimeSyncHandleCountDao
dmpRealtimeSyncHandleCountDao
;
@Autowired
private
DmpNavigationTreeDao
dmpNavigationTreeDao
;
/**
* 通过ID查询单条数据
*
...
...
@@ -308,18 +311,19 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
params
.
put
(
"connectorSecurityFlag"
,
dmpProjectSystemInfo
.
getKerberosIsenable
());
//安全验证开关,是否启用KERBEROS
//connect1@http://172.18.104.130:9993/connectors
if
(
StringUtils
.
is
NotEmpty
(
connectorUrl
))
{
if
(
connectorUrl
.
contains
(
"@"
))
{
connectorUrl
=
connectorUrl
.
split
(
"@"
)[
1
];
}
if
(
StringUtils
.
is
Empty
(
connectorUrl
))
return
JsonResult
.
error
(
ResultCode
.
PARAMS_ERROR
,
"connectorUrl不能为空!"
);
if
(
connectorUrl
.
contains
(
"@"
))
{
connectorUrl
=
connectorUrl
.
split
(
"@"
)[
1
];
}
//提交源到源的connector
Long
realtimeId
=
submitDatasource2DatasourceToConnector
(
projectId
,
sourceDbInfo
,
targetDbInfo
,
dmpProjectSystemInfo
,
connectorUrl
,
params
);
if
(
realtimeId
!=
null
)
{
//处理已选择的表信息
submitNoSelectTable
(
realtimeId
,
projectId
,
sourceDbInfo
,
targetDbInfo
,
dmpProjectSystemInfo
,
connectorUrl
,
params
);
if
(
realtimeId
==
null
)
{
throw
new
RuntimeException
(
"保存失败!"
);
}
return
new
JsonResult
();
//处理已选择的表信息
submitNoSelectTable
(
realtimeId
,
projectId
,
sourceDbInfo
,
targetDbInfo
,
dmpProjectSystemInfo
,
connectorUrl
,
params
);
return
JsonResult
.
ok
();
}
private
Long
submitDatasource2DatasourceToConnector
(
Long
projectId
,
RealTimeSyncDataSourceModel
sourceDbInfo
,
RealTimeSyncDataSourceModel
targetDbInfo
,
DmpProjectSystemInfo
dmpProjectSystemInfo
,
String
connectorUrl
,
Map
<
String
,
Object
>
params
)
throws
Exception
{
...
...
@@ -402,7 +406,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
logger
.
info
(
"###################保存实时同步任务--结束 ################"
);
DmpRealtimeTaskHistory
taskHistory
=
new
DmpRealtimeTaskHistory
();
BeanUtils
.
copyProperties
(
saveBody
,
taskHistory
);
BeanUtils
.
copyProperties
(
saveBody
,
taskHistory
);
taskHistory
.
setRealtimeSyncId
(
saveBody
.
getId
());
dmpRealtimeSyncInfoDao
.
insertRealtimeHistory
(
taskHistory
);
...
...
@@ -604,7 +608,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
}
logger
.
info
(
"###################处理已选择的表信息--结束################"
);
return
new
JsonResult
();
return
JsonResult
.
ok
();
}
/**
...
...
@@ -641,17 +645,18 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
params
.
put
(
"connectorSecurityFlag"
,
dmpProjectSystemInfo
.
getKerberosIsenable
());
//安全验证开关,是否启用KERBEROS
//connect1@http://172.18.104.130:9993/connectors
if
(
StringUtils
.
isNotEmpty
(
connectorUrl
))
{
if
(
connectorUrl
.
contains
(
"@"
))
{
connectorUrl
=
connectorUrl
.
split
(
"@"
)[
1
];
}
if
(
StringUtils
.
isEmpty
(
connectorUrl
))
{
return
JsonResult
.
error
(
ResultCode
.
PARAMS_ERROR
,
"connectorUrl不能为空!"
);
}
if
(
connectorUrl
.
contains
(
"@"
))
{
connectorUrl
=
connectorUrl
.
split
(
"@"
)[
1
];
}
//提交源到源的connector
JsonResult
realtimeId
=
updateDatasource2DatasourceToConnector
(
projectId
,
sourceDbInfo
,
targetDbInfo
,
dmpProjectSystemInfo
,
connectorUrl
,
params
);
//编辑 已选择表信息
updateNoSelectTable
(
params
);
return
new
JsonResult
();
this
.
updateNoSelectTable
(
params
);
return
JsonResult
.
ok
();
}
private
JsonResult
updateDatasource2DatasourceToConnector
(
Long
projectId
,
RealTimeSyncDataSourceModel
sourceDbInfo
,
RealTimeSyncDataSourceModel
targetDbInfo
,
DmpProjectSystemInfo
dmpProjectSystemInfo
,
String
connectorUrl
,
Map
<
String
,
Object
>
params
)
throws
Exception
{
...
...
@@ -719,6 +724,13 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
dmpRealtimeSyncInfoDao
.
update
(
saveBody
);
logger
.
info
(
"###################修改实时同步任务--结束 ################"
);
DmpRealtimeTaskHistory
taskHistory
=
new
DmpRealtimeTaskHistory
();
BeanUtils
.
copyProperties
(
saveBody
,
taskHistory
);
taskHistory
.
setRealtimeSyncId
(
saveBody
.
getId
());
taskHistory
.
setCrePerson
(
SessionUtils
.
getCurrentUserId
());
taskHistory
.
setCreateTime
(
new
Date
());
dmpRealtimeSyncInfoDao
.
insertRealtimeHistory
(
taskHistory
);
Map
blacklist
=
new
HashMap
();
blacklist
.
put
(
"uptTime"
,
new
Date
());
blacklist
.
put
(
"uptPerson"
,
SessionUtils
.
getCurrentUserId
());
...
...
@@ -800,19 +812,30 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
/**
*
批量删除数据源
*
删除实时同步任务
*
* @return
* @author Bellamy
* @since 2021-01-05
*/
@Override
@Transactional
(
rollbackFor
=
Exception
.
class
,
propagation
=
Propagation
.
REQUIRES_NEW
)
public
boolean
deleteByrealTaskId
(
String
realTaskId
)
throws
Exception
{
public
JsonResult
deleteByrealTaskId
(
String
realTaskId
)
throws
Exception
{
DmpRealtimeSyncInfo
realtimeTask
=
dmpRealtimeSyncInfoDao
.
queryById
(
Integer
.
valueOf
(
realTaskId
));
if
(
realtimeTask
==
null
)
{
return
JsonResult
.
error
(
ResultCode
.
OPERATION_DATA_NO_EXIST
);
}
Map
map
=
new
HashMap
();
String
[]
ids
=
realTaskId
.
split
(
","
);
map
.
put
(
"ids"
,
ids
);
map
.
put
(
"dataStatus"
,
DelFlagEnum
.
YES
.
getValue
());
return
dmpRealtimeSyncInfoDao
.
deleteByrealTaskId
(
map
)
>
0
;
int
len
=
dmpRealtimeSyncInfoDao
.
deleteByrealTaskId
(
map
);
if
(
len
>
0
)
{
map
.
put
(
"ids"
,
realtimeTask
.
getTreeId
());
dmpNavigationTreeDao
.
deleteByTreeId
(
map
);
}
return
JsonResult
.
ok
();
}
/**
...
...
src/main/java/com/jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
View file @
2c761923
...
...
@@ -19,6 +19,7 @@ import com.jz.common.utils.web.XmlUtils;
import
com.jz.dmp.agent.DmpAgentResult
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.*
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowExecution
;
import
com.jz.dmp.modules.controller.bean.DmpNavigationTreeDto
;
import
com.jz.dmp.modules.dao.*
;
import
com.jz.dmp.modules.model.*
;
import
com.jz.dmp.modules.service.DmpDevelopTaskService
;
...
...
@@ -94,7 +95,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
@Autowired
private
DmpDevelopTaskHistoryMapper
dmpDevelopTaskHistoryMapper
;
@Autowired
private
RedisTemplate
redisTemplate
;
...
...
@@ -246,29 +247,29 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
}
/**
* 批量删除离线任务
* 批量删除离线任务
和任务树
*
* @author Bellamy
* @Date 2021/02/03
*/
@Override
@Transactional
(
rollbackFor
=
Exception
.
class
,
propagation
=
Propagation
.
REQUIRES_NEW
)
public
JsonResult
delTaskByTaskId
(
String
taskId
)
throws
Exception
{
//通过taskId,查询任务和资源是否存在
Map
map
=
dmpDevelopTaskDao
.
getDmpTaskAndTreeInfo
(
taskId
);
if
(
map
==
null
&&
map
.
size
()
==
0
)
{
return
JsonResult
.
error
(
ResultCode
.
OPERATION_DATA_NO_EXIST
);
}
Map
params
=
new
HashMap
();
String
[]
ids
=
taskId
.
split
(
","
);
params
.
put
(
"ids"
,
ids
);
params
.
put
(
"dataStatus"
,
DelFlagEnum
.
YES
.
getValue
());
dmpDevelopTaskDao
.
deleteTaskByTaskId
(
params
);
//通过taskId,查询任务和资源是否存在
/*Map map = dmpDevelopTaskDao.getDmpTaskAndTreeInfo(taskId);
if (map == null) {
return new JsonResult(ResultCode.OPERATION_DATA_NO_EXIST);
}
if (StringUtils.isEmpty(map.get("treeId").toString())) {
return new JsonResult(ResultCode.OPERATION_DATA_NO_EXIST);
}*/
//dmpDevelopTaskDao.deleteNavigationTreeByTreeId(map.get("treeId").toString());
return
new
JsonResult
(
ResultCode
.
SUCCESS
);
params
.
put
(
"ids"
,
map
.
get
(
"treeId"
).
toString
());
dmpNavigationTreeDao
.
deleteByTreeId
(
params
);
return
JsonResult
.
ok
();
}
/**
...
...
src/main/resources/mapper/dmp/DmpNavigationTreeMapper.xml
View file @
2c761923
...
...
@@ -265,6 +265,16 @@
<delete
id=
"deleteById"
>
delete from dmp_navigation_tree where ID = #{id}
</delete>
<!--假删除任务树-->
<delete
id=
"deleteByTreeId"
parameterType=
"java.util.HashMap"
>
update dmp_navigation_tree
<trim
prefix=
"SET"
suffixOverrides=
","
>
<if
test=
"dataStatus != null"
>
data_status = #{dataStatus},
</if>
</trim>
where id = #{ids}
</delete>
<select
id=
"countTreeByName"
parameterType=
"com.jz.dmp.modules.model.DmpNavigationTree"
resultType=
"java.lang.Integer"
>
select
...
...
src/main/resources/mapper/dmp/DmpRealtimeSyncInfoMapper.xml
View file @
2c761923
...
...
@@ -4,6 +4,7 @@
<resultMap
type=
"com.jz.dmp.modules.model.DmpRealtimeSyncInfo"
id=
"DmpRealtimeSyncInfoMap"
>
<result
property=
"id"
column=
"id"
jdbcType=
"INTEGER"
/>
<result
property=
"treeId"
column=
"tree_id"
jdbcType=
"VARCHAR"
/>
<result
property=
"srcDatasourceId"
column=
"src_datasource_id"
jdbcType=
"INTEGER"
/>
<result
property=
"targetDatasourceId"
column=
"target_datasource_id"
jdbcType=
"INTEGER"
/>
<result
property=
"srcTableName"
column=
"src_table_name"
jdbcType=
"VARCHAR"
/>
...
...
@@ -37,7 +38,7 @@
<!--查询单个-->
<select
id=
"queryById"
resultMap=
"DmpRealtimeSyncInfoMap"
>
select
id, src_datasource_id, target_datasource_id, src_table_name, target_table_name, type, connector_job_id, connector_json_data
id,
tree_id,
src_datasource_id, target_datasource_id, src_table_name, target_table_name, type, connector_job_id, connector_json_data
, src_topic_name, project_id, parent_id, desensitization_field, arithmetic, pk_name, source_type_name, target_type_name
, src_database_type, src_database_name, connector_url, target_database_type, target_database_name, src_datasource_name
, target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment