Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
62614ff6
Commit
62614ff6
authored
Feb 01, 2021
by
mcb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
commit
parent
dcdc961e
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
143 additions
and
60 deletions
+143
-60
DelFlagEnum.java
src/main/java/com/jz/common/enums/DelFlagEnum.java
+12
-12
OfflineSynchController.java
...es/controller/DataIntegration/OfflineSynchController.java
+8
-2
DmpDevelopTaskDao.java
src/main/java/com/jz/dmp/modules/dao/DmpDevelopTaskDao.java
+27
-17
DmpDevelopTaskHistory.java
.../java/com/jz/dmp/modules/model/DmpDevelopTaskHistory.java
+10
-0
OfflineSynchServiceImpl.java
.../jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
+73
-25
DmpDevelopTaskMapper.xml
src/main/resources/mapper/dmp/DmpDevelopTaskMapper.xml
+8
-0
lxTaskJson.json
src/main/resources/templates/lxTaskJson.json
+5
-4
No files found.
src/main/java/com/jz/common/enums/DelFlagEnum.java
View file @
62614ff6
...
...
@@ -28,6 +28,18 @@ public enum DelFlagEnum {
this
.
value
=
value
;
}
public
static
DelFlagEnum
get
(
String
code
)
{
if
(
code
==
null
)
{
return
null
;
}
for
(
DelFlagEnum
status
:
values
())
{
if
(
status
.
getCode
().
equalsIgnoreCase
(
code
))
{
return
status
;
}
}
return
null
;
}
public
String
getCode
()
{
return
code
;
}
...
...
@@ -43,16 +55,4 @@ public enum DelFlagEnum {
public
void
setValue
(
String
value
)
{
this
.
value
=
value
;
}
public
static
DelFlagEnum
get
(
String
code
)
{
if
(
code
==
null
)
{
return
null
;
}
for
(
DelFlagEnum
status
:
values
())
{
if
(
status
.
getCode
().
equalsIgnoreCase
(
code
))
{
return
status
;
}
}
return
null
;
}
}
src/main/java/com/jz/dmp/modules/controller/DataIntegration/OfflineSynchController.java
View file @
62614ff6
...
...
@@ -5,7 +5,6 @@ import com.jz.common.constant.ResultCode;
import
com.jz.common.page.BasePageBean
;
import
com.jz.common.page.PageInfoResponse
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.*
;
import
com.jz.dmp.modules.controller.dataService.bean.SoureTableColumnsReq
;
import
com.jz.dmp.modules.service.DmpSyncingDatasourceTypeService
;
import
com.jz.dmp.modules.service.OfflineSynchService
;
import
io.swagger.annotations.Api
;
...
...
@@ -244,7 +243,14 @@ public class OfflineSynchController {
@ApiOperation
(
value
=
"保存离线任务数据"
,
notes
=
"保存离线任务数据"
)
@PostMapping
(
value
=
"/addSyncTask"
)
public
JsonResult
addSyncTask
(
@RequestBody
@Validated
SyncDmpTaskAddReq
syncDmpTaskAddReq
)
throws
Exception
{
JsonResult
list
=
offlineSynchService
.
addSyncTask
(
syncDmpTaskAddReq
);
JsonResult
list
=
new
JsonResult
();
try
{
list
=
offlineSynchService
.
addSyncTask
(
syncDmpTaskAddReq
);
}
catch
(
Exception
e
)
{
list
.
setMessage
(
"保存失败!"
);
list
.
setCode
(
ResultCode
.
INTERNAL_SERVER_ERROR
);
e
.
printStackTrace
();
}
return
list
;
}
...
...
src/main/java/com/jz/dmp/modules/dao/DmpDevelopTaskDao.java
View file @
62614ff6
...
...
@@ -44,22 +44,32 @@ public interface DmpDevelopTaskDao {
List
<
DataDevTaskListDto
>
queryTaskTreeInfo
(
Map
params
)
throws
Exception
;
/**条件查询任务开发
* @param param
* @return
* @throws Exception
*/
public
List
<
DmpDevelopTask
>
findList
(
Map
<
String
,
Object
>
param
)
throws
Exception
;
/**
* @Title: get
* @Description: TODO(主键获取对象)
* @param @param id
* @param @return
* @param @throws Exception 参数
* @return DmpDevelopTask 返回类型
* @throws
*/
public
DmpDevelopTask
get
(
Long
id
)
throws
Exception
;
/**
* 条件查询任务开发
*
* @param param
* @return
* @throws Exception
*/
public
List
<
DmpDevelopTask
>
findList
(
Map
<
String
,
Object
>
param
)
throws
Exception
;
/**
* @param @param id
* @param @return
* @param @throws Exception 参数
* @return DmpDevelopTask 返回类型
* @throws
* @Title: get
* @Description: TODO(主键获取对象)
*/
public
DmpDevelopTask
get
(
Long
id
)
throws
Exception
;
/**
* 根据主键查询
*
* @return
* @author Bellamy
* @since 2021-02-01
*/
DmpDevelopTask
selectTaskById
(
@Param
(
"taskId"
)
String
taskId
)
throws
Exception
;
}
src/main/java/com/jz/dmp/modules/model/DmpDevelopTaskHistory.java
View file @
62614ff6
...
...
@@ -176,6 +176,8 @@ public class DmpDevelopTaskHistory implements Serializable{
@JsonFormat
(
pattern
=
"yyyy-MM-dd HH:mm:ss"
)
private
Date
updateTime
;
private
byte
[]
data
;
public
Integer
getId
()
{
return
id
;
}
...
...
@@ -375,4 +377,12 @@ public class DmpDevelopTaskHistory implements Serializable{
public
void
setUpdateTime
(
Date
updateTime
)
{
this
.
updateTime
=
updateTime
;
}
public
byte
[]
getData
()
{
return
data
;
}
public
void
setData
(
byte
[]
data
)
{
this
.
data
=
data
;
}
}
src/main/java/com/jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
View file @
62614ff6
...
...
@@ -24,6 +24,7 @@ import com.jz.dmp.modules.model.*;
import
com.jz.dmp.modules.service.DmpDevelopTaskService
;
import
com.jz.dmp.modules.service.DvTaskRuleTService
;
import
com.jz.dmp.modules.service.OfflineSynchService
;
import
net.bytebuddy.implementation.bytecode.Throw
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.slf4j.Logger
;
...
...
@@ -38,6 +39,7 @@ import org.springframework.transaction.annotation.Transactional;
import
javax.servlet.http.HttpServletRequest
;
import
java.io.File
;
import
java.io.UnsupportedEncodingException
;
import
java.math.BigDecimal
;
import
java.text.SimpleDateFormat
;
import
java.util.*
;
...
...
@@ -90,6 +92,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
@Autowired
private
DvTaskRuleTService
dvTaskRuleTService
;
@Autowired
private
DmpDevelopTaskHistoryMapper
dmpDevelopTaskHistoryMapper
;
/**
* 离线同步任务列表分页查询
*
...
...
@@ -548,11 +553,11 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
dmpDevelopTaskDao
.
update
(
task
);
//更新任务
logger
.
info
(
"################################## 更新任务数据结束 ############################################"
);
DmpNavigationTree
dmpNavigationTree
=
new
DmpNavigationTree
();
/*
DmpNavigationTree dmpNavigationTree = new DmpNavigationTree();
dmpNavigationTree.setName(taskName);
dmpNavigationTree.setId(treeId);
dmpNavigationTreeDao.update(dmpNavigationTree); //更新节点 树
logger
.
info
(
"################################## 更新节点 树 结束 ############################################"
);
logger.info("################################## 更新节点 树 结束 ############################################");
*/
//更新规则信息
List
<
DvTaskRuleT
>
list
=
new
ArrayList
<>();
//查询TaskRuleID 集合
...
...
@@ -577,30 +582,28 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
* @return
*/
public
JsonResult
addSyncing
(
Map
<
String
,
Object
>
body
)
throws
Exception
{
if
(
StringUtils
.
isEmpty
(
body
.
get
(
"treeId"
).
toString
()))
{
return
JsonResult
.
error
(
ResultCode
.
PARAMS_ERROR
,
"treeId不能为空!"
);
}
if
(
StringUtils
.
isEmpty
(
body
.
get
(
"projectId"
).
toString
()))
{
return
JsonResult
.
error
(
ResultCode
.
PARAMS_ERROR
,
"projectId不能为空!"
);
}
if
(
StringUtils
.
isBlank
(
body
.
get
(
"taskName"
).
toString
()))
{
return
JsonResult
.
error
(
ResultCode
.
PARAMS_ERROR
,
"任务名称不能为空"
);
}
Integer
projectId
=
Integer
.
valueOf
(
body
.
get
(
"projectId"
).
toString
());
Integer
parentId
=
Integer
.
valueOf
(
body
.
get
(
"parentId"
).
toString
());
//父节点ID
String
taskName
=
(
String
)
body
.
get
(
"taskName"
);
//任务名称 业务节点名称 一对一
String
taskId
=
body
.
get
(
"taskId"
).
toString
();
//任务ID
Integer
treeId
=
Integer
.
valueOf
(
body
.
get
(
"treeId"
).
toString
());
//树节点ID
//Integer taskId = null; //任务ID
String
taskName
=
body
.
get
(
"taskName"
).
toString
();
//任务名称 业务节点名称 一对一
//Integer parentId = Integer.valueOf(body.get("parentId").toString()); //父节点ID
if
(
StringUtils
.
isBlank
(
taskName
))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"任务名称不能为空"
);
}
Map
<
String
,
Object
>
scriptMap
=
(
Map
<
String
,
Object
>)
body
.
get
(
"scripts"
);
//任务json数据
Object
content
=
scriptMap
.
get
(
"content"
);
String
xmlTdb
=
XmlUtils
.
getPropertyValue
(
content
,
"target_db_connection"
);
if
(
null
!=
content
&&
xmlTdb
.
length
()
==
0
)
{
// 包含content但未取出值条件才成立
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"脚本内容中缺失目标数据源(target_db_connection)"
);
}
/*DmpNavigationTree tree = new DmpNavigationTree();
tree.setName(taskName); //树节点名称
tree.setProjectId(projectId);
tree.setCategory("2"); //树类别
tree.setIsLevel("1"); //是否叶子节点 1 是
int cnt = dmpNavigationTreeDao.countTreeByName(tree);
if (cnt > 0) {
throw new RuntimeException("当前项目已存在同名的任务");
}*/
return
JsonResult
.
error
(
ResultCode
.
PARAMS_ERROR
,
"脚本内容中缺失目标数据源(target_db_connection)"
);
}
//保存目标库类型
Map
<
String
,
Object
>
writer
=
(
Map
<
String
,
Object
>)
scriptMap
.
get
(
"writer"
);
// 目标数据
...
...
@@ -619,11 +622,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
DmpDevelopTask
task
=
new
DmpDevelopTask
();
task
.
setProjectId
(
projectId
);
//task.setParentId(parentId);
task
.
setTaskType
(
"2"
);
//任务类型
task
.
setDatasourceId
(
dataSourceId
);
//数据源ID
task
.
setType
(
"3"
);
//task.setTaskName(taskName);
task
.
setTaskDesc
(
"Syncing Task"
);
//任务描述
task
.
setIsSubmit
(
"0"
);
//是否已提交
task
.
setTreeId
(
treeId
);
...
...
@@ -640,15 +641,59 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
task
.
setTargetTableName
(
targetTable
);
task
.
setSourceTableName
(
sourceTableName
);
task
.
setSourceDbName
(
sourceDbName
);
task
.
setCreateUserId
(
SessionUtils
.
getCurrentUserId
());
task
.
setCreateTime
(
new
Date
());
dmpDevelopTaskDao
.
insert
(
task
);
//保存任务数据
logger
.
info
(
"################################## 新增任务数据结束 ############################################"
);
if
(
StringUtils
.
isEmpty
(
taskId
))
{
task
.
setVersion
(
"1.0"
);
task
.
setCreateUserId
(
SessionUtils
.
getCurrentUserId
());
task
.
setCreateTime
(
new
Date
());
dmpDevelopTaskDao
.
insert
(
task
);
//新增任务数据
this
.
saveTaskHistory
(
task
);
//保存任务历史版本
logger
.
info
(
"################################## 新增离线任务数据结束 ############################################"
);
}
else
{
DmpDevelopTask
devTask
=
dmpDevelopTaskDao
.
selectTaskById
(
taskId
);
BigDecimal
version
=
new
BigDecimal
(
devTask
.
getVersion
());
version
=
version
.
add
(
new
BigDecimal
(
1.0
));
task
.
setVersion
(
String
.
valueOf
(
version
));
task
.
setUpdateTime
(
new
Date
());
task
.
setUpdateUserId
(
SessionUtils
.
getCurrentUserId
());
task
.
setId
(
Integer
.
valueOf
(
taskId
));
dmpDevelopTaskDao
.
update
(
task
);
this
.
saveTaskHistory
(
task
);
//保存任务历史版本
logger
.
info
(
"################################## 编辑离线任务数据结束 ############################################"
);
}
//保存时提交XML
dmpDevelopTaskService
.
submitSyncing
(
task
);
return
new
JsonResult
(
ResultCode
.
SUCCESS
,
task
);
}
/**
* 保存任务历史版本
*
* @param task
* @return
* @Author Bellamy
* @Date 2021/02/01
*/
public
JsonResult
saveTaskHistory
(
DmpDevelopTask
task
)
throws
Exception
{
if
(
task
.
getId
()
==
null
)
{
throw
new
RuntimeException
(
"保存失败!"
);
}
DmpDevelopTaskHistory
taskHistory
=
new
DmpDevelopTaskHistory
();
taskHistory
.
setTaskId
(
task
.
getId
());
taskHistory
.
setVersion
(
task
.
getVersion
());
taskHistory
.
setData
(
task
.
getData
());
taskHistory
.
setTaskDesc
(
task
.
getTaskDesc
());
taskHistory
.
setTaskId
(
task
.
getTreeId
());
taskHistory
.
setTaskType
(
task
.
getTaskType
());
taskHistory
.
setDatasourceId
(
task
.
getDatasourceId
());
taskHistory
.
setCreateTime
(
task
.
getCreateTime
());
taskHistory
.
setCreateUserId
(
Integer
.
valueOf
(
task
.
getCreateUserId
()));
taskHistory
.
setUpdateTime
(
task
.
getUpdateTime
());
taskHistory
.
setUpdateUserId
(
Integer
.
valueOf
(
task
.
getUpdateUserId
()));
taskHistory
.
setDataStatus
(
DelFlagEnum
.
NO
.
getValue
());
dmpDevelopTaskHistoryMapper
.
insert
(
taskHistory
);
return
JsonResult
.
ok
();
}
/**
* 保存规则信息
*
...
...
@@ -734,6 +779,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
for
(
int
i
=
0
;
i
<
returnList
.
size
();
i
++)
{
Map
map
=
returnList
.
get
(
i
);
map
.
put
(
"id"
,
++
len
);
map
.
put
(
"fieldAlias"
,
map
.
get
(
"name"
));
//字段别名
map
.
put
(
"isPk"
,
0
);
map
.
put
(
"isPt"
,
0
);
returnData
.
add
(
map
);
}
}
...
...
src/main/resources/mapper/dmp/DmpDevelopTaskMapper.xml
View file @
62614ff6
...
...
@@ -221,4 +221,12 @@
SELECT
<include
refid=
"FIND_ALL_COLUMN"
/>
FROM dmp_develop_task WHERE tree_id = #{id}
</select>
<select
id=
"selectTaskById"
resultType=
"com.jz.dmp.modules.model.DmpDevelopTask"
>
select
<include
refid=
"FIND_ALL_COLUMN"
/>
from dmp_develop_task
where 1=1 and data_status='1'
and id=#{taskId}
</select>
</mapper>
\ No newline at end of file
src/main/resources/templates/lxTaskJson.json
View file @
62614ff6
...
...
@@ -81,7 +81,7 @@
{
"params"
:
{
"version"
:
"1.0"
,
"version"
:
"1.0"
,
//版本
"treeId"
:
669
,
//
"parentId"
:
"509"
,
"mode"
:
"0"
,
...
...
@@ -97,8 +97,8 @@
"ftCount"
:
"分桶个数"
,
"separateMax"
:
"分桶字段最大值"
,
"separateMin"
:
"分桶字段最小值"
,
"primaryKey"
:
"主键"
,
"partition"
:
"分区"
,
//
"primaryKey"
:
"主键"
,
//
"partition"
:
"分区"
,
"postImportStatement"
:
"导入后语句"
,
"preImportStatement"
:
"导入前语句"
,
"errorLimitRecord"
:
"错误记录数超过"
,
...
...
@@ -106,7 +106,8 @@
//
"syncRate"
:
"同步速率"
,
"executorMemory"
:
"1"
,
//分配任务内存
"executorCores"
:
"1"
,
//单executor的cpu数
"totalExecutorCores"
:
"1"
//总executor的cpu数
"totalExecutorCores"
:
"1"
,
//总executor的cpu数
"fieldMapping"
:
""
//字段映射关系
},
"reader"
:
{
"dbConnection"
:
"mysql_dmp_demo_test"
,
//来源名称
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment