Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
cd138596
Commit
cd138596
authored
Jan 27, 2021
by
mcb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
commit
parent
c34bf441
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
37 additions
and
33 deletions
+37
-33
OfflineSynchService.java
.../java/com/jz/dmp/modules/service/OfflineSynchService.java
+1
-0
DmpDevelopTaskServiceImpl.java
...z/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
+13
-13
OfflineSynchServiceImpl.java
.../jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
+1
-1
lxTaskJson.json
src/main/resources/templates/lxTaskJson.json
+22
-19
No files found.
src/main/java/com/jz/dmp/modules/service/OfflineSynchService.java
View file @
cd138596
...
...
@@ -120,4 +120,5 @@ public interface OfflineSynchService {
* @since 2021-01-26
*/
JsonResult
addNewSynchTask
(
NewSynchTaskReq
newSynchTaskReq
)
throws
Exception
;
}
src/main/java/com/jz/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
View file @
cd138596
...
...
@@ -197,12 +197,12 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
// 脚本模式-json内容,解析JSON串并组装XML内容
Map
<
String
,
Object
>
settingMap
=
(
Map
<
String
,
Object
>)
scriptMap
.
get
(
"setting"
);
String
_extract
=
(
String
)
settingMap
.
get
(
"extract"
);
String
_extractExpression
=
(
String
)
settingMap
.
get
(
"extractExpression"
);
String
_extractExpression
=
(
String
)
settingMap
.
get
(
"extractExpression"
);
//增量表达式
String
_targetBucketCounts
=
(
String
)
settingMap
.
get
(
"targetBucketCounts"
);
String
_errorLimitRecord
=
(
String
)
settingMap
.
get
(
"errorLimitRecord"
);
//错误记录数超过
String
_executorMemory
=
(
String
)
settingMap
.
get
(
"executorMemory"
);
String
_executorCores
=
(
String
)
settingMap
.
get
(
"executorCores"
);
String
_totalExecutorCores
=
(
String
)
settingMap
.
get
(
"totalExecutorCores"
);
String
_executorMemory
=
(
String
)
settingMap
.
get
(
"executorMemory"
);
//分配任务内存
String
_executorCores
=
(
String
)
settingMap
.
get
(
"executorCores"
);
//单executor的cpu数
String
_totalExecutorCores
=
(
String
)
settingMap
.
get
(
"totalExecutorCores"
);
//总executor的cpu数
String
_ftColumn
=
(
String
)
settingMap
.
get
(
"ftColumn"
);
//分桶字段
String
_ftCount
=
(
String
)
settingMap
.
get
(
"ftCount"
);
//分桶个数
String
_separateMax
=
(
String
)
settingMap
.
get
(
"separateMax"
);
//分桶字段最大值
...
...
@@ -216,19 +216,19 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
Map
<
String
,
Object
>
readerMap
=
(
Map
<
String
,
Object
>)
scriptMap
.
get
(
"reader"
);
String
_dbConnection
=
(
String
)
readerMap
.
get
(
"dbConnection"
);
//源数据库名称
String
_fileType
=
(
String
)
readerMap
.
get
(
"fileType"
);
//文件类型
String
_sourceHdfsPath
=
(
String
)
readerMap
.
get
(
"sourceHdfsPath"
);
String
_sourceHdfsPath
=
(
String
)
readerMap
.
get
(
"sourceHdfsPath"
);
//HDFS存储目录
String
_sourceHdfsFile
=
(
String
)
readerMap
.
get
(
"sourceHdfsFile"
);
String
_sourceFtpDir
=
(
String
)
readerMap
.
get
(
"sourceFtpDir"
);
String
_sourceFtpFile
=
(
String
)
readerMap
.
get
(
"sourceFtpFile"
);
String
_sourceSkipFtpFile
=
(
String
)
readerMap
.
get
(
"sourceSkipFtpFile"
);
String
_sourceCsvDelimiter
=
(
String
)
readerMap
.
get
(
"sourceCsvDelimiter"
);
String
_sourceCsvHeader
=
(
String
)
readerMap
.
get
(
"sourceCsvHeader"
);
String
_sourceCsvCharset
=
(
String
)
readerMap
.
get
(
"sourceCsvCharset"
);
String
_sourceFtpDir
=
(
String
)
readerMap
.
get
(
"sourceFtpDir"
);
//文件所在目录
String
_sourceFtpFile
=
(
String
)
readerMap
.
get
(
"sourceFtpFile"
);
//文件名
String
_sourceSkipFtpFile
=
(
String
)
readerMap
.
get
(
"sourceSkipFtpFile"
);
//没有数据文件是否跳过
String
_sourceCsvDelimiter
=
(
String
)
readerMap
.
get
(
"sourceCsvDelimiter"
);
//分隔符
String
_sourceCsvHeader
=
(
String
)
readerMap
.
get
(
"sourceCsvHeader"
);
//是否含有表头
String
_sourceCsvCharset
=
(
String
)
readerMap
.
get
(
"sourceCsvCharset"
);
//字符集编码
String
_sourceCsvQuote
=
(
String
)
readerMap
.
get
(
"sourceCsvQuote"
);
String
_sourceFtpLoadDate
=
(
String
)
readerMap
.
get
(
"sourceFtpLoadDate"
);
String
_sourceFtpLoadDate
=
(
String
)
readerMap
.
get
(
"sourceFtpLoadDate"
);
//加载数据日期
String
_registerTableName
=
(
String
)
readerMap
.
get
(
"registerTableName"
);
//源数据库表名称
String
registerTableName_
=
(
String
)
readerMap
.
get
(
"registerTableName"
);
String
_dayByDay
=
(
String
)
readerMap
.
get
(
"dayByDay"
);
String
_dayByDay
=
(
String
)
readerMap
.
get
(
"dayByDay"
);
//dayByDay
List
<
Map
<
String
,
Object
>>
_readerColumns
=
(
List
<
Map
<
String
,
Object
>>)
readerMap
.
get
(
"column"
);
//源数据库表字段
//******目标数据******
...
...
src/main/java/com/jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
View file @
cd138596
...
...
@@ -119,7 +119,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
public
JsonResult
<
List
<
SourceDbNameListDto
>>
querygSourceDbList
(
Integer
projectId
)
throws
Exception
{
Map
map
=
new
HashMap
();
map
.
put
(
"projectId"
,
projectId
);
//项目id
map
.
put
(
"isEnableSource"
,
"1"
);
//
map.put("isEnableSource", "1");
List
<
SourceDbNameListDto
>
list
=
offlineSynchDao
.
querygSourceDbList
(
map
);
return
new
JsonResult
(
ResultCode
.
SUCCESS
,
list
);
}
...
...
src/main/resources/templates/lxTaskJson.json
View file @
cd138596
...
...
@@ -82,16 +82,17 @@
{
"params"
:
{
"version"
:
"1.0"
,
"treeId"
:
669
,
"parentId"
:
"509"
,
"mode"
:
"0"
,
"projectId"
:
"31"
,
"taskId"
:
"
任务id"
,
"taskName"
:
"dmp_demo_dmp_azkaban_exector_server_config"
,
"taskId"
:
"
"
,
//任务id
"taskName"
:
"dmp_demo_dmp_azkaban_exector_server_config"
,
//任务名称
"scripts"
:
{
"setting"
:
{
"extract"
:
"incremental"
,
"extractExpression"
:
"where 1=1"
,
"targetInsertMergeOverwrite"
:
"insert"
,
"extractExpression"
:
"where 1=1"
,
//增量表达式
"targetInsertMergeOverwrite"
:
"insert"
,
//插入合并重写
"ftColumn"
:
"分桶字段"
,
"ftCount"
:
"分桶个数"
,
"separateMax"
:
"分桶字段最大值"
,
...
...
@@ -102,23 +103,26 @@
"preImportStatement"
:
"导入前语句"
,
"errorLimitRecord"
:
"错误记录数超过"
,
"maxConcurrency"
:
"最大并发数"
,
"syncRate"
:
"同步速率"
//
"syncRate"
:
"同步速率"
,
"executorMemory"
:
"1"
,
//分配任务内存
"executorCores"
:
"1"
,
//单executor的cpu数
"totalExecutorCores"
:
"1"
//总executor的cpu数
},
"reader"
:
{
"dbConnection"
:
"mysql_dmp_demo_test"
,
"fileType"
:
""
,
"sourceHdfsPath"
:
""
,
"dbConnection"
:
"mysql_dmp_demo_test"
,
//来源名称
"fileType"
:
""
,
//文件类型
"sourceHdfsPath"
:
""
,
//HDFS存储目录
"sourceHdfsFile"
:
""
,
"sourceFtpDir"
:
""
,
"sourceFtpFile"
:
""
,
"sourceSkipFtpFile"
:
""
,
"sourceCsvDelimiter"
:
""
,
"sourceCsvHeader"
:
""
,
"sourceCsvCharset"
:
""
,
"sourceFtpDir"
:
""
,
//文件所在目录
"sourceFtpFile"
:
""
,
//文件名
"sourceSkipFtpFile"
:
""
,
//没有数据文件是否跳过
"sourceCsvDelimiter"
:
""
,
//分隔符
"sourceCsvHeader"
:
""
,
//是否含有表头
"sourceCsvCharset"
:
""
,
//字符集编码
"sourceCsvQuote"
:
""
,
"sourceFtpLoadDate"
:
""
,
"sourceFtpLoadDate"
:
""
,
//加载数据日期
"registerTableName"
:
"dmp_azkaban_exector_server_config"
,
"dayByDay"
:
"false"
,
"dayByDay"
:
"false"
,
//day_by_day
"column"
:
[
{
"name"
:
"host"
,
...
...
@@ -151,8 +155,8 @@
{
"name"
:
"host"
,
"type"
:
"VARCHAR"
,
"isPk"
:
"1"
,
"isPt"
:
"0"
,
"isPk"
:
"1"
,
//主键
"isPt"
:
"0"
,
//分区
"rules"
:
[
{
"method"
:
""
,
...
...
@@ -199,7 +203,6 @@
]
}
},
"treeId"
:
669
,
"taskRules"
:
[
{
"ruleId"
:
""
,
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment