Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
be5c85f8
Commit
be5c85f8
authored
Dec 30, 2020
by
mcb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
f353dd2b
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
109 additions
and
91 deletions
+109
-91
DmpDevelopTaskServiceImpl.java
...z/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
+10
-1
lxTaskJson.json
src/main/resources/lxTaskJson.json
+99
-90
No files found.
src/main/java/com/jz/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
View file @
be5c85f8
...
@@ -128,7 +128,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -128,7 +128,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
String
_extract
=
(
String
)
settingMap
.
get
(
"extract"
);
String
_extract
=
(
String
)
settingMap
.
get
(
"extract"
);
String
_extractExpression
=
(
String
)
settingMap
.
get
(
"extractExpression"
);
String
_extractExpression
=
(
String
)
settingMap
.
get
(
"extractExpression"
);
String
_targetBucketCounts
=
(
String
)
settingMap
.
get
(
"targetBucketCounts"
);
String
_targetBucketCounts
=
(
String
)
settingMap
.
get
(
"targetBucketCounts"
);
String
_errorLimitRecord
=
(
String
)
settingMap
.
get
(
"errorLimitRecord"
);
String
_errorLimitRecord
=
(
String
)
settingMap
.
get
(
"errorLimitRecord"
);
//错误记录数超过
String
_executorMemory
=
(
String
)
settingMap
.
get
(
"executorMemory"
);
String
_executorMemory
=
(
String
)
settingMap
.
get
(
"executorMemory"
);
String
_executorCores
=
(
String
)
settingMap
.
get
(
"executorCores"
);
String
_executorCores
=
(
String
)
settingMap
.
get
(
"executorCores"
);
String
_totalExecutorCores
=
(
String
)
settingMap
.
get
(
"totalExecutorCores"
);
String
_totalExecutorCores
=
(
String
)
settingMap
.
get
(
"totalExecutorCores"
);
...
@@ -136,6 +136,10 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -136,6 +136,10 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
String
_ftCount
=
(
String
)
settingMap
.
get
(
"ftCount"
);
//分桶个数
String
_ftCount
=
(
String
)
settingMap
.
get
(
"ftCount"
);
//分桶个数
String
_separateMax
=
(
String
)
settingMap
.
get
(
"separateMax"
);
//分桶字段最大值
String
_separateMax
=
(
String
)
settingMap
.
get
(
"separateMax"
);
//分桶字段最大值
String
_separateMin
=
(
String
)
settingMap
.
get
(
"separateMin"
);
//分桶字段最小值
String
_separateMin
=
(
String
)
settingMap
.
get
(
"separateMin"
);
//分桶字段最小值
String
_syncRate
=
(
String
)
settingMap
.
get
(
"syncRate"
);
//同步速率
String
_maxConcurrency
=
(
String
)
settingMap
.
get
(
"maxConcurrency"
);
//最大并发数
String
_preImportStatement
=
(
String
)
settingMap
.
get
(
"preImportStatement"
);
//导入前语句
String
_postImportStatement
=
(
String
)
settingMap
.
get
(
"postImportStatement"
);
//导入后语句
//******源数据******
//******源数据******
Map
<
String
,
Object
>
readerMap
=
(
Map
<
String
,
Object
>)
scriptMap
.
get
(
"reader"
);
Map
<
String
,
Object
>
readerMap
=
(
Map
<
String
,
Object
>)
scriptMap
.
get
(
"reader"
);
...
@@ -288,6 +292,11 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -288,6 +292,11 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
sb
.
append
(
"<ft_count>"
).
append
(
_ftCount
).
append
(
"</ft_count>"
).
append
(
"\r\n"
);
sb
.
append
(
"<ft_count>"
).
append
(
_ftCount
).
append
(
"</ft_count>"
).
append
(
"\r\n"
);
sb
.
append
(
"<separate_max>"
).
append
(
_separateMax
).
append
(
"</separate_max>"
).
append
(
"\r\n"
);
sb
.
append
(
"<separate_max>"
).
append
(
_separateMax
).
append
(
"</separate_max>"
).
append
(
"\r\n"
);
sb
.
append
(
"<separate_min>"
).
append
(
_separateMin
).
append
(
"</separate_min>"
).
append
(
"\r\n"
);
sb
.
append
(
"<separate_min>"
).
append
(
_separateMin
).
append
(
"</separate_min>"
).
append
(
"\r\n"
);
sb
.
append
(
"<syncRate>"
).
append
(
_syncRate
).
append
(
"</syncRate>"
).
append
(
"\r\n"
);
sb
.
append
(
"<maxConcurrency>"
).
append
(
_maxConcurrency
).
append
(
"</maxConcurrency>"
).
append
(
"\r\n"
);
sb
.
append
(
"<preImportStatement>"
).
append
(
_preImportStatement
).
append
(
"</preImportStatement>"
).
append
(
"\r\n"
);
sb
.
append
(
"<postImportStatement>"
).
append
(
_postImportStatement
).
append
(
"</postImportStatement>"
).
append
(
"\r\n"
);
sb
.
append
(
"<errorLimitRecord>"
).
append
(
_errorLimitRecord
).
append
(
"</errorLimitRecord>"
).
append
(
"\r\n"
);
sb
.
append
(
"<executor_memory>"
).
append
(
_executorMemory
).
append
(
"</executor_memory>"
).
append
(
"\r\n"
);
sb
.
append
(
"<executor_memory>"
).
append
(
_executorMemory
).
append
(
"</executor_memory>"
).
append
(
"\r\n"
);
sb
.
append
(
"<executor_cores>"
).
append
(
_executorCores
).
append
(
"</executor_cores>"
).
append
(
"\r\n"
);
sb
.
append
(
"<executor_cores>"
).
append
(
_executorCores
).
append
(
"</executor_cores>"
).
append
(
"\r\n"
);
...
...
src/main/resources/lxTaskJson.json
View file @
be5c85f8
...
@@ -80,130 +80,139 @@
...
@@ -80,130 +80,139 @@
{
{
"params"
:{
"params"
:
{
"version"
:
"1.0"
,
"version"
:
"1.0"
,
"parentId"
:
"509"
,
"parentId"
:
"509"
,
"mode"
:
"0"
,
"mode"
:
"0"
,
"projectId"
:
"31"
,
"projectId"
:
"31"
,
"taskName"
:
"dmp_demo_dmp_azkaban_exector_server_config"
,
"taskName"
:
"dmp_demo_dmp_azkaban_exector_server_config"
,
"scripts"
:{
"scripts"
:
{
"setting"
:{
"setting"
:
{
"extract"
:
"incremental"
,
"extract"
:
"incremental"
,
"extractExpression"
:
"where 1=1"
,
"extractExpression"
:
"where 1=1"
,
"targetInsertMergeOverwrite"
:
"insert"
,
"targetInsertMergeOverwrite"
:
"insert"
,
"ftColumn"
:
""
,
"ftColumn"
:
"分桶字段"
,
"ftCount"
:
"20"
,
"ftCount"
:
"分桶个数"
,
"separateMax"
:
""
,
"separateMax"
:
"分桶字段最大值"
,
"separateMin"
:
""
,
"separateMin"
:
"分桶字段最小值"
,
"primaryKey"
:
"on"
,
"primaryKey"
:
"主键"
,
"partition"
:
"on"
"partition"
:
"分区"
,
"postImportStatement"
:
"导入后语句"
,
"preImportStatement"
:
"导入前语句"
,
"errorLimitRecord"
:
"错误记录数超过"
,
"maxConcurrency"
:
"最大并发数"
,
"syncRate"
:
"同步速率"
},
},
"reader"
:{
"reader"
:
{
"dbConnection"
:
"mysql_dmp_demo_test"
,
"dbConnection"
:
"mysql_dmp_demo_test"
,
"fileType"
:
""
,
"fileType"
:
""
,
"sourceHdfsPath"
:
""
,
"sourceHdfsPath"
:
""
,
"sourceHdfsFile"
:
""
,
"sourceHdfsFile"
:
""
,
"sourceFtpDir"
:
""
,
"sourceFtpDir"
:
""
,
"sourceFtpFile"
:
""
,
"sourceFtpFile"
:
""
,
"sourceSkipFtpFile"
:
""
,
"sourceSkipFtpFile"
:
""
,
"sourceCsvDelimiter"
:
""
,
"sourceCsvDelimiter"
:
""
,
"sourceCsvHeader"
:
""
,
"sourceCsvHeader"
:
""
,
"sourceCsvCharset"
:
""
,
"sourceCsvCharset"
:
""
,
"sourceCsvQuote"
:
""
,
"sourceCsvQuote"
:
""
,
"sourceFtpLoadDate"
:
""
,
"sourceFtpLoadDate"
:
""
,
"registerTableName"
:
"dmp_azkaban_exector_server_config"
,
"registerTableName"
:
"dmp_azkaban_exector_server_config"
,
"dayByDay"
:
"false"
,
"dayByDay"
:
"false"
,
"column"
:[
"column"
:
[
{
{
"name"
:
"host"
,
"name"
:
"host"
,
"type"
:
"VARCHAR"
"type"
:
"VARCHAR"
},
},
{
{
"name"
:
"port"
,
"name"
:
"port"
,
"type"
:
"VARCHAR"
"type"
:
"VARCHAR"
},
},
{
{
"name"
:
"user_name"
,
"name"
:
"user_name"
,
"type"
:
"VARCHAR"
"type"
:
"VARCHAR"
},
},
{
{
"name"
:
"pass_word"
,
"name"
:
"pass_word"
,
"type"
:
"VARCHAR"
"type"
:
"VARCHAR"
}
}
]
]
},
},
"writer"
:{
"writer"
:
{
"targetDbConnection"
:
"mysql_dmp_demo"
,
"targetDbConnection"
:
"mysql_dmp_demo"
,
"targetTable"
:
"dmp_azkaban_exector_server_config"
,
"targetTable"
:
"dmp_azkaban_exector_server_config"
,
"targetFtpDir"
:
""
,
"targetFtpDir"
:
""
,
"targetFtpFile"
:
""
,
"targetFtpFile"
:
""
,
"targetCsvDelimiter"
:
""
,
"targetCsvDelimiter"
:
""
,
"targetCsvHeader"
:
""
,
"targetCsvHeader"
:
""
,
"targetCsvCharset"
:
""
,
"targetCsvCharset"
:
""
,
"targetInsertMergeOverwrite"
:
"insert"
,
"targetInsertMergeOverwrite"
:
"insert"
,
"column"
:[
"column"
:
[
{
{
"name"
:
"host"
,
"name"
:
"host"
,
"type"
:
"VARCHAR"
,
"type"
:
"VARCHAR"
,
"isPk"
:
"1"
,
"isPk"
:
"1"
,
"isPt"
:
"0"
,
"isPt"
:
"0"
,
"rules"
:[
"rules"
:
[
{
"method"
:
""
,
{
"type"
:
""
"method"
:
""
,
"type"
:
""
}
}
]
]
},
},
{
{
"name"
:
"port"
,
"name"
:
"port"
,
"type"
:
"VARCHAR"
,
"type"
:
"VARCHAR"
,
"isPk"
:
"0"
,
"isPk"
:
"0"
,
"isPt"
:
"1"
,
"isPt"
:
"1"
,
"rules"
:[
"rules"
:
[
{
"method"
:
""
,
{
"type"
:
""
"method"
:
""
,
"type"
:
""
}
}
]
]
},
},
{
{
"name"
:
"user_name"
,
"name"
:
"user_name"
,
"type"
:
"VARCHAR"
,
"type"
:
"VARCHAR"
,
"isPk"
:
"0"
,
"isPk"
:
"0"
,
"isPt"
:
"0"
,
"isPt"
:
"0"
,
"rules"
:[
"rules"
:
[
{
"method"
:
""
,
{
"type"
:
""
"method"
:
""
,
"type"
:
""
}
}
]
]
},
},
{
{
"name"
:
"pass_word"
,
"name"
:
"pass_word"
,
"type"
:
"VARCHAR"
,
"type"
:
"VARCHAR"
,
"isPk"
:
"0"
,
"isPk"
:
"0"
,
"isPt"
:
"0"
,
"isPt"
:
"0"
,
"rules"
:[
"rules"
:
[
{
"method"
:
""
,
{
"type"
:
""
"method"
:
""
,
"type"
:
""
}
}
]
]
}
}
]
]
}
}
},
},
"treeId"
:
669
,
"treeId"
:
669
,
"taskRules"
:[
"taskRules"
:
[
{
{
"ruleId"
:
""
,
"ruleId"
:
""
,
"ruleValue"
:{
"ruleValue"
:
{
"dv_fields"
:[
"dv_fields"
:
[
{
{
"fieldName"
:
""
"fieldName"
:
""
}
}
],
],
"dvTime"
:{
"dvTime"
:
{
"timeField"
:
""
,
"timeField"
:
""
,
"timeValue"
:{
"timeValue"
:
{
"startTime"
:
""
,
"startTime"
:
""
,
"endTime"
:
""
"endTime"
:
""
}
}
}
}
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment