Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
fd8112f7
Commit
fd8112f7
authored
Dec 31, 2020
by
mcb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
6db33892
Changes
8
Hide whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
56 additions
and
26 deletions
+56
-26
DmpDevelopTaskDao.java
src/main/java/com/jz/dmp/modules/dao/DmpDevelopTaskDao.java
+2
-0
DmpSyncingDatasourceDao.java
.../java/com/jz/dmp/modules/dao/DmpSyncingDatasourceDao.java
+7
-0
DmpDevelopTaskServiceImpl.java
...z/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
+2
-4
DmpSyncingDatasourceServiceImpl.java
...modules/service/impl/DmpSyncingDatasourceServiceImpl.java
+15
-13
OfflineSynchServiceImpl.java
.../jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
+17
-8
lxTaskJson.json
src/main/resources/lxTaskJson.json
+1
-0
DmpDevelopTaskMapper.xml
src/main/resources/mapper/dmp/DmpDevelopTaskMapper.xml
+1
-1
DmpSyncingDatasourceMapper.xml
src/main/resources/mapper/dmp/DmpSyncingDatasourceMapper.xml
+11
-0
No files found.
src/main/java/com/jz/dmp/modules/dao/DmpDevelopTaskDao.java
View file @
fd8112f7
...
@@ -20,4 +20,6 @@ public interface DmpDevelopTaskDao {
...
@@ -20,4 +20,6 @@ public interface DmpDevelopTaskDao {
int
insert
(
DmpDevelopTask
dmpDevelopTask
)
throws
Exception
;
int
insert
(
DmpDevelopTask
dmpDevelopTask
)
throws
Exception
;
DmpDevelopTask
selectTaskInfoByParam
(
@Param
(
"treeId"
)
long
treeId
)
throws
Exception
;
DmpDevelopTask
selectTaskInfoByParam
(
@Param
(
"treeId"
)
long
treeId
)
throws
Exception
;
int
update
(
DmpDevelopTask
task
)
throws
Exception
;
}
}
src/main/java/com/jz/dmp/modules/dao/DmpSyncingDatasourceDao.java
View file @
fd8112f7
...
@@ -95,6 +95,11 @@ public interface DmpSyncingDatasourceDao {
...
@@ -95,6 +95,11 @@ public interface DmpSyncingDatasourceDao {
List
<
Map
>
queryDatasourceType
()
throws
Exception
;
List
<
Map
>
queryDatasourceType
()
throws
Exception
;
/**
* 新增-获取数据源类型
*
* @return
*/
List
<
Map
>
queryGroupDatasourceType
(
Map
map
)
throws
Exception
;
List
<
Map
>
queryGroupDatasourceType
(
Map
map
)
throws
Exception
;
int
countDatasourceByName
(
@Param
(
"datasourceName"
)
String
datasourceName
,
@Param
(
"projectId"
)
String
projectId
)
throws
Exception
;
int
countDatasourceByName
(
@Param
(
"datasourceName"
)
String
datasourceName
,
@Param
(
"projectId"
)
String
projectId
)
throws
Exception
;
...
@@ -104,4 +109,6 @@ public interface DmpSyncingDatasourceDao {
...
@@ -104,4 +109,6 @@ public interface DmpSyncingDatasourceDao {
DataSourceListDto
selectDataSourceInfoById
(
Map
map
)
throws
Exception
;
DataSourceListDto
selectDataSourceInfoById
(
Map
map
)
throws
Exception
;
List
<
DmpSyncingDatasource
>
findListByParams
(
DmpSyncingDatasource
ds
)
throws
Exception
;
List
<
DmpSyncingDatasource
>
findListByParams
(
DmpSyncingDatasource
ds
)
throws
Exception
;
List
<
Map
>
queryDbTypeByGroup
()
throws
Exception
;
}
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
View file @
fd8112f7
...
@@ -111,6 +111,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -111,6 +111,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
* 生成xml
* 生成xml
* */
* */
public
String
convert2SyncXmlContent
(
DmpDevelopTask
body
)
throws
Exception
{
public
String
convert2SyncXmlContent
(
DmpDevelopTask
body
)
throws
Exception
{
logger
.
info
(
"################################## 拼接xml 开始 ############################################"
);
StringBuilder
sb
=
new
StringBuilder
();
StringBuilder
sb
=
new
StringBuilder
();
long
treeId
=
body
.
getTreeId
();
long
treeId
=
body
.
getTreeId
();
//通过树节点id 查询任务信息
//通过树节点id 查询任务信息
...
@@ -173,10 +174,6 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -173,10 +174,6 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
List
<
Map
<
String
,
Object
>>
_writerColumns
=
(
List
<
Map
<
String
,
Object
>>)
writerMap
.
get
(
"column"
);
//目标数据库表字段
List
<
Map
<
String
,
Object
>>
_writerColumns
=
(
List
<
Map
<
String
,
Object
>>)
writerMap
.
get
(
"column"
);
//目标数据库表字段
Integer
_projectId_
=
body
.
getProjectId
();
//项目id
Integer
_projectId_
=
body
.
getProjectId
();
//项目id
/*Integer _parentId_ = (Integer) map.get("parentId"); //父节点id
String _mode_ = (String) map.get("mode");
String _version_ = (String) map.get("version");
String _name_ = (String) map.get("name"); //节点数名称*/
//根据项目id和数据源名称 查询数据信息
//根据项目id和数据源名称 查询数据信息
DmpSyncingDatasource
sDS
=
this
.
getDmpSyncingDatasource
(
_projectId_
,
_dbConnection
);
//源
DmpSyncingDatasource
sDS
=
this
.
getDmpSyncingDatasource
(
_projectId_
,
_dbConnection
);
//源
...
@@ -192,6 +189,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -192,6 +189,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
String
_sourceDbName_
=
this
.
getTargetDBname
(
sDS
);
String
_sourceDbName_
=
this
.
getTargetDBname
(
sDS
);
String
st
=
sDST
.
getDatasourceType
();
String
st
=
sDST
.
getDatasourceType
();
String
tt
=
tDST
.
getDatasourceType
();
String
tt
=
tDST
.
getDatasourceType
();
//获取文件类型
//获取文件类型
String
ft
=
this
.
getFileType
(
_fileType
,
_sourceHdfsFile
,
_sourceFtpFile
);
String
ft
=
this
.
getFileType
(
_fileType
,
_sourceHdfsFile
,
_sourceFtpFile
);
...
...
src/main/java/com/jz/dmp/modules/service/impl/DmpSyncingDatasourceServiceImpl.java
View file @
fd8112f7
...
@@ -159,24 +159,26 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
...
@@ -159,24 +159,26 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
*/
*/
@Override
@Override
public
JsonResult
queryGroupDatasourceType
()
throws
Exception
{
public
JsonResult
queryGroupDatasourceType
()
throws
Exception
{
List
<
Map
>
returnList
=
new
ArrayList
<>();
Map
map
=
new
HashMap
();
Map
map
=
new
HashMap
();
Map
<
String
,
ArrayList
<
Map
>>
returnMap
=
new
HashMap
();
List
<
Map
>
list
=
dmpSyncingDatasourceDao
.
queryGroupDatasourceType
(
map
);
List
<
Map
>
list
=
dmpSyncingDatasourceDao
.
queryGroupDatasourceType
(
map
);
List
<
Map
>
typeList
=
dmpSyncingDatasourceDao
.
queryDbTypeByGroup
();
if
(
list
.
size
()
>
0
&&
list
!=
null
)
{
if
(
list
.
size
()
>
0
&&
list
!=
null
)
{
for
(
Map
<
String
,
Object
>
dto
:
list
)
{
for
(
Map
typeDto
:
typeList
)
{
String
datasourceCatename
=
(
String
)
dto
.
get
(
"datasourceCatename"
);
Map
<
String
,
Object
>
returnMap
=
new
LinkedHashMap
();
if
(
returnMap
.
get
(
datasourceCatename
)
==
null
)
{
String
datasourceCatecode
=
typeDto
.
get
(
"datasourceCatecode"
).
toString
();
returnMap
.
put
(
datasourceCatename
,
new
ArrayList
<
Map
>());
returnMap
.
put
(
"dbValue"
,
new
ArrayList
<
Map
>());
for
(
Map
<
String
,
Object
>
dto
:
list
)
{
if
(
datasourceCatecode
.
equals
(
dto
.
get
(
"datasourceCatecode"
)))
{
returnMap
.
put
(
"typeName"
,
dto
.
get
(
"datasourceCatename"
));
List
<
Map
>
param
=
(
List
<
Map
>)
returnMap
.
get
(
"dbValue"
);
param
.
add
(
dto
);
}
}
}
ArrayList
<
Map
>
param
=
returnMap
.
get
(
datasourceCatename
);
returnList
.
add
(
returnMap
);
if
(
dto
.
get
(
"dbAttrs"
)
!=
null
)
{
JSONObject
dbAttrs
=
JSON
.
parseObject
(
dto
.
get
(
"dbAttrs"
).
toString
());
dto
.
put
(
"dbAttrs"
,
dbAttrs
);
}
param
.
add
(
dto
);
}
}
}
}
return
new
JsonResult
(
return
Map
);
return
new
JsonResult
(
return
List
);
}
}
/**
/**
...
@@ -291,7 +293,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
...
@@ -291,7 +293,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
dto
.
setDatasourceTypeName
(
str
.
get
(
"datasourceTypeName"
).
toString
());
dto
.
setDatasourceTypeName
(
str
.
get
(
"datasourceTypeName"
).
toString
());
}
}
if
(
StringUtils
.
isNotEmpty
(
str
.
get
(
"dbAttrs"
).
toString
()))
{
//数据源字段属性json
if
(
StringUtils
.
isNotEmpty
(
str
.
get
(
"dbAttrs"
).
toString
()))
{
//数据源字段属性json
Map
<
String
,
Object
>
dbAttrs
=
JSONObject
.
parseObject
(
str
.
get
(
"dbAttrs"
).
toString
());
Map
<
String
,
Object
>
dbAttrs
=
JSONObject
.
parseObject
(
str
.
get
(
"dbAttrs"
).
toString
());
dto
.
setJdbcUrl
(
dbAttrs
.
get
(
"jdbcUrl"
).
toString
());
dto
.
setJdbcUrl
(
dbAttrs
.
get
(
"jdbcUrl"
).
toString
());
dto
.
setPassword
(
dbAttrs
.
get
(
"password"
).
toString
());
dto
.
setPassword
(
dbAttrs
.
get
(
"password"
).
toString
());
dto
.
setDatasourceDesc
(
dbAttrs
.
get
(
"datasourceDesc"
).
toString
());
dto
.
setDatasourceDesc
(
dbAttrs
.
get
(
"datasourceDesc"
).
toString
());
...
...
src/main/java/com/jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
View file @
fd8112f7
...
@@ -242,6 +242,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -242,6 +242,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
/**
/**
* 任务状态查看,异步调用azkaban服务
* 任务状态查看,异步调用azkaban服务
*
* @Author Bellamy
* @Author Bellamy
* @Date 2020/12/20
* @Date 2020/12/20
*/
*/
...
@@ -430,10 +431,11 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -430,10 +431,11 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
/**
/**
* 添加保存dmp数据(包含校验数据)
* 添加保存dmp数据(包含校验数据)
* @Author Bellamy
*
* @Date 2020/12/20
* @param syncDmpTaskAddReq
* @param syncDmpTaskAddReq
* @return
* @return
* @Author Bellamy
* @Date 2020/12/20
*/
*/
@Override
@Override
@Transactional
(
rollbackFor
=
Exception
.
class
,
propagation
=
Propagation
.
REQUIRES_NEW
)
@Transactional
(
rollbackFor
=
Exception
.
class
,
propagation
=
Propagation
.
REQUIRES_NEW
)
...
@@ -478,6 +480,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -478,6 +480,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
Integer
parentId
=
Integer
.
valueOf
(
body
.
get
(
"parentId"
).
toString
());
//父节点ID
Integer
parentId
=
Integer
.
valueOf
(
body
.
get
(
"parentId"
).
toString
());
//父节点ID
String
taskName
=
(
String
)
body
.
get
(
"taskName"
);
//任务名称 业务节点名称 一对一
String
taskName
=
(
String
)
body
.
get
(
"taskName"
);
//任务名称 业务节点名称 一对一
Integer
treeId
=
Integer
.
valueOf
(
body
.
get
(
"treeId"
).
toString
());
//树节点ID
Integer
treeId
=
Integer
.
valueOf
(
body
.
get
(
"treeId"
).
toString
());
//树节点ID
Integer
taskId
=
null
;
//任务ID
if
(
StringUtils
.
isBlank
(
taskName
))
{
if
(
StringUtils
.
isBlank
(
taskName
))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"任务名称不能为空"
);
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"任务名称不能为空"
);
...
@@ -488,7 +491,6 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -488,7 +491,6 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
if
(
null
!=
content
&&
xmlTdb
.
length
()
==
0
)
{
// 包含content但未取出值条件才成立
if
(
null
!=
content
&&
xmlTdb
.
length
()
==
0
)
{
// 包含content但未取出值条件才成立
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"脚本内容中缺失目标数据源(target_db_connection)"
);
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"脚本内容中缺失目标数据源(target_db_connection)"
);
}
}
/*DmpNavigationTree tree = new DmpNavigationTree();
/*DmpNavigationTree tree = new DmpNavigationTree();
tree.setName(taskName); //树节点名称
tree.setName(taskName); //树节点名称
tree.setProjectId(projectId);
tree.setProjectId(projectId);
...
@@ -500,7 +502,6 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -500,7 +502,6 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
}*/
}*/
//保存目标库类型
//保存目标库类型
Integer
dataSourceId
=
null
;
//数据源ID
Map
<
String
,
Object
>
writer
=
(
Map
<
String
,
Object
>)
scriptMap
.
get
(
"writer"
);
// 目标数据
Map
<
String
,
Object
>
writer
=
(
Map
<
String
,
Object
>)
scriptMap
.
get
(
"writer"
);
// 目标数据
String
targetDb
=
(
String
)
writer
.
get
(
"targetDbConnection"
);
// 目标库名称
String
targetDb
=
(
String
)
writer
.
get
(
"targetDbConnection"
);
// 目标库名称
String
targetTable
=
(
String
)
writer
.
get
(
"targetTable"
);
// 目标库表名称
String
targetTable
=
(
String
)
writer
.
get
(
"targetTable"
);
// 目标库表名称
...
@@ -509,6 +510,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -509,6 +510,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
String
sourceDbName
=
(
String
)
reader
.
get
(
"dbConnection"
);
// 源库名称
String
sourceDbName
=
(
String
)
reader
.
get
(
"dbConnection"
);
// 源库名称
String
sourceTableName
=
(
String
)
reader
.
get
(
"registerTableName"
);
// 源库表名称
String
sourceTableName
=
(
String
)
reader
.
get
(
"registerTableName"
);
// 源库表名称
Integer
dataSourceId
=
null
;
//数据源ID
if
(
StringUtils
.
isNotBlank
(
targetDb
))
{
if
(
StringUtils
.
isNotBlank
(
targetDb
))
{
//根据 目标库和项目id 查询
//根据 目标库和项目id 查询
dataSourceId
=
dmpDevelopTaskDao
.
getDbInfoByParam
(
targetDb
,
projectId
);
dataSourceId
=
dmpDevelopTaskDao
.
getDbInfoByParam
(
targetDb
,
projectId
);
...
@@ -537,12 +539,17 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -537,12 +539,17 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
task
.
setTargetTableName
(
targetTable
);
task
.
setTargetTableName
(
targetTable
);
task
.
setSourceTableName
(
sourceTableName
);
task
.
setSourceTableName
(
sourceTableName
);
task
.
setSourceDbName
(
sourceDbName
);
task
.
setSourceDbName
(
sourceDbName
);
dmpDevelopTaskDao
.
insert
(
task
);
//保存任务数据
if
(
StringUtils
.
isNotEmpty
(
body
.
get
(
"taskId"
).
toString
()))
{
taskId
=
Integer
.
valueOf
(
body
.
get
(
"taskId"
).
toString
());
//任务ID
task
.
setId
(
taskId
);
dmpDevelopTaskDao
.
update
(
task
);
logger
.
info
(
"################################## 编辑任务数据结束 ############################################"
);
}
else
{
dmpDevelopTaskDao
.
insert
(
task
);
//保存任务数据
logger
.
info
(
"################################## 新增任务数据结束 ############################################"
);
}
//保存时提交XML
//保存时提交XML
/*DmpTaskSchedule schedule = new DmpTaskSchedule();
schedule.setTreeId(task.getTreeId());
schedule.setProjectId(projectId);*/
dmpDevelopTaskService
.
submitSyncing
(
task
);
dmpDevelopTaskService
.
submitSyncing
(
task
);
return
new
JsonResult
(
ResultCode
.
SUCCESS
,
task
);
return
new
JsonResult
(
ResultCode
.
SUCCESS
,
task
);
...
@@ -591,6 +598,8 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -591,6 +598,8 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
taskRuleT
.
setRuleValue
(
ruleValue
);
taskRuleT
.
setRuleValue
(
ruleValue
);
list
.
add
(
taskRuleT
);
list
.
add
(
taskRuleT
);
}
}
//dvTaskRuleTService.
//批量新增任务与规则关系表
//批量新增任务与规则关系表
dvTaskRuleTService
.
saveRule
(
list
);
dvTaskRuleTService
.
saveRule
(
list
);
}
}
...
...
src/main/resources/lxTaskJson.json
View file @
fd8112f7
...
@@ -85,6 +85,7 @@
...
@@ -85,6 +85,7 @@
"parentId"
:
"509"
,
"parentId"
:
"509"
,
"mode"
:
"0"
,
"mode"
:
"0"
,
"projectId"
:
"31"
,
"projectId"
:
"31"
,
"taskId"
:
"任务id"
,
"taskName"
:
"dmp_demo_dmp_azkaban_exector_server_config"
,
"taskName"
:
"dmp_demo_dmp_azkaban_exector_server_config"
,
"scripts"
:
{
"scripts"
:
{
"setting"
:
{
"setting"
:
{
...
...
src/main/resources/mapper/dmp/DmpDevelopTaskMapper.xml
View file @
fd8112f7
...
@@ -60,7 +60,7 @@
...
@@ -60,7 +60,7 @@
TASK_DESC = #{taskDesc},
TASK_DESC = #{taskDesc},
</if>
</if>
<if
test=
"script != null"
>
<if
test=
"script != null"
>
SCRIPT = #{
script
},
SCRIPT = #{
data
},
</if>
</if>
<if
test=
"dataStatus != null and dataStatus != ''"
>
<if
test=
"dataStatus != null and dataStatus != ''"
>
DATA_STATUS = #{dataStatus},
DATA_STATUS = #{dataStatus},
...
...
src/main/resources/mapper/dmp/DmpSyncingDatasourceMapper.xml
View file @
fd8112f7
...
@@ -372,4 +372,15 @@
...
@@ -372,4 +372,15 @@
<if
test=
"projectId != null"
>
AND project_id = #{projectId}
</if>
<if
test=
"projectId != null"
>
AND project_id = #{projectId}
</if>
</select>
</select>
<select
id=
"queryDbTypeByGroup"
resultType=
"map"
>
SELECT
datasource_catename as datasourceCatename,
datasource_catecode as datasourceCatecode
from dmp_syncing_datasource_type
where data_status = '1' and is_enabled = '1'
<if
test=
"datasourceTypeId !=null and datasourceTypeId !=''"
>
and id=#{datasourceTypeId}
</if>
group by datasourceCatecode
ORDER BY datasource_catetype
</select>
</mapper>
</mapper>
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment