Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
5f24ed50
Commit
5f24ed50
authored
Jan 28, 2021
by
sml
Browse files
Options
Browse Files
Download
Plain Diff
azkaban登录修改及发布代码提交
parents
ada40899
dc687f4c
Changes
21
Hide whitespace changes
Inline
Side-by-side
Showing
21 changed files
with
593 additions
and
126 deletions
+593
-126
AzkabanApiUtils2.java
src/main/java/com/jz/common/utils/AzkabanApiUtils2.java
+53
-13
CommonUtils.java
src/main/java/com/jz/common/utils/CommonUtils.java
+65
-0
FlowParseTool.java
src/main/java/com/jz/common/utils/FlowParseTool.java
+73
-46
ReflectAssistUtils.java
src/main/java/com/jz/common/utils/ReflectAssistUtils.java
+68
-0
SFTPUtils.java
src/main/java/com/jz/common/utils/SFTPUtils.java
+3
-1
OfflineSynchController.java
...es/controller/DataIntegration/OfflineSynchController.java
+41
-14
DmpSyncingDatasourceTypeDao.java
...a/com/jz/dmp/modules/dao/DmpSyncingDatasourceTypeDao.java
+2
-0
DmpNavigationTree.java
...main/java/com/jz/dmp/modules/model/DmpNavigationTree.java
+1
-1
DmpPublicConfigInfo.java
...in/java/com/jz/dmp/modules/model/DmpPublicConfigInfo.java
+56
-0
DmpSyncingDatasourceType.java
...va/com/jz/dmp/modules/model/DmpSyncingDatasourceType.java
+6
-0
SSOUserInfo.java
src/main/java/com/jz/dmp/modules/model/SSOUserInfo.java
+30
-0
DmpSyncingDatasourceTypeService.java
.../dmp/modules/service/DmpSyncingDatasourceTypeService.java
+9
-0
OfflineSynchService.java
.../java/com/jz/dmp/modules/service/OfflineSynchService.java
+2
-1
DmpDevelopTaskServiceImpl.java
...z/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
+13
-13
DmpSyncingDatasourceTypeServiceImpl.java
...les/service/impl/DmpSyncingDatasourceTypeServiceImpl.java
+17
-0
OfflineSynchServiceImpl.java
.../jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
+12
-5
DmpProjectConfigInfoServiceImpl.java
...vice/projconfig/impl/DmpProjectConfigInfoServiceImpl.java
+13
-0
DmpSyncingDatasourceTypeMapper.xml
...n/resources/mapper/dmp/DmpSyncingDatasourceTypeMapper.xml
+9
-0
OfflineSynchMapper.xml
src/main/resources/mapper/dmp/OfflineSynchMapper.xml
+1
-0
DmpPublicConfigInfoMapper.xml
...resources/mapper/projconfig/DmpPublicConfigInfoMapper.xml
+95
-11
lxTaskJson.json
src/main/resources/templates/lxTaskJson.json
+24
-21
No files found.
src/main/java/com/jz/common/utils/AzkabanApiUtils2.java
View file @
5f24ed50
package
com
.
jz
.
common
.
utils
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.google.gson.Gson
;
import
com.jz.common.utils.web.HttpClientUtils
;
import
com.jz.common.utils.web.SessionUtils
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowExecution
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowPro
;
import
java.io.File
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.core.io.FileSystemResource
;
import
org.springframework.data.redis.core.RedisTemplate
;
import
org.springframework.http.HttpEntity
;
import
org.springframework.http.HttpHeaders
;
import
org.springframework.http.client.SimpleClientHttpRequestFactory
;
...
...
@@ -19,11 +20,16 @@ import org.springframework.util.MultiValueMap;
import
org.springframework.util.StringUtils
;
import
org.springframework.web.client.RestTemplate
;
import
java.io.File
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.google.gson.Gson
;
import
com.jz.common.utils.web.HttpClientUtils
;
import
com.jz.common.utils.web.SessionUtils
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowExecution
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowPro
;
import
com.jz.dmp.modules.model.DmpMember
;
import
com.jz.dmp.modules.model.DmpRole
;
import
com.jz.dmp.modules.model.SSOUserInfo
;
/**
* azkaban ajax api 工具类
...
...
@@ -36,6 +42,9 @@ public class AzkabanApiUtils2 {
private
String
azkabanServerUrl
;
private
String
userName
;
private
String
password
;
@Autowired
RedisTemplate
<
String
,
SSOUserInfo
>
redisTemplate
;
public
AzkabanApiUtils2
(
String
azkabanServerUrl
,
String
userName
,
String
password
)
{
this
(
azkabanServerUrl
);
...
...
@@ -93,11 +102,42 @@ public class AzkabanApiUtils2 {
throw new RuntimeException("登陆失败");
}*/
String
sessionId
=
SessionUtils
.
getSession
().
getId
();
//"dcfc608c-c58a-45b7-adc7-9902b652496e";
//String sessionId = "f70d53fa-55da-4688-8d00-64350e4fb8ea";
//String sessionId = "f0d06f4a-874c-4dfc-8959-101b6add6bf5";
//通过redis方式登录Azkaban
String
redisKey
=
"spring:sessions:sessions:"
+
sessionId
;
SSOUserInfo
ssoUserInfo
=
redisTemplate
.
opsForValue
().
get
(
redisKey
);
if
(
ssoUserInfo
==
null
)
{
redisTemplate
.
opsForValue
().
set
(
redisKey
,
getSSOuserInfo
());
}
System
.
err
.
println
(
"----sessionId="
+
sessionId
);
return
sessionId
;
//SessionUtils.getSession().getId();
}
/**
* @Title: getSSOuserInfo
* @Description: TODO(生成azkaban登录需要保存的实体)
* @param @return 参数
* @return SSOUserInfo 返回类型
* @throws
*/
private
SSOUserInfo
getSSOuserInfo
(){
Map
<
String
,
String
>
rolePermissMap
=
new
HashMap
<>();
DmpMember
dmpMember
=
SessionUtils
.
getSecurityUser
();
List
<
DmpRole
>
memberProjectRoles
=
dmpMember
.
getMemberProjectRoleList
();
for
(
DmpRole
role
:
memberProjectRoles
)
{
rolePermissMap
.
put
(
role
.
getRoleType
(),
role
.
getRemark
());
}
SSOUserInfo
ssoUserInfo
=
new
SSOUserInfo
();
ssoUserInfo
.
setUserName
(
dmpMember
.
getUsername
());
ssoUserInfo
.
setAzkabanRoleRefPermissions
(
rolePermissMap
);
return
ssoUserInfo
;
}
/**
* 创建azkaban项目名
...
...
src/main/java/com/jz/common/utils/CommonUtils.java
View file @
5f24ed50
package
com
.
jz
.
common
.
utils
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.UUID
;
import
java.util.stream.Collectors
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.util.CollectionUtils
;
public
class
CommonUtils
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
CommonUtils
.
class
);
/**
* UUID随机数
...
...
@@ -36,5 +44,62 @@ public class CommonUtils {
String
uuid
=
UUID
.
randomUUID
().
toString
().
toUpperCase
().
replaceAll
(
"-"
,
""
);
return
uuid
;
}
/**
* @param <T>
* @Title: objArrangeTree
* @Description: TODO(将所有的资源整理成树形结构)
* @param @param dmpPermissions
* @param @return
* @param @throws Exception 参数
* @return List<DmpPermission> 返回类型
* @throws
*/
public
static
<
T
>
List
<
T
>
objArrangeTree
(
Object
parentCode
,
List
<
T
>
objs
,
String
parentCodeFiledName
,
String
codeFieldName
,
String
childrenFieldName
)
throws
Exception
{
Map
<
Object
,
List
<
T
>>
dictMap
=
objs
.
stream
().
collect
(
Collectors
.
groupingBy
(
x
->{
try
{
return
ReflectAssistUtils
.
getFieldValueByFieldName
(
parentCodeFiledName
,
x
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
logger
.
error
(
"树形结构封装异常【{}】"
,
e
);
}
return
""
;
}));
List
<
T
>
tList
=
dictMap
.
get
(
parentCode
);
// 获取顶层资源
if
(!
CollectionUtils
.
isEmpty
(
tList
))
{
for
(
T
t
:
tList
)
{
t
=
arrangeChildren
(
t
,
dictMap
,
codeFieldName
,
childrenFieldName
);
}
}
return
tList
;
}
/**
* @Title: arrangeChildren
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param permission
* @param @param dictMap
* @param @return
* @param @throws Exception 参数
* @return DmpPermission 返回类型
* @throws
*/
private
static
<
T
>
T
arrangeChildren
(
T
t
,
Map
<
Object
,
List
<
T
>>
dictMap
,
String
codeFieldName
,
String
childrenFieldName
)
throws
Exception
{
Object
code
=
ReflectAssistUtils
.
getFieldValueByFieldName
(
codeFieldName
,
t
);
List
<
T
>
children
=
dictMap
.
get
(
code
);
if
(!
CollectionUtils
.
isEmpty
(
children
))
{
for
(
T
child
:
children
)
{
child
=
arrangeChildren
(
child
,
dictMap
,
codeFieldName
,
childrenFieldName
);
}
ReflectAssistUtils
.
setFieldValueByFieldName
(
childrenFieldName
,
t
,
children
);
}
return
t
;
}
}
src/main/java/com/jz/common/utils/FlowParseTool.java
View file @
5f24ed50
...
...
@@ -16,6 +16,7 @@ import com.jz.common.enums.NodeChangeTypeEnum;
import
com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowNode
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowNodeChangeInfo
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowPro
;
import
com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoDto
;
import
com.jz.dmp.modules.model.DmpNavigationTree
;
import
com.jz.dmp.modules.model.DmpProject
;
import
com.jz.dmp.modules.model.DmpProjectSystemInfo
;
...
...
@@ -42,14 +43,42 @@ public class FlowParseTool {
/**
* 要发布到的项目配置信息
*/
private
DmpProjectSystemInfo
publishedToProjectSystemInfo
;
//private DmpProjectSystemInfo publishedToProjectSystemInfo;
/**
* 项目配置信息(调整)
*/
private
DmpProjectConfigInfoDto
dmpProjectConfigInfoDto
;
private
DmpDevelopTaskService
dmpDevelopTaskService
;
private
DmpNavigationTreeService
dmpNavigationTreeService
;
private
DmpWorkFlowSubmitDetailsService
dmpWorkFlowSubmitDetailsService
;
/**
* 流程属性
*/
private
FlowPro
flowPro
;
/**
* 节点依赖关系
*/
private
Map
<
String
,
String
>
nodeDependcyRefMap
;
/**
* 流程节点
* key是节点id
*/
private
Map
<
String
,
FlowNode
>
flowNodeMap
;
/**
* 流程变更数据
*/
private
Map
flowChangedMap
;
/**
* 不发布项目用
...
...
@@ -78,34 +107,30 @@ public class FlowParseTool {
DmpWorkFlowSubmitDetailsService
dmpWorkFlowSubmitDetailsService
)
{
this
(
flowPro
,
dmpWorkFlowSubmitDetailsService
);
this
.
publishedToProject
=
publishedToProject
;
this
.
publishedToProjectSystemInfo
=
publishedToProjectSystemInfo
;
//
this.publishedToProjectSystemInfo = publishedToProjectSystemInfo;
this
.
dmpDevelopTaskService
=
dmpDevelopTaskService
;
this
.
dmpNavigationTreeService
=
dmpNavigationTreeService
;
}
/**
* 流程属性
*/
private
FlowPro
flowPro
;
/**
* 节点依赖关系
*/
private
Map
<
String
,
String
>
nodeDependcyRefMap
;
/**
* 流程节点
* key是节点id
*/
private
Map
<
String
,
FlowNode
>
flowNodeMap
;
/**
* 流程变更数据
* 发布项目用
*
* @param flowPro
* @param publishedToProject
* @param dmpProjectConfigInfoDto
*/
private
Map
flowChangedMap
;
public
FlowParseTool
(
FlowPro
flowPro
,
DmpProject
publishedToProject
,
DmpProjectConfigInfoDto
dmpProjectConfigInfoDto
,
DmpDevelopTaskService
dmpDevelopTaskService
,
DmpNavigationTreeService
dmpNavigationTreeService
,
DmpWorkFlowSubmitDetailsService
dmpWorkFlowSubmitDetailsService
)
{
this
(
flowPro
,
dmpWorkFlowSubmitDetailsService
);
this
.
publishedToProject
=
publishedToProject
;
this
.
dmpProjectConfigInfoDto
=
dmpProjectConfigInfoDto
;
this
.
dmpDevelopTaskService
=
dmpDevelopTaskService
;
this
.
dmpNavigationTreeService
=
dmpNavigationTreeService
;
}
private
void
parse
()
{
...
...
@@ -337,13 +362,13 @@ public class FlowParseTool {
*/
public
boolean
publish
()
throws
Exception
{
Long
publishedToProjectId
=
publishedToProjectSystemInfo
.
getProjectId
();
Long
publishedToProjectId
=
dmpProjectConfigInfoDto
.
getProjectId
().
longValue
();
Long
treeId
=
flowPro
.
getTreeId
();
/**
* 当前任务生成文件存放根路径
*/
String
localTaskPath
=
publishedToProjectSystemInfo
.
getAzkabanLocalTaskFilePath
()
+
"/"
+
publishedToProjectId
+
"/"
+
treeId
;
String
localTaskPath
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
()
.
getAzkabanLocalTaskFilePath
()
+
"/"
+
publishedToProjectId
+
"/"
+
treeId
;
File
localTaskFile
=
new
File
(
localTaskPath
);
if
(!
localTaskFile
.
exists
())
{
localTaskFile
.
mkdirs
();
...
...
@@ -430,10 +455,12 @@ public class FlowParseTool {
//上传到azkaban todo
//上次zip包到azkaban
String
localTaskZipAbsolutePath
=
localTaskZipPath
+
"/"
+
localZipTargetFileName
;
String
azkabanApiUrl
=
publishedToProjectSystemInfo
.
getAzkabanMonitorUrl
();
String
azkabanApiUrl
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
()
.
getAzkabanMonitorUrl
();
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanApiUrl
);
return
azkabanApiUtils
.
loginCreateProjectuploadZipAndSchedule
(
"jz_workflow_"
+
publishedToProjectId
,
publishedToProject
.
getProjectDesc
(),
localTaskZipAbsolutePath
,
flowPro
);
return
azkabanApiUtils
.
loginCreateProjectuploadZipAndSchedule
(
"jz_workflow_
new_
"
+
publishedToProjectId
,
publishedToProject
.
getProjectDesc
(),
localTaskZipAbsolutePath
,
flowPro
);
}
/**
...
...
@@ -446,7 +473,7 @@ public class FlowParseTool {
String
fileName
=
flowNode
.
getNodeName
()
+
".sh"
;
String
scriptFileAbsolutePath
=
localTaskExecArgsPath
+
fileName
;
Long
publishedToProjectId
=
publishedToProjectSystemInfo
.
getProjectId
();
Long
publishedToProjectId
=
dmpProjectConfigInfoDto
.
getProjectId
().
longValue
();
Long
treeId
=
flowPro
.
getTreeId
();
List
<
String
>
list
=
new
ArrayList
<>();
...
...
@@ -456,16 +483,16 @@ public class FlowParseTool {
//远程shell 路径
String
remoteShellDir
=
publishedToProjectSystemInfo
.
getAzkabanExectorShellPath
()
+
"/"
+
publishedToProjectId
+
"/"
+
treeId
+
"/"
;
String
remoteShellDir
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
()
.
getAzkabanExectorShellPath
()
+
"/"
+
publishedToProjectId
+
"/"
+
treeId
+
"/"
;
//上传shell文件 todo
SFTPUtils
sftpUtils
=
new
SFTPUtils
(
publishedToProjectSystemInfo
.
getShellCmdServer
(),
publishedToProjectSystemInfo
.
getShellCmdUser
(),
publishedToProjectSystemInfo
.
getShellCmdPassword
(),
publishedToProjectSystemInfo
.
getShellSftpPort
());
SFTPUtils
sftpUtils
=
new
SFTPUtils
(
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
()
.
getShellCmdServer
(),
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
()
.
getShellCmdUser
(),
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
()
.
getShellCmdPassword
(),
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
()
.
getShellSftpPort
());
sftpUtils
.
singleUploadFile
(
localTaskExecArgsPath
,
fileName
,
remoteShellDir
);
String
command
=
"command="
+
publishedToProjectSystemInfo
.
getAzkabanExectorShellExec
()
+
" "
+
publishedToProjectId
+
" ${azkaban.flow.flowid} ${azkaban.job.id} "
+
remoteShellDir
+
fileName
;
String
command
=
"command="
+
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
()
.
getAzkabanExectorShellExec
()
+
" "
+
publishedToProjectId
+
" ${azkaban.flow.flowid} ${azkaban.job.id} "
+
remoteShellDir
+
fileName
;
return
command
;
}
...
...
@@ -480,22 +507,22 @@ public class FlowParseTool {
String
fileName
=
flowNode
.
getNodeName
()
+
".sql"
;
String
scriptFileAbsolutePath
=
localTaskExecArgsPath
+
fileName
;
Long
publishedToProjectId
=
publishedToProjectSystemInfo
.
getProjectId
();
Long
publishedToProjectId
=
dmpProjectConfigInfoDto
.
getProjectId
().
longValue
();
Long
treeId
=
flowPro
.
getTreeId
();
FileUtils
.
write
(
scriptFileAbsolutePath
,
flowNode
.
getScript
());
//上传sql文件 todo
//远程shell 路径
String
remoteSqlDir
=
publishedToProjectSystemInfo
.
getAzkabanExectorSqlPath
()
+
"/"
+
publishedToProjectId
+
"/"
+
treeId
+
"/"
;
String
remoteSqlDir
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
()
.
getAzkabanExectorSqlPath
()
+
"/"
+
publishedToProjectId
+
"/"
+
treeId
+
"/"
;
//上传shell文件 todo
SFTPUtils
sftpUtils
=
new
SFTPUtils
(
publishedToProjectSystemInfo
.
getShellCmdServer
(),
publishedToProjectSystemInfo
.
getShellCmdUser
(),
publishedToProjectSystemInfo
.
getShellCmdPassword
(),
publishedToProjectSystemInfo
.
getShellSftpPort
());
SFTPUtils
sftpUtils
=
new
SFTPUtils
(
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
()
.
getShellCmdServer
(),
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
()
.
getShellCmdUser
(),
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
()
.
getShellCmdPassword
(),
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
()
.
getShellSftpPort
());
sftpUtils
.
singleUploadFile
(
localTaskExecArgsPath
,
fileName
,
remoteSqlDir
);
String
command
=
"command="
+
publishedToProjectSystemInfo
.
getAzkabanExectorSqlExec
()
+
" "
+
publishedToProjectId
+
" ${azkaban.flow.flowid} ${azkaban.job.id} "
+
remoteSqlDir
+
fileName
;
String
command
=
"command="
+
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
()
.
getAzkabanExectorSqlExec
()
+
" "
+
publishedToProjectId
+
" ${azkaban.flow.flowid} ${azkaban.job.id} "
+
remoteSqlDir
+
fileName
;
return
command
;
}
...
...
@@ -516,7 +543,7 @@ public class FlowParseTool {
String execXmlFileName = execXmlFileNameAndVersion.split("@")[1];*/
//任务所在项目id
Long
projectId
=
flowPro
.
getProjectId
();
Long
publishedToProjectId
=
publishedToProjectSystemInfo
.
getProjectId
();
Long
publishedToProjectId
=
dmpProjectConfigInfoDto
.
getProjectId
().
longValue
();
//根据taskName获取treeId
String
taskName
=
flowNode
.
getScript
();
...
...
@@ -533,7 +560,7 @@ public class FlowParseTool {
String
execXmlFileNameAndVersion
=
getPublishSyncTaskFileNameAndLatestVersion
(
taskName
,
syncTaskTreeId
);
String
execXmlFileName
=
execXmlFileNameAndVersion
.
split
(
"@"
)[
1
];
//xml 执行xml的命令写到job文件中
String
command
=
"command="
+
publishedToProjectSystemInfo
.
getAzkabanExectorXmlExec
()
+
" "
+
publishedToProjectId
+
" ${azkaban.flow.flowid} ${azkaban.job.id} "
+
execXmlFileName
;
String
command
=
"command="
+
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
()
.
getAzkabanExectorXmlExec
()
+
" "
+
publishedToProjectId
+
" ${azkaban.flow.flowid} ${azkaban.job.id} "
+
execXmlFileName
;
return
command
;
}
...
...
@@ -563,7 +590,7 @@ public class FlowParseTool {
String
subProcessFlowName
=
flowNode
.
getScript
();
//检查子流程是否存在 todo
String
azkabanApiUrl
=
publishedToProjectSystemInfo
.
getAzkabanMonitorUrl
();
String
azkabanApiUrl
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
()
.
getAzkabanMonitorUrl
();
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanApiUrl
);
boolean
flowExists
=
azkabanApiUtils
.
checkFlowExists
(
"jz_workflow_"
+
flowPro
.
getPublishedToProjectId
(),
subProcessFlowName
);
if
(!
flowExists
)
{
...
...
src/main/java/com/jz/common/utils/ReflectAssistUtils.java
0 → 100644
View file @
5f24ed50
package
com
.
jz
.
common
.
utils
;
import
java.lang.reflect.Method
;
import
java.util.ArrayList
;
import
java.util.List
;
import
com.jz.dmp.modules.controller.bean.DmpNavigationTreeDto
;
/**
* @ClassName: ReflectAssistUtils
* @Description: TODO(反射辅助工具类)
* @author ybz
* @date 2021年1月26日
*
*/
public
class
ReflectAssistUtils
{
/**
* @Title: getFieldValueByFieldName
* @Description: TODO(根据属性名称获取属性值)
* @param @param fieldName
* @param @param cls
* @param @return
* @param @throws Exception 参数
* @return Field 返回类型
* @throws
*/
public
static
Object
getFieldValueByFieldName
(
String
fieldName
,
Object
obj
)
throws
Exception
{
Class
<?>
cls
=
obj
.
getClass
();
String
getMethodName
=
"get"
+
fieldName
.
substring
(
0
,
1
).
toUpperCase
()+
fieldName
.
substring
(
1
);
Method
getMethod
=
cls
.
getMethod
(
getMethodName
);
return
getMethod
.
invoke
(
obj
);
}
/**
* @param <T>
* @Title: setFieldValueByFieldName
* @Description: TODO(设置属性值)
* @param @param fieldName
* @param @param cls
* @param @param fieldVaule
* @param @throws Exception 参数
* @return void 返回类型
* @throws
*/
public
static
<
T
>
void
setFieldValueByFieldName
(
String
fieldName
,
Object
obj
,
T
fieldVaule
)
throws
Exception
{
Class
<?>
cls
=
obj
.
getClass
();
String
setMethodName
=
"set"
+
fieldName
.
substring
(
0
,
1
).
toUpperCase
()+
fieldName
.
substring
(
1
);
Class
<?>
fieldValueClass
=
fieldVaule
.
getClass
();
if
(
fieldVaule
instanceof
ArrayList
)
{
fieldValueClass
=
List
.
class
;
}
Method
setMethod
=
cls
.
getMethod
(
setMethodName
,
fieldValueClass
);
setMethod
.
invoke
(
obj
,
fieldVaule
);
}
public
static
void
main
(
String
[]
args
)
{
try
{
DmpNavigationTreeDto
dmpNavigationTreeDto
=
new
DmpNavigationTreeDto
();
List
<
DmpNavigationTreeDto
>
list
=
new
ArrayList
<
DmpNavigationTreeDto
>();
setFieldValueByFieldName
(
"children"
,
dmpNavigationTreeDto
,
list
);
System
.
out
.
println
(
getFieldValueByFieldName
(
"children"
,
dmpNavigationTreeDto
));
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
}
src/main/java/com/jz/common/utils/SFTPUtils.java
View file @
5f24ed50
...
...
@@ -122,6 +122,7 @@ public class SFTPUtils {
* @param remoteFileDirPath 要上传到的远程文件路径
*/
public
void
singleUploadFile
(
String
localFileDirPath
,
String
uploadFileName
,
String
remoteFileDirPath
)
{
String
pathTeString
=
"C:\\opt\\dmp\\dmp_web\\35\\705\\execArgs\\"
;
//本地文件绝对路径
String
localFileAbsolutePath
=
localFileDirPath
+
uploadFileName
;
String
remoteFileAbsolutePath
=
remoteFileDirPath
+
"/"
+
uploadFileName
;
...
...
@@ -129,7 +130,8 @@ public class SFTPUtils {
createRemoteDirs
(
remoteFileDirPath
);
try
{
sftp
.
put
(
localFileAbsolutePath
,
remoteFileAbsolutePath
,
ChannelSftp
.
OVERWRITE
);
//sftp.put(localFileAbsolutePath, remoteFileAbsolutePath,ChannelSftp.OVERWRITE);
sftp
.
put
(
pathTeString
+
uploadFileName
,
remoteFileAbsolutePath
,
ChannelSftp
.
OVERWRITE
);
sftp
.
chmod
(
Integer
.
parseInt
(
"775"
,
8
),
remoteFileAbsolutePath
);
LOGGER
.
info
(
"上传"
+
localFileAbsolutePath
+
" 到 "
+
remoteFileAbsolutePath
+
" 成功"
);
}
catch
(
SftpException
e
)
{
...
...
src/main/java/com/jz/dmp/modules/controller/DataIntegration/OfflineSynchController.java
View file @
5f24ed50
...
...
@@ -5,10 +5,12 @@ import com.jz.common.constant.ResultCode;
import
com.jz.common.page.BasePageBean
;
import
com.jz.common.page.PageInfoResponse
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.*
;
import
com.jz.dmp.modules.service.DmpNavigationTreeService
;
import
com.jz.dmp.modules.controller.dataService.bean.SoureTableColumnsReq
;
import
com.jz.dmp.modules.service.DmpSyncingDatasourceTypeService
;
import
com.jz.dmp.modules.service.OfflineSynchService
;
import
io.swagger.annotations.Api
;
import
io.swagger.annotations.ApiImplicitParam
;
import
io.swagger.annotations.ApiImplicitParams
;
import
io.swagger.annotations.ApiOperation
;
import
org.apache.commons.lang3.StringUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
...
@@ -16,9 +18,7 @@ import org.springframework.validation.annotation.Validated;
import
org.springframework.web.bind.annotation.*
;
import
javax.servlet.http.HttpServletRequest
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
/**
* @Description:离线同步
...
...
@@ -34,6 +34,9 @@ public class OfflineSynchController {
@Autowired
private
OfflineSynchService
offlineSynchService
;
@Autowired
private
DmpSyncingDatasourceTypeService
dmpSyncingDatasourceTypeService
;
/**
* 离线同步任务列表分页查询
*
...
...
@@ -62,12 +65,29 @@ public class OfflineSynchController {
*/
@ApiOperation
(
value
=
"获取源数据库名称-下拉框"
,
notes
=
"获取源数据库名称"
)
@GetMapping
(
value
=
"/sourceDbList"
)
@ApiImplicitParam
(
name
=
"projectId"
,
value
=
"项目id"
,
required
=
true
)
public
JsonResult
<
List
<
SourceDbNameListDto
>>
getSourceDbList
(
@RequestParam
Integer
projectId
)
throws
Exception
{
JsonResult
<
List
<
SourceDbNameListDto
>>
jsonResult
=
offlineSynchService
.
querygSourceDbList
(
projectId
);
@ApiImplicitParams
({
@ApiImplicitParam
(
name
=
"projectId"
,
value
=
"项目id"
,
required
=
true
)
,
@ApiImplicitParam
(
name
=
"datasourceTypeId"
,
value
=
"数据源类型id"
)
,
@ApiImplicitParam
(
name
=
"type"
,
value
=
"01:来源,02目标"
)
})
public
JsonResult
<
List
<
SourceDbNameListDto
>>
getSourceDbList
(
@RequestParam
Integer
projectId
,
@RequestParam
(
required
=
false
)
String
type
,
@RequestParam
String
datasourceTypeId
)
throws
Exception
{
JsonResult
<
List
<
SourceDbNameListDto
>>
jsonResult
=
offlineSynchService
.
querygSourceDbList
(
projectId
,
datasourceTypeId
);
return
jsonResult
;
}
/**
* 获取数据源——下拉框
*
* @return
* @since 2021-01-21
* @author Bellamy
*/
@ApiOperation
(
value
=
"获取数据源—下拉框"
,
notes
=
"获取数据源—下拉框"
)
@GetMapping
(
value
=
"/datasourceList"
)
public
JsonResult
getDatasourceList
()
throws
Exception
{
JsonResult
list
=
dmpSyncingDatasourceTypeService
.
queryDatasourceList
();
return
list
;
}
/**
* 根据源数据库id,获取源数据表——下拉框
*
...
...
@@ -76,7 +96,7 @@ public class OfflineSynchController {
*/
@ApiOperation
(
value
=
"根据源数据库id,获取源数据表-下拉框"
,
notes
=
"根据源数据库id,获取源数据表"
)
@GetMapping
(
value
=
"/sourceTableList"
)
@ApiImplicitParam
(
name
=
"sourceDbId"
,
value
=
"源数据库id"
,
required
=
true
)
@ApiImplicitParam
s
({
@ApiImplicitParam
(
name
=
"sourceDbId"
,
value
=
"源数据库id"
,
required
=
true
)}
)
public
JsonResult
getSourceTableList
(
@RequestParam
Long
sourceDbId
,
@RequestParam
(
value
=
"targetName"
,
required
=
false
)
String
targetName
)
throws
Exception
{
JsonResult
list
=
offlineSynchService
.
querygSourceTableList
(
sourceDbId
,
targetName
);
return
list
;
...
...
@@ -175,16 +195,23 @@ public class OfflineSynchController {
}
/**
* 获取
源表和目标表的
字段
* 获取
数据源表
字段
*
* @return
* @author Bellamy
*/
@ApiOperation
(
value
=
"获取
源表和目标表的字段"
,
notes
=
"获取源表和目标表的
字段"
)
@ApiOperation
(
value
=
"获取
数据源表字段"
,
notes
=
"获取数据源表
字段"
)
@PostMapping
(
value
=
"/getSoureAndTargetColumns"
)
public
JsonResult
getSoureAndTargetColumns
(
@RequestBody
@Validated
SoureAndTargetColumnsReq
soureAndTargetColumnsReq
)
throws
Exception
{
JsonResult
list
=
offlineSynchService
.
querySoureAndTargetColumnsByParams
(
soureAndTargetColumnsReq
);
return
list
;
public
JsonResult
getSoureAndTargetColumns
(
@RequestBody
@Validated
SoureTableColumnsReq
soureAndTargetColumnsReq
)
throws
Exception
{
JsonResult
jsonResult
=
new
JsonResult
();
try
{
jsonResult
=
offlineSynchService
.
querySoureTableColumns
(
soureAndTargetColumnsReq
);
}
catch
(
Exception
e
)
{
jsonResult
.
setMessage
(
e
.
getMessage
());
jsonResult
.
setCode
(
ResultCode
.
INTERNAL_SERVER_ERROR
);
e
.
printStackTrace
();
}
return
jsonResult
;
}
/**
...
...
@@ -249,9 +276,9 @@ public class OfflineSynchController {
if
(
StringUtils
.
isEmpty
(
newSynchTaskReq
.
getTreeName
()))
{
return
JsonResult
.
error
(
ResultCode
.
PARAMS_ERROR
,
"任务名称不能为空"
);
}
if
(
StringUtils
.
isEmpty
(
newSynchTaskReq
.
getType
()))
{
/*
if (StringUtils.isEmpty(newSynchTaskReq.getType())) {
return JsonResult.error(ResultCode.PARAMS_ERROR, "任务类型不能为空");
}
}
*/
if
(
StringUtils
.
isEmpty
(
newSynchTaskReq
.
getProjectId
()))
{
return
JsonResult
.
error
(
ResultCode
.
PARAMS_ERROR
,
"目标文件夹不能为空"
);
}
...
...
src/main/java/com/jz/dmp/modules/dao/DmpSyncingDatasourceTypeDao.java
View file @
5f24ed50
...
...
@@ -4,6 +4,7 @@ import com.jz.dmp.modules.model.DmpSyncingDatasourceType;
import
org.apache.ibatis.annotations.Param
;
import
java.util.List
;
import
java.util.Map
;
/**
* 数据源类型(DmpSyncingDatasourceType)表数据库访问层
...
...
@@ -79,4 +80,5 @@ public interface DmpSyncingDatasourceTypeDao {
*/
int
deleteById
(
Integer
id
);
List
<
DmpSyncingDatasourceType
>
queryAllByParams
(
Map
params
)
throws
Exception
;
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/model/DmpNavigationTree.java
View file @
5f24ed50
...
...
@@ -88,7 +88,7 @@ public class DmpNavigationTree implements Serializable {
* 父节点ID
*/
@ApiModelProperty
(
value
=
"父节点ID"
)
p
rivate
Integer
parentId
;
p
ublic
Integer
parentId
;
public
Integer
getId
()
{
...
...
src/main/java/com/jz/dmp/modules/model/DmpPublicConfigInfo.java
View file @
5f24ed50
...
...
@@ -163,6 +163,30 @@ public class DmpPublicConfigInfo implements Serializable{
*/
@ApiModelProperty
(
value
=
"元数据服务web地址"
)
private
String
atlasMonitorUrl
;
/**
* 远程连接默认SERVER地址
*/
@ApiModelProperty
(
value
=
"远程连接默认SERVER地址"
)
private
String
shellCmdServer
;
/**
* 远程连接默认用户
*/
@ApiModelProperty
(
value
=
"远程连接默认用户"
)
private
String
shellCmdUser
;
/**
* 远程连接默认用户密码
*/
@ApiModelProperty
(
value
=
"远程连接默认用户密码"
)
private
String
shellCmdPassword
;
/**
* 上传配置的SFTP端口
*/
@ApiModelProperty
(
value
=
"上传配置的SFTP端口"
)
private
Integer
shellSftpPort
;
/**
* 备注
...
...
@@ -396,6 +420,38 @@ public class DmpPublicConfigInfo implements Serializable{
public
void
setAtlasMonitorUrl
(
String
atlasMonitorUrl
)
{
this
.
atlasMonitorUrl
=
atlasMonitorUrl
;
}
public
String
getShellCmdServer
()
{
return
shellCmdServer
;
}
public
void
setShellCmdServer
(
String
shellCmdServer
)
{
this
.
shellCmdServer
=
shellCmdServer
;
}
public
String
getShellCmdUser
()
{
return
shellCmdUser
;
}
public
void
setShellCmdUser
(
String
shellCmdUser
)
{
this
.
shellCmdUser
=
shellCmdUser
;
}
public
String
getShellCmdPassword
()
{
return
shellCmdPassword
;
}
public
void
setShellCmdPassword
(
String
shellCmdPassword
)
{
this
.
shellCmdPassword
=
shellCmdPassword
;
}
public
Integer
getShellSftpPort
()
{
return
shellSftpPort
;
}
public
void
setShellSftpPort
(
Integer
shellSftpPort
)
{
this
.
shellSftpPort
=
shellSftpPort
;
}
public
String
getRemark
()
{
return
remark
;
...
...
src/main/java/com/jz/dmp/modules/model/DmpSyncingDatasourceType.java
View file @
5f24ed50
package
com
.
jz
.
dmp
.
modules
.
model
;
import
io.swagger.annotations.ApiModel
;
import
io.swagger.annotations.ApiModelProperty
;
import
java.io.Serializable
;
/**
...
...
@@ -8,15 +11,18 @@ import java.io.Serializable;
* @author Bellamy
* @since 2020-12-21 18:39:06
*/
@ApiModel
(
"数据源类型"
)
public
class
DmpSyncingDatasourceType
implements
Serializable
{
private
static
final
long
serialVersionUID
=
526021146272437267L
;
/**
* ID
*/
@ApiModelProperty
(
value
=
"id"
)
private
Integer
id
;
/**
* 数据源名称
*/
@ApiModelProperty
(
value
=
"数据源名称"
)
private
String
datasource
;
/**
* 数据源分类
...
...
src/main/java/com/jz/dmp/modules/model/SSOUserInfo.java
0 → 100644
View file @
5f24ed50
package
com
.
jz
.
dmp
.
modules
.
model
;
import
java.io.Serializable
;
import
java.util.Map
;
public
class
SSOUserInfo
implements
Serializable
{
//用户名
private
String
userName
;
//用户角色对应的用户信息
private
Map
<
String
,
String
>
azkabanRoleRefPermissions
;
public
String
getUserName
()
{
return
userName
;
}
public
void
setUserName
(
String
userName
)
{
this
.
userName
=
userName
;
}
public
Map
<
String
,
String
>
getAzkabanRoleRefPermissions
()
{
return
azkabanRoleRefPermissions
;
}
public
void
setAzkabanRoleRefPermissions
(
Map
<
String
,
String
>
azkabanRoleRefPermissions
)
{
this
.
azkabanRoleRefPermissions
=
azkabanRoleRefPermissions
;
}
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/DmpSyncingDatasourceTypeService.java
View file @
5f24ed50
package
com
.
jz
.
dmp
.
modules
.
service
;
import
com.jz.common.constant.JsonResult
;
import
com.jz.dmp.modules.model.DmpSyncingDatasourceType
;
import
java.util.List
;
...
...
@@ -54,4 +55,12 @@ public interface DmpSyncingDatasourceTypeService {
*/
boolean
deleteById
(
Integer
id
);
/**
* 获取数据源——下拉框
*
* @return
* @since 2021-01-21
* @author Bellamy
*/
JsonResult
queryDatasourceList
()
throws
Exception
;
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/OfflineSynchService.java
View file @
5f24ed50
...
...
@@ -32,7 +32,7 @@ public interface OfflineSynchService {
* @return
* @author Bellamy
*/
JsonResult
querygSourceDbList
(
Integer
projectId
)
throws
Exception
;
JsonResult
querygSourceDbList
(
Integer
projectId
,
String
databaseTypeName
)
throws
Exception
;
/**
* 根据源数据库id,获取源数据表——下拉框
...
...
@@ -120,4 +120,5 @@ public interface OfflineSynchService {
* @since 2021-01-26
*/
JsonResult
addNewSynchTask
(
NewSynchTaskReq
newSynchTaskReq
)
throws
Exception
;
}
src/main/java/com/jz/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
View file @
5f24ed50
...
...
@@ -197,12 +197,12 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
// 脚本模式-json内容,解析JSON串并组装XML内容
Map
<
String
,
Object
>
settingMap
=
(
Map
<
String
,
Object
>)
scriptMap
.
get
(
"setting"
);
String
_extract
=
(
String
)
settingMap
.
get
(
"extract"
);
String
_extractExpression
=
(
String
)
settingMap
.
get
(
"extractExpression"
);
String
_extractExpression
=
(
String
)
settingMap
.
get
(
"extractExpression"
);
//增量表达式
String
_targetBucketCounts
=
(
String
)
settingMap
.
get
(
"targetBucketCounts"
);
String
_errorLimitRecord
=
(
String
)
settingMap
.
get
(
"errorLimitRecord"
);
//错误记录数超过
String
_executorMemory
=
(
String
)
settingMap
.
get
(
"executorMemory"
);
String
_executorCores
=
(
String
)
settingMap
.
get
(
"executorCores"
);
String
_totalExecutorCores
=
(
String
)
settingMap
.
get
(
"totalExecutorCores"
);
String
_executorMemory
=
(
String
)
settingMap
.
get
(
"executorMemory"
);
//分配任务内存
String
_executorCores
=
(
String
)
settingMap
.
get
(
"executorCores"
);
//单executor的cpu数
String
_totalExecutorCores
=
(
String
)
settingMap
.
get
(
"totalExecutorCores"
);
//总executor的cpu数
String
_ftColumn
=
(
String
)
settingMap
.
get
(
"ftColumn"
);
//分桶字段
String
_ftCount
=
(
String
)
settingMap
.
get
(
"ftCount"
);
//分桶个数
String
_separateMax
=
(
String
)
settingMap
.
get
(
"separateMax"
);
//分桶字段最大值
...
...
@@ -216,19 +216,19 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
Map
<
String
,
Object
>
readerMap
=
(
Map
<
String
,
Object
>)
scriptMap
.
get
(
"reader"
);
String
_dbConnection
=
(
String
)
readerMap
.
get
(
"dbConnection"
);
//源数据库名称
String
_fileType
=
(
String
)
readerMap
.
get
(
"fileType"
);
//文件类型
String
_sourceHdfsPath
=
(
String
)
readerMap
.
get
(
"sourceHdfsPath"
);
String
_sourceHdfsPath
=
(
String
)
readerMap
.
get
(
"sourceHdfsPath"
);
//HDFS存储目录
String
_sourceHdfsFile
=
(
String
)
readerMap
.
get
(
"sourceHdfsFile"
);
String
_sourceFtpDir
=
(
String
)
readerMap
.
get
(
"sourceFtpDir"
);
String
_sourceFtpFile
=
(
String
)
readerMap
.
get
(
"sourceFtpFile"
);
String
_sourceSkipFtpFile
=
(
String
)
readerMap
.
get
(
"sourceSkipFtpFile"
);
String
_sourceCsvDelimiter
=
(
String
)
readerMap
.
get
(
"sourceCsvDelimiter"
);
String
_sourceCsvHeader
=
(
String
)
readerMap
.
get
(
"sourceCsvHeader"
);
String
_sourceCsvCharset
=
(
String
)
readerMap
.
get
(
"sourceCsvCharset"
);
String
_sourceFtpDir
=
(
String
)
readerMap
.
get
(
"sourceFtpDir"
);
//文件所在目录
String
_sourceFtpFile
=
(
String
)
readerMap
.
get
(
"sourceFtpFile"
);
//文件名
String
_sourceSkipFtpFile
=
(
String
)
readerMap
.
get
(
"sourceSkipFtpFile"
);
//没有数据文件是否跳过
String
_sourceCsvDelimiter
=
(
String
)
readerMap
.
get
(
"sourceCsvDelimiter"
);
//分隔符
String
_sourceCsvHeader
=
(
String
)
readerMap
.
get
(
"sourceCsvHeader"
);
//是否含有表头
String
_sourceCsvCharset
=
(
String
)
readerMap
.
get
(
"sourceCsvCharset"
);
//字符集编码
String
_sourceCsvQuote
=
(
String
)
readerMap
.
get
(
"sourceCsvQuote"
);
String
_sourceFtpLoadDate
=
(
String
)
readerMap
.
get
(
"sourceFtpLoadDate"
);
String
_sourceFtpLoadDate
=
(
String
)
readerMap
.
get
(
"sourceFtpLoadDate"
);
//加载数据日期
String
_registerTableName
=
(
String
)
readerMap
.
get
(
"registerTableName"
);
//源数据库表名称
String
registerTableName_
=
(
String
)
readerMap
.
get
(
"registerTableName"
);
String
_dayByDay
=
(
String
)
readerMap
.
get
(
"dayByDay"
);
String
_dayByDay
=
(
String
)
readerMap
.
get
(
"dayByDay"
);
//dayByDay
List
<
Map
<
String
,
Object
>>
_readerColumns
=
(
List
<
Map
<
String
,
Object
>>)
readerMap
.
get
(
"column"
);
//源数据库表字段
//******目标数据******
...
...
src/main/java/com/jz/dmp/modules/service/impl/DmpSyncingDatasourceTypeServiceImpl.java
View file @
5f24ed50
package
com
.
jz
.
dmp
.
modules
.
service
.
impl
;
import
com.jz.common.constant.JsonResult
;
import
com.jz.dmp.modules.dao.DmpSyncingDatasourceTypeDao
;
import
com.jz.dmp.modules.model.DmpSyncingDatasourceType
;
import
com.jz.dmp.modules.service.DmpSyncingDatasourceTypeService
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
/**
* 数据源类型(DmpSyncingDatasourceType)表服务实现类
...
...
@@ -76,4 +79,18 @@ public class DmpSyncingDatasourceTypeServiceImpl implements DmpSyncingDatasource
public
boolean
deleteById
(
Integer
id
)
{
return
this
.
dmpSyncingDatasourceTypeDao
.
deleteById
(
id
)
>
0
;
}
/**
* 获取数据源——下拉框
*
* @return
* @author Bellamy
* @since 2021-01-21
*/
@Override
public
JsonResult
queryDatasourceList
()
throws
Exception
{
Map
params
=
new
HashMap
<>();
List
<
DmpSyncingDatasourceType
>
list
=
dmpSyncingDatasourceTypeDao
.
queryAllByParams
(
params
);
return
JsonResult
.
ok
(
list
);
}
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
View file @
5f24ed50
package
com
.
jz
.
dmp
.
modules
.
service
.
impl
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.github.pagehelper.PageHelper
;
import
com.github.pagehelper.PageInfo
;
import
com.jz.agent.service.DmpDsAgentService
;
...
...
@@ -32,7 +31,6 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.security.access.method.P
;
import
org.springframework.stereotype.Service
;
import
org.springframework.transaction.annotation.Propagation
;
import
org.springframework.transaction.annotation.Transactional
;
...
...
@@ -116,10 +114,10 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
* 获取源数据库名称——下拉框
*/
@Override
public
JsonResult
<
List
<
SourceDbNameListDto
>>
querygSourceDbList
(
Integer
projectId
)
throws
Exception
{
public
JsonResult
<
List
<
SourceDbNameListDto
>>
querygSourceDbList
(
Integer
projectId
,
String
datasourceTypeId
)
throws
Exception
{
Map
map
=
new
HashMap
();
map
.
put
(
"projectId"
,
projectId
);
//项目id
map
.
put
(
"
isEnableSource"
,
"1"
);
map
.
put
(
"
datasourceTypeId"
,
datasourceTypeId
);
List
<
SourceDbNameListDto
>
list
=
offlineSynchDao
.
querygSourceDbList
(
map
);
return
new
JsonResult
(
ResultCode
.
SUCCESS
,
list
);
}
...
...
@@ -621,7 +619,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
DmpDevelopTask
task
=
new
DmpDevelopTask
();
task
.
setProjectId
(
projectId
);
task
.
setParentId
(
parentId
);
//
task.setParentId(parentId);
task
.
setTaskType
(
"2"
);
//任务类型
task
.
setDatasourceId
(
dataSourceId
);
//数据源ID
task
.
setType
(
"3"
);
...
...
@@ -730,6 +728,8 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
Map
map
=
returnList
.
get
(
i
);
map
.
put
(
"id"
,
i
+
1
);
}
}
else
{
throw
new
RuntimeException
(
"无数据!"
);
}
return
JsonResult
.
ok
(
returnList
);
}
...
...
@@ -766,6 +766,13 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
@Override
@Transactional
(
rollbackFor
=
Exception
.
class
,
propagation
=
Propagation
.
REQUIRES_NEW
)
public
JsonResult
addNewSynchTask
(
NewSynchTaskReq
newSynchTaskReq
)
throws
Exception
{
DmpNavigationTree
dmpNavigationTree
=
new
DmpNavigationTree
();
dmpNavigationTree
.
setName
(
newSynchTaskReq
.
getTreeName
());
dmpNavigationTree
.
setDataStatus
(
"1"
);
List
<
DmpNavigationTree
>
list
=
dmpNavigationTreeDao
.
queryAll
(
dmpNavigationTree
);
if
(
list
.
size
()
>
0
&&
list
!=
null
)
{
return
JsonResult
.
error
(
"任务名称已存在!"
);
}
DmpNavigationTree
tree
=
new
DmpNavigationTree
();
tree
.
setCreateTime
(
new
Date
());
tree
.
setCreateUserId
(
SessionUtils
.
getCurrentUserId
());
...
...
src/main/java/com/jz/dmp/modules/service/projconfig/impl/DmpProjectConfigInfoServiceImpl.java
View file @
5f24ed50
...
...
@@ -29,6 +29,8 @@ import com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoBatch;
import
com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoDto
;
import
com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoRequest
;
import
com.jz.dmp.modules.controller.projconfig.bean.DmpProjectEngineParamDto
;
import
com.jz.dmp.modules.controller.projconfig.bean.DmpPublicConfigInfoDto
;
import
com.jz.dmp.modules.controller.projconfig.bean.DmpPublicConfigInfoRequest
;
import
com.jz.dmp.modules.dao.projconfig.DmpProjectConfigEngineMapper
;
import
com.jz.dmp.modules.dao.projconfig.DmpProjectConfigInfoMapper
;
import
com.jz.dmp.modules.dao.projconfig.DmpProjectEngineParamMapper
;
...
...
@@ -36,6 +38,7 @@ import com.jz.dmp.modules.model.DmpProjectConfigEngine;
import
com.jz.dmp.modules.model.DmpProjectConfigInfo
;
import
com.jz.dmp.modules.model.DmpProjectEngineParam
;
import
com.jz.dmp.modules.service.projconfig.DmpProjectConfigInfoService
;
import
com.jz.dmp.modules.service.projconfig.DmpPublicConfigInfoService
;
/**
* 项目配置表服务的实现?
...
...
@@ -54,6 +57,9 @@ public class DmpProjectConfigInfoServiceImpl extends BaseService implements DmpP
private
DmpProjectConfigEngineMapper
dmpProjectConfigEngineMapper
;
@Autowired
private
DmpProjectEngineParamMapper
dmpProjectEngineParamMapper
;
@Autowired
private
DmpPublicConfigInfoService
dmpPublicConfigInfoService
;
/*
* (non-Javadoc)
...
...
@@ -668,6 +674,13 @@ public class DmpProjectConfigInfoServiceImpl extends BaseService implements DmpP
if
(!
CollectionUtils
.
isEmpty
(
list
))
{
dto
=
list
.
get
(
0
);
//设置公共属性
DmpPublicConfigInfoRequest
request
=
new
DmpPublicConfigInfoRequest
();
BaseBeanResponse
<
DmpPublicConfigInfoDto
>
configInfoBeanResponse
=
dmpPublicConfigInfoService
.
findList
(
request
,
null
);
List
<
DmpPublicConfigInfoDto
>
configInfoDtos
=
configInfoBeanResponse
.
getDatas
();
if
(!
CollectionUtils
.
isEmpty
(
configInfoDtos
))
{
dto
.
setDmpPublicConfigInfoDto
(
configInfoDtos
.
get
(
0
));
}
}
baseBeanResponse
.
setCode
(
StatuConstant
.
SUCCESS_CODE
);
...
...
src/main/resources/mapper/dmp/DmpSyncingDatasourceTypeMapper.xml
View file @
5f24ed50
...
...
@@ -92,6 +92,15 @@
</where>
</select>
<select
id=
"queryAllByParams"
resultType=
"com.jz.dmp.modules.model.DmpSyncingDatasourceType"
>
select
id, datasource, datasource_catecode, datasource_catename, datasource_type, img_url, data_status, is_enabled,
datasource_catetype, driver_class_name, is_enable_test, default_source_script, default_target_script,
is_enable_source, is_enable_target
from dmp_syncing_datasource_type
where 1=1 and DATA_STATUS='1'
</select>
<!--新增所有列-->
<insert
id=
"insert"
keyProperty=
"id"
useGeneratedKeys=
"true"
>
insert into dmp_syncing_datasource_type(DATASOURCE, DATASOURCE_CATECODE, DATASOURCE_CATENAME, DATASOURCE_TYPE, IMG_URL, DATA_STATUS, IS_ENABLED, DATASOURCE_CATETYPE, DRIVER_CLASS_NAME, IS_ENABLE_TEST, DEFAULT_SOURCE_SCRIPT, DEFAULT_TARGET_SCRIPT, IS_ENABLE_SOURCE, IS_ENABLE_TARGET)
...
...
src/main/resources/mapper/dmp/OfflineSynchMapper.xml
View file @
5f24ed50
...
...
@@ -53,6 +53,7 @@
<if
test =
"isEnableTarget != null and isEnableTarget=='1'.toString()"
>
and dsdt.is_enable_target = '1'
</if>
where ds.data_status = '1' and ds.project_id = #{projectId}
<if
test=
"dbName != null and dbName !='' "
>
and dsdt.datasource = #{dbName}
</if>
<if
test=
"datasourceTypeId != null and datasourceTypeId != '' "
>
and dsdt.id=#{datasourceTypeId}
</if>
<if
test =
'datasourceType != null and datasourceType > 0 '
>
and ds.DATASOURCE_TYPE = #{datasourceType}
</if>
</select>
...
...
src/main/resources/mapper/projconfig/DmpPublicConfigInfoMapper.xml
View file @
5f24ed50
...
...
@@ -26,6 +26,10 @@
<result
column=
"azkaban_exector_shell_export_data"
property=
"azkabanExectorShellExportData"
jdbcType=
"VARCHAR"
/>
<result
column=
"azkaban_monitor_url"
property=
"azkabanMonitorUrl"
jdbcType=
"VARCHAR"
/>
<result
column=
"atlas_monitor_url"
property=
"atlasMonitorUrl"
jdbcType=
"VARCHAR"
/>
<result
column=
"shell_cmd_server"
property=
"shellCmdServer"
jdbcType=
"VARCHAR"
/>
<result
column=
"shell_cmd_user"
property=
"shellCmdUser"
jdbcType=
"VARCHAR"
/>
<result
column=
"shell_cmd_password"
property=
"shellCmdPassword"
jdbcType=
"VARCHAR"
/>
<result
column=
"shell_sftp_port"
property=
"shellSftpPort"
jdbcType=
"INTEGER"
/>
<result
column=
"remark"
property=
"remark"
jdbcType=
"VARCHAR"
/>
<result
column=
"data_status"
property=
"dataStatus"
jdbcType=
"CHAR"
/>
<result
column=
"create_user_id"
property=
"createUserId"
jdbcType=
"INTEGER"
/>
...
...
@@ -43,8 +47,9 @@
kerberos_fqdn, kerberos_keytab_conf, kerberos_keytab_user, kerberos_spark_jaas_conf, hdfs_http_path,
hdfs_syncing_path, hdfs_user_name, kafka_conector_url, kafka_schema_register_url, kafka_bootstrap_servers,
azkaban_exector_shell_exec, azkaban_exector_sql_exec, azkaban_exector_xml_exec, azkaban_exector_sql_path, azkaban_exector_shell_path,
azkaban_local_task_file_path, azkaban_exector_shell_export_data, azkaban_monitor_url, atlas_monitor_url, remark,
data_status, create_user_id, create_time, update_user_id, update_time
azkaban_local_task_file_path, azkaban_exector_shell_export_data, azkaban_monitor_url, atlas_monitor_url, shell_cmd_server,
shell_cmd_user, shell_cmd_password, shell_sftp_port, remark, data_status,
create_user_id, create_time, update_user_id, update_time
</sql>
<sql
id=
"BaseDto_Column_List"
>
...
...
@@ -152,6 +157,18 @@
<if
test=
"atlasMonitorUrl != null"
>
AND atlas_monitor_url = #{atlasMonitorUrl,jdbcType=VARCHAR}
</if>
<if
test=
"shellCmdServer != null"
>
AND shell_cmd_server = #{shellCmdServer,jdbcType=VARCHAR}
</if>
<if
test=
"shellCmdUser != null"
>
AND shell_cmd_user = #{shellCmdUser,jdbcType=VARCHAR}
</if>
<if
test=
"shellCmdPassword != null"
>
AND shell_cmd_password = #{shellCmdPassword,jdbcType=VARCHAR}
</if>
<if
test=
"shellSftpPort != null"
>
AND shell_sftp_port = #{shellSftpPort,jdbcType=INTEGER}
</if>
<if
test=
"remark != null"
>
AND remark = #{remark,jdbcType=VARCHAR}
</if>
...
...
@@ -259,6 +276,18 @@
<if
test=
"atlasMonitorUrl != null"
>
AND atlas_monitor_url = #{atlasMonitorUrl,jdbcType=VARCHAR}
</if>
<if
test=
"shellCmdServer != null"
>
AND shell_cmd_server = #{shellCmdServer,jdbcType=VARCHAR}
</if>
<if
test=
"shellCmdUser != null"
>
AND shell_cmd_user = #{shellCmdUser,jdbcType=VARCHAR}
</if>
<if
test=
"shellCmdPassword != null"
>
AND shell_cmd_password = #{shellCmdPassword,jdbcType=VARCHAR}
</if>
<if
test=
"shellSftpPort != null"
>
AND shell_sftp_port = #{shellSftpPort,jdbcType=INTEGER}
</if>
<if
test=
"remark != null"
>
AND remark = #{remark,jdbcType=VARCHAR}
</if>
...
...
@@ -294,16 +323,18 @@
kerberos_fqdn, kerberos_keytab_conf, kerberos_keytab_user, kerberos_spark_jaas_conf, hdfs_http_path,
hdfs_syncing_path, hdfs_user_name, kafka_conector_url, kafka_schema_register_url, kafka_bootstrap_servers,
azkaban_exector_shell_exec, azkaban_exector_sql_exec, azkaban_exector_xml_exec, azkaban_exector_sql_path, azkaban_exector_shell_path,
azkaban_local_task_file_path, azkaban_exector_shell_export_data, azkaban_monitor_url, atlas_monitor_url, remark,
data_status, create_user_id, create_time, update_user_id, update_time
azkaban_local_task_file_path, azkaban_exector_shell_export_data, azkaban_monitor_url, atlas_monitor_url, shell_cmd_server,
shell_cmd_user, shell_cmd_password, shell_sftp_port, remark, data_status,
create_user_id, create_time, update_user_id, update_time
)
values (
#{publicConfigId,jdbcType=INTEGER}, #{kerberosIsenable,jdbcType=CHAR}, #{kerberosJaasClientName,jdbcType=VARCHAR}, #{kerberosKrb5Conf,jdbcType=VARCHAR}, #{kerberosJaasConf,jdbcType=VARCHAR},
#{kerberosFqdn,jdbcType=VARCHAR}, #{kerberosKeytabConf,jdbcType=VARCHAR}, #{kerberosKeytabUser,jdbcType=VARCHAR}, #{kerberosSparkJaasConf,jdbcType=VARCHAR}, #{hdfsHttpPath,jdbcType=VARCHAR},
#{hdfsSyncingPath,jdbcType=VARCHAR}, #{hdfsUserName,jdbcType=VARCHAR}, #{kafkaConectorUrl,jdbcType=VARCHAR}, #{kafkaSchemaRegisterUrl,jdbcType=VARCHAR}, #{kafkaBootstrapServers,jdbcType=VARCHAR},
#{azkabanExectorShellExec,jdbcType=VARCHAR}, #{azkabanExectorSqlExec,jdbcType=VARCHAR}, #{azkabanExectorXmlExec,jdbcType=VARCHAR}, #{azkabanExectorSqlPath,jdbcType=VARCHAR}, #{azkabanExectorShellPath,jdbcType=VARCHAR},
#{azkabanLocalTaskFilePath,jdbcType=VARCHAR}, #{azkabanExectorShellExportData,jdbcType=VARCHAR}, #{azkabanMonitorUrl,jdbcType=VARCHAR}, #{atlasMonitorUrl,jdbcType=VARCHAR}, #{remark,jdbcType=VARCHAR},
#{dataStatus,jdbcType=CHAR}, #{createUserId,jdbcType=INTEGER}, #{createTime,jdbcType=TIMESTAMP}, #{updateUserId,jdbcType=INTEGER}, #{updateTime,jdbcType=TIMESTAMP}
#{azkabanLocalTaskFilePath,jdbcType=VARCHAR}, #{azkabanExectorShellExportData,jdbcType=VARCHAR}, #{azkabanMonitorUrl,jdbcType=VARCHAR}, #{atlasMonitorUrl,jdbcType=VARCHAR}, #{shellCmdServer,jdbcType=VARCHAR},
#{shellCmdUser,jdbcType=VARCHAR}, #{shellCmdPassword,jdbcType=VARCHAR}, #{shellSftpPort,jdbcType=INTEGER}, #{remark,jdbcType=VARCHAR}, #{dataStatus,jdbcType=CHAR},
#{createUserId,jdbcType=INTEGER}, #{createTime,jdbcType=TIMESTAMP}, #{updateUserId,jdbcType=INTEGER}, #{updateTime,jdbcType=TIMESTAMP}
)
</insert>
...
...
@@ -314,8 +345,9 @@
kerberos_fqdn, kerberos_keytab_conf, kerberos_keytab_user, kerberos_spark_jaas_conf, hdfs_http_path,
hdfs_syncing_path, hdfs_user_name, kafka_conector_url, kafka_schema_register_url, kafka_bootstrap_servers,
azkaban_exector_shell_exec, azkaban_exector_sql_exec, azkaban_exector_xml_exec, azkaban_exector_sql_path, azkaban_exector_shell_path,
azkaban_local_task_file_path, azkaban_exector_shell_export_data, azkaban_monitor_url, atlas_monitor_url, remark,
data_status, create_user_id, create_time, update_user_id, update_time
azkaban_local_task_file_path, azkaban_exector_shell_export_data, azkaban_monitor_url, atlas_monitor_url, shell_cmd_server,
shell_cmd_user, shell_cmd_password, shell_sftp_port, remark, data_status,
create_user_id, create_time, update_user_id, update_time
)
values
<foreach
collection=
"list"
item=
"item"
separator=
","
>
...
...
@@ -324,8 +356,9 @@
#{item.kerberosFqdn,jdbcType=VARCHAR}, #{item.kerberosKeytabConf,jdbcType=VARCHAR}, #{item.kerberosKeytabUser,jdbcType=VARCHAR}, #{item.kerberosSparkJaasConf,jdbcType=VARCHAR}, #{item.hdfsHttpPath,jdbcType=VARCHAR},
#{item.hdfsSyncingPath,jdbcType=VARCHAR}, #{item.hdfsUserName,jdbcType=VARCHAR}, #{item.kafkaConectorUrl,jdbcType=VARCHAR}, #{item.kafkaSchemaRegisterUrl,jdbcType=VARCHAR}, #{item.kafkaBootstrapServers,jdbcType=VARCHAR},
#{item.azkabanExectorShellExec,jdbcType=VARCHAR}, #{item.azkabanExectorSqlExec,jdbcType=VARCHAR}, #{item.azkabanExectorXmlExec,jdbcType=VARCHAR}, #{item.azkabanExectorSqlPath,jdbcType=VARCHAR}, #{item.azkabanExectorShellPath,jdbcType=VARCHAR},
#{item.azkabanLocalTaskFilePath,jdbcType=VARCHAR}, #{item.azkabanExectorShellExportData,jdbcType=VARCHAR}, #{item.azkabanMonitorUrl,jdbcType=VARCHAR}, #{item.atlasMonitorUrl,jdbcType=VARCHAR}, #{item.remark,jdbcType=VARCHAR},
#{item.dataStatus,jdbcType=CHAR}, #{item.createUserId,jdbcType=INTEGER}, #{item.createTime,jdbcType=TIMESTAMP}, #{item.updateUserId,jdbcType=INTEGER}, #{item.updateTime,jdbcType=TIMESTAMP}
#{item.azkabanLocalTaskFilePath,jdbcType=VARCHAR}, #{item.azkabanExectorShellExportData,jdbcType=VARCHAR}, #{item.azkabanMonitorUrl,jdbcType=VARCHAR}, #{item.atlasMonitorUrl,jdbcType=VARCHAR}, #{item.shellCmdServer,jdbcType=VARCHAR},
#{item.shellCmdUser,jdbcType=VARCHAR}, #{item.shellCmdPassword,jdbcType=VARCHAR}, #{item.shellSftpPort,jdbcType=INTEGER}, #{item.remark,jdbcType=VARCHAR}, #{item.dataStatus,jdbcType=CHAR},
#{item.createUserId,jdbcType=INTEGER}, #{item.createTime,jdbcType=TIMESTAMP}, #{item.updateUserId,jdbcType=INTEGER}, #{item.updateTime,jdbcType=TIMESTAMP}
)
</foreach>
</insert>
...
...
@@ -406,6 +439,18 @@
<if
test=
"atlasMonitorUrl != null"
>
atlas_monitor_url,
</if>
<if
test=
"shellCmdServer != null"
>
shell_cmd_server,
</if>
<if
test=
"shellCmdUser != null"
>
shell_cmd_user,
</if>
<if
test=
"shellCmdPassword != null"
>
shell_cmd_password,
</if>
<if
test=
"shellSftpPort != null"
>
shell_sftp_port,
</if>
<if
test=
"remark != null"
>
remark,
</if>
...
...
@@ -498,6 +543,18 @@
<if
test=
"atlasMonitorUrl != null"
>
#{atlasMonitorUrl,jdbcType=VARCHAR},
</if>
<if
test=
"shellCmdServer != null"
>
#{shellCmdServer,jdbcType=VARCHAR},
</if>
<if
test=
"shellCmdUser != null"
>
#{shellCmdUser,jdbcType=VARCHAR},
</if>
<if
test=
"shellCmdPassword != null"
>
#{shellCmdPassword,jdbcType=VARCHAR},
</if>
<if
test=
"shellSftpPort != null"
>
#{shellSftpPort,jdbcType=INTEGER},
</if>
<if
test=
"remark != null"
>
#{remark,jdbcType=VARCHAR},
</if>
...
...
@@ -546,6 +603,10 @@
azkaban_exector_shell_export_data = #{azkabanExectorShellExportData,jdbcType=VARCHAR},
azkaban_monitor_url = #{azkabanMonitorUrl,jdbcType=VARCHAR},
atlas_monitor_url = #{atlasMonitorUrl,jdbcType=VARCHAR},
shell_cmd_server = #{shellCmdServer,jdbcType=VARCHAR},
shell_cmd_user = #{shellCmdUser,jdbcType=VARCHAR},
shell_cmd_password = #{shellCmdPassword,jdbcType=VARCHAR},
shell_sftp_port = #{shellSftpPort,jdbcType=INTEGER},
remark = #{remark,jdbcType=VARCHAR},
data_status = #{dataStatus,jdbcType=CHAR},
create_user_id = #{createUserId,jdbcType=INTEGER},
...
...
@@ -631,6 +692,18 @@
<if
test=
"atlasMonitorUrl != null"
>
atlas_monitor_url = #{atlasMonitorUrl,jdbcType=VARCHAR},
</if>
<if
test=
"shellCmdServer != null"
>
shell_cmd_server = #{shellCmdServer,jdbcType=VARCHAR},
</if>
<if
test=
"shellCmdUser != null"
>
shell_cmd_user = #{shellCmdUser,jdbcType=VARCHAR},
</if>
<if
test=
"shellCmdPassword != null"
>
shell_cmd_password = #{shellCmdPassword,jdbcType=VARCHAR},
</if>
<if
test=
"shellSftpPort != null"
>
shell_sftp_port = #{shellSftpPort,jdbcType=INTEGER},
</if>
<if
test=
"remark != null"
>
remark = #{remark,jdbcType=VARCHAR},
</if>
...
...
@@ -731,6 +804,18 @@
<if
test=
"atlasMonitorUrl != null"
>
AND atlas_monitor_url = #{atlasMonitorUrl,jdbcType=VARCHAR}
</if>
<if
test=
"shellCmdServer != null"
>
AND shell_cmd_server = #{shellCmdServer,jdbcType=VARCHAR}
</if>
<if
test=
"shellCmdUser != null"
>
AND shell_cmd_user = #{shellCmdUser,jdbcType=VARCHAR}
</if>
<if
test=
"shellCmdPassword != null"
>
AND shell_cmd_password = #{shellCmdPassword,jdbcType=VARCHAR}
</if>
<if
test=
"shellSftpPort != null"
>
AND shell_sftp_port = #{shellSftpPort,jdbcType=INTEGER}
</if>
<if
test=
"remark != null"
>
AND remark = #{remark,jdbcType=VARCHAR}
</if>
...
...
@@ -755,7 +840,6 @@
<if
test=
"updateTimeEnd != null"
>
AND update_time
<![CDATA[ <= ]]>
#{updateTimeEnd,jdbcType=TIMESTAMP}
</if>
AND data_status='1'
</where>
</select>
...
...
src/main/resources/templates/lxTaskJson.json
View file @
5f24ed50
...
...
@@ -82,16 +82,17 @@
{
"params"
:
{
"version"
:
"1.0"
,
"parentId"
:
"509"
,
"treeId"
:
669
,
//
"parentId"
:
"509"
,
"mode"
:
"0"
,
"projectId"
:
"31"
,
"taskId"
:
"
任务id"
,
"taskName"
:
"dmp_demo_dmp_azkaban_exector_server_config"
,
"taskId"
:
"
"
,
//任务id
"taskName"
:
"dmp_demo_dmp_azkaban_exector_server_config"
,
//任务名称
"scripts"
:
{
"setting"
:
{
"extract"
:
"incremental"
,
"extractExpression"
:
"where 1=1"
,
"targetInsertMergeOverwrite"
:
"insert"
,
"extract"
:
"incremental"
,
//增量/全量
"extractExpression"
:
"where 1=1"
,
//增量表达式
"targetInsertMergeOverwrite"
:
"insert"
,
//插入合并重写
"ftColumn"
:
"分桶字段"
,
"ftCount"
:
"分桶个数"
,
"separateMax"
:
"分桶字段最大值"
,
...
...
@@ -102,23 +103,26 @@
"preImportStatement"
:
"导入前语句"
,
"errorLimitRecord"
:
"错误记录数超过"
,
"maxConcurrency"
:
"最大并发数"
,
"syncRate"
:
"同步速率"
//
"syncRate"
:
"同步速率"
,
"executorMemory"
:
"1"
,
//分配任务内存
"executorCores"
:
"1"
,
//单executor的cpu数
"totalExecutorCores"
:
"1"
//总executor的cpu数
},
"reader"
:
{
"dbConnection"
:
"mysql_dmp_demo_test"
,
"fileType"
:
""
,
"sourceHdfsPath"
:
""
,
"dbConnection"
:
"mysql_dmp_demo_test"
,
//来源名称
"fileType"
:
""
,
//文件类型
"sourceHdfsPath"
:
""
,
//HDFS存储目录
"sourceHdfsFile"
:
""
,
"sourceFtpDir"
:
""
,
"sourceFtpFile"
:
""
,
"sourceSkipFtpFile"
:
""
,
"sourceCsvDelimiter"
:
""
,
"sourceCsvHeader"
:
""
,
"sourceCsvCharset"
:
""
,
"sourceFtpDir"
:
""
,
//文件所在目录
"sourceFtpFile"
:
""
,
//文件名
"sourceSkipFtpFile"
:
""
,
//没有数据文件是否跳过
"sourceCsvDelimiter"
:
""
,
//分隔符
"sourceCsvHeader"
:
""
,
//是否含有表头
"sourceCsvCharset"
:
""
,
//字符集编码
"sourceCsvQuote"
:
""
,
"sourceFtpLoadDate"
:
""
,
"sourceFtpLoadDate"
:
""
,
//加载数据日期
"registerTableName"
:
"dmp_azkaban_exector_server_config"
,
"dayByDay"
:
"false"
,
"dayByDay"
:
"false"
,
//day_by_day
"column"
:
[
{
"name"
:
"host"
,
...
...
@@ -151,8 +155,8 @@
{
"name"
:
"host"
,
"type"
:
"VARCHAR"
,
"isPk"
:
"1"
,
"isPt"
:
"0"
,
"isPk"
:
"1"
,
//主键
"isPt"
:
"0"
,
//分区
"rules"
:
[
{
"method"
:
""
,
...
...
@@ -199,7 +203,6 @@
]
}
},
"treeId"
:
669
,
"taskRules"
:
[
{
"ruleId"
:
""
,
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment