Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
f353dd2b
Commit
f353dd2b
authored
Dec 30, 2020
by
mcb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
61ebabf8
Changes
5
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
292 additions
and
153 deletions
+292
-153
DmpDevelopTaskDao.java
src/main/java/com/jz/dmp/modules/dao/DmpDevelopTaskDao.java
+1
-1
DmpDevelopTaskServiceImpl.java
...z/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
+78
-74
OfflineSynchServiceImpl.java
.../jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
+5
-4
lxTaskJson.json
src/main/resources/lxTaskJson.json
+206
-72
DmpDevelopTaskMapper.xml
src/main/resources/mapper/dmp/DmpDevelopTaskMapper.xml
+2
-2
No files found.
src/main/java/com/jz/dmp/modules/dao/DmpDevelopTaskDao.java
View file @
f353dd2b
...
@@ -15,7 +15,7 @@ public interface DmpDevelopTaskDao {
...
@@ -15,7 +15,7 @@ public interface DmpDevelopTaskDao {
int
deleteNavigationTreeByTreeId
(
String
treeId
)
throws
Exception
;
int
deleteNavigationTreeByTreeId
(
String
treeId
)
throws
Exception
;
Integer
getDbInfoByParam
(
@Param
(
"xmlTdb"
)
String
xmlTdb
,
@Param
(
"projectId"
)
Integer
projectId
);
Integer
getDbInfoByParam
(
@Param
(
"xmlTdb"
)
String
xmlTdb
,
@Param
(
"projectId"
)
Integer
projectId
)
throws
Exception
;
int
insert
(
DmpDevelopTask
dmpDevelopTask
)
throws
Exception
;
int
insert
(
DmpDevelopTask
dmpDevelopTask
)
throws
Exception
;
...
...
src/main/java/com/jz/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
View file @
f353dd2b
...
@@ -67,7 +67,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -67,7 +67,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
/**
/**
* 提交离线同步到指定projectId项目下(上传xml配置文件)
* 提交离线同步到指定projectId项目下(上传xml配置文件)
*
*
/
*/
private
JsonResult
submitSyncXml
(
long
projectId
,
String
xmlContent
)
{
private
JsonResult
submitSyncXml
(
long
projectId
,
String
xmlContent
)
{
String
_type_
=
XmlUtils
.
getPropertyValue
(
xmlContent
,
"type"
);
String
_type_
=
XmlUtils
.
getPropertyValue
(
xmlContent
,
"type"
);
String
_fileName_
=
XmlUtils
.
getPropertyValue
(
xmlContent
,
"name"
);
String
_fileName_
=
XmlUtils
.
getPropertyValue
(
xmlContent
,
"name"
);
...
@@ -106,6 +106,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -106,6 +106,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
return
new
JsonResult
(
ResultCode
.
SUCCESS
,
rst
);
return
new
JsonResult
(
ResultCode
.
SUCCESS
,
rst
);
}
}
}
}
/*
/*
* 生成xml
* 生成xml
* */
* */
...
@@ -131,10 +132,10 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -131,10 +132,10 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
String
_executorMemory
=
(
String
)
settingMap
.
get
(
"executorMemory"
);
String
_executorMemory
=
(
String
)
settingMap
.
get
(
"executorMemory"
);
String
_executorCores
=
(
String
)
settingMap
.
get
(
"executorCores"
);
String
_executorCores
=
(
String
)
settingMap
.
get
(
"executorCores"
);
String
_totalExecutorCores
=
(
String
)
settingMap
.
get
(
"totalExecutorCores"
);
String
_totalExecutorCores
=
(
String
)
settingMap
.
get
(
"totalExecutorCores"
);
String
_ftColumn
=(
String
)
settingMap
.
get
(
"ftColumn"
);
//分桶字段
String
_ftColumn
=
(
String
)
settingMap
.
get
(
"ftColumn"
);
//分桶字段
String
_ftCount
=
(
String
)
settingMap
.
get
(
"ftCount"
);
//分桶个数
String
_ftCount
=
(
String
)
settingMap
.
get
(
"ftCount"
);
//分桶个数
String
_separateMax
=(
String
)
settingMap
.
get
(
"separateMax"
);
//分桶字段最大值
String
_separateMax
=
(
String
)
settingMap
.
get
(
"separateMax"
);
//分桶字段最大值
String
_separateMin
=(
String
)
settingMap
.
get
(
"separateMin"
);
//分桶字段最小值
String
_separateMin
=
(
String
)
settingMap
.
get
(
"separateMin"
);
//分桶字段最小值
//******源数据******
//******源数据******
Map
<
String
,
Object
>
readerMap
=
(
Map
<
String
,
Object
>)
scriptMap
.
get
(
"reader"
);
Map
<
String
,
Object
>
readerMap
=
(
Map
<
String
,
Object
>)
scriptMap
.
get
(
"reader"
);
...
@@ -168,10 +169,10 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -168,10 +169,10 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
List
<
Map
<
String
,
Object
>>
_writerColumns
=
(
List
<
Map
<
String
,
Object
>>)
writerMap
.
get
(
"column"
);
//目标数据库表字段
List
<
Map
<
String
,
Object
>>
_writerColumns
=
(
List
<
Map
<
String
,
Object
>>)
writerMap
.
get
(
"column"
);
//目标数据库表字段
Integer
_projectId_
=
body
.
getProjectId
();
//项目id
Integer
_projectId_
=
body
.
getProjectId
();
//项目id
Integer
_parentId_
=
(
Integer
)
map
.
get
(
"parentId"
);
//父节点id
/*
Integer _parentId_ = (Integer) map.get("parentId"); //父节点id
String _mode_ = (String) map.get("mode");
String _mode_ = (String) map.get("mode");
String _version_ = (String) map.get("version");
String _version_ = (String) map.get("version");
String
_name_
=
(
String
)
map
.
get
(
"name"
);
//节点数名称
String _name_ = (String) map.get("name"); //节点数名称
*/
//根据项目id和数据源名称 查询数据信息
//根据项目id和数据源名称 查询数据信息
DmpSyncingDatasource
sDS
=
this
.
getDmpSyncingDatasource
(
_projectId_
,
_dbConnection
);
//源
DmpSyncingDatasource
sDS
=
this
.
getDmpSyncingDatasource
(
_projectId_
,
_dbConnection
);
//源
...
@@ -194,7 +195,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -194,7 +195,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
boolean
isBigData2ftp
=
(
"HIVE"
.
equals
(
st
)
||
"KUDU"
.
equals
(
st
))
&&
"FTP"
.
equals
(
tt
);
boolean
isBigData2ftp
=
(
"HIVE"
.
equals
(
st
)
||
"KUDU"
.
equals
(
st
))
&&
"FTP"
.
equals
(
tt
);
if
(
"RDBMS"
.
equals
(
sDST
.
getDatasourceCatecode
())
&&
"HIVE"
.
equals
(
tt
))
{
if
(
"RDBMS"
.
equals
(
sDST
.
getDatasourceCatecode
())
&&
"HIVE"
.
equals
(
tt
))
{
_registerTableName
=
sDS
.
getDatasourceName
()
+
"_"
+
_registerTableName
.
substring
(
_registerTableName
.
indexOf
(
"."
)
+
1
);
_registerTableName
=
sDS
.
getDatasourceName
()
+
"_"
+
_registerTableName
.
substring
(
_registerTableName
.
indexOf
(
"."
)
+
1
);
}
}
String
name_
=
(
StringUtils
.
hasLength
(
_targetDbName_
)
?
_targetDbName_
:
tDS
.
getDatasourceName
())
+
"_"
+
_targetTable
;
String
name_
=
(
StringUtils
.
hasLength
(
_targetDbName_
)
?
_targetDbName_
:
tDS
.
getDatasourceName
())
+
"_"
+
_targetTable
;
...
@@ -222,7 +223,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -222,7 +223,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
if
(
"RDBMS"
.
equals
(
sDST
.
getDatasourceCatecode
()))
{
if
(
"RDBMS"
.
equals
(
sDST
.
getDatasourceCatecode
()))
{
sb
.
append
(
"<source_table db_connection=\""
).
append
(
sDS
.
getDatasourceName
()).
append
(
"\" register_table_name=\""
)
sb
.
append
(
"<source_table db_connection=\""
).
append
(
sDS
.
getDatasourceName
()).
append
(
"\" register_table_name=\""
)
.
append
(
_registerTableName
.
substring
(
_registerTableName
.
indexOf
(
"."
)
+
1
)).
append
(
"\">"
).
append
(
source_table_
).
append
(
"</source_table>"
).
append
(
"\r\n"
);
.
append
(
_registerTableName
.
substring
(
_registerTableName
.
indexOf
(
"."
)
+
1
)).
append
(
"\">"
).
append
(
source_table_
).
append
(
"</source_table>"
).
append
(
"\r\n"
);
}
}
if
(
"HIVE"
.
equals
(
st
)
||
"KUDU"
.
equals
(
st
))
{
if
(
"HIVE"
.
equals
(
st
)
||
"KUDU"
.
equals
(
st
))
{
String
sourceTableName
=
_registerTableName
;
String
sourceTableName
=
_registerTableName
;
...
@@ -299,6 +300,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -299,6 +300,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
}
}
return
sb
.
toString
();
return
sb
.
toString
();
}
}
/*
/*
*根据项目id和数据源名称 查询数据信息
*根据项目id和数据源名称 查询数据信息
* */
* */
...
@@ -308,7 +310,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -308,7 +310,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
ds
.
setDatasourceName
(
dsName
);
ds
.
setDatasourceName
(
dsName
);
ds
.
setProjectId
(
projectId
);
ds
.
setProjectId
(
projectId
);
List
<
DmpSyncingDatasource
>
dsList
=
dmpSyncingDatasourceService
.
findListByParams
(
ds
);
List
<
DmpSyncingDatasource
>
dsList
=
dmpSyncingDatasourceService
.
findListByParams
(
ds
);
if
(
dsList
!=
null
&&
dsList
.
size
()
>
0
)
{
if
(
dsList
!=
null
&&
dsList
.
size
()
>
0
)
{
return
dsList
.
get
(
0
);
return
dsList
.
get
(
0
);
}
else
{
}
else
{
return
null
;
return
null
;
...
@@ -319,17 +321,17 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -319,17 +321,17 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
* 获取文件类型
* 获取文件类型
* */
* */
private
String
getFileType
(
String
ft
,
String
_sourceHdfsFile
,
String
_sourceFtpFile
)
{
private
String
getFileType
(
String
ft
,
String
_sourceHdfsFile
,
String
_sourceFtpFile
)
{
if
(
StringUtils
.
isBlank
(
ft
))
{
if
(
StringUtils
.
isBlank
(
ft
))
{
if
(
StringUtils
.
isNotBlank
(
_sourceHdfsFile
))
{
if
(
StringUtils
.
isNotBlank
(
_sourceHdfsFile
))
{
int
idx
=
_sourceHdfsFile
.
lastIndexOf
(
"."
);
int
idx
=
_sourceHdfsFile
.
lastIndexOf
(
"."
);
if
(
idx
>
-
1
)
{
if
(
idx
>
-
1
)
{
String
nft
=
_sourceHdfsFile
.
substring
(
idx
+
1
,
_sourceHdfsFile
.
length
());
String
nft
=
_sourceHdfsFile
.
substring
(
idx
+
1
,
_sourceHdfsFile
.
length
());
if
(
StringUtils
.
isNotBlank
(
nft
))
{
if
(
StringUtils
.
isNotBlank
(
nft
))
{
if
(
"csv"
.
equalsIgnoreCase
(
nft
))
{
if
(
"csv"
.
equalsIgnoreCase
(
nft
))
{
ft
=
"csv"
;
ft
=
"csv"
;
}
else
if
(
"json"
.
equalsIgnoreCase
(
nft
))
{
}
else
if
(
"json"
.
equalsIgnoreCase
(
nft
))
{
ft
=
"json"
;
ft
=
"json"
;
}
else
if
(
"txt"
.
equalsIgnoreCase
(
nft
))
{
}
else
if
(
"txt"
.
equalsIgnoreCase
(
nft
))
{
ft
=
"csv"
;
ft
=
"csv"
;
}
else
{
}
else
{
ft
=
"csv"
;
ft
=
"csv"
;
...
@@ -337,16 +339,16 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -337,16 +339,16 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
}
}
}
}
}
}
if
(
StringUtils
.
isNotBlank
(
_sourceFtpFile
))
{
if
(
StringUtils
.
isNotBlank
(
_sourceFtpFile
))
{
int
idx
=
_sourceFtpFile
.
lastIndexOf
(
"."
);
int
idx
=
_sourceFtpFile
.
lastIndexOf
(
"."
);
if
(
idx
>
-
1
)
{
if
(
idx
>
-
1
)
{
String
nft
=
_sourceFtpFile
.
substring
(
idx
+
1
,
_sourceFtpFile
.
length
());
String
nft
=
_sourceFtpFile
.
substring
(
idx
+
1
,
_sourceFtpFile
.
length
());
if
(
StringUtils
.
isNotBlank
(
nft
))
{
if
(
StringUtils
.
isNotBlank
(
nft
))
{
if
(
"csv"
.
equalsIgnoreCase
(
nft
))
{
if
(
"csv"
.
equalsIgnoreCase
(
nft
))
{
ft
=
"csv"
;
ft
=
"csv"
;
}
else
if
(
"json"
.
equalsIgnoreCase
(
nft
))
{
}
else
if
(
"json"
.
equalsIgnoreCase
(
nft
))
{
ft
=
"json"
;
ft
=
"json"
;
}
else
if
(
"txt"
.
equalsIgnoreCase
(
nft
))
{
}
else
if
(
"txt"
.
equalsIgnoreCase
(
nft
))
{
ft
=
"csv"
;
ft
=
"csv"
;
}
else
{
}
else
{
ft
=
"csv"
;
ft
=
"csv"
;
...
@@ -360,21 +362,21 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -360,21 +362,21 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
private
String
getTargetDBname
(
DmpSyncingDatasource
ds
)
{
private
String
getTargetDBname
(
DmpSyncingDatasource
ds
)
{
String
url
=
ds
.
getJdbcUrl
();
String
url
=
ds
.
getJdbcUrl
();
if
(
StringUtils
.
isNotBlank
(
url
))
{
if
(
StringUtils
.
isNotBlank
(
url
))
{
if
(
ds
.
getDatasourceType
()
==
2
)
{
//sqlserver
if
(
ds
.
getDatasourceType
()
==
2
)
{
//sqlserver
String
[]
arr1
=
url
.
split
(
"="
);
String
[]
arr1
=
url
.
split
(
"="
);
String
[]
arr2
=
arr1
[
1
].
split
(
";"
);
String
[]
arr2
=
arr1
[
1
].
split
(
";"
);
System
.
out
.
println
(
arr2
[
0
]);
System
.
out
.
println
(
arr2
[
0
]);
return
arr2
[
0
];
return
arr2
[
0
];
}
else
{
}
else
{
String
[]
arr1
=
url
.
split
(
"/"
);
String
[]
arr1
=
url
.
split
(
"/"
);
String
ls
=
arr1
[
arr1
.
length
-
1
];
String
ls
=
arr1
[
arr1
.
length
-
1
];
String
[]
arr2
=
ls
.
split
(
"\\?"
);
String
[]
arr2
=
ls
.
split
(
"\\?"
);
return
arr2
[
0
];
return
arr2
[
0
];
}
}
}
else
{
}
else
{
if
(
ds
.
getDatasourceType
().
equals
(
"11"
))
{
//ftp
if
(
ds
.
getDatasourceType
().
equals
(
"11"
))
{
//ftp
return
ds
.
getDbName
();
return
ds
.
getDbName
();
}
}
return
""
;
return
""
;
...
@@ -385,10 +387,10 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -385,10 +387,10 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
String
sT
=
s
.
getDatasourceType
().
toLowerCase
();
String
sT
=
s
.
getDatasourceType
().
toLowerCase
();
String
tT
=
t
.
getDatasourceType
().
toLowerCase
();
String
tT
=
t
.
getDatasourceType
().
toLowerCase
();
String
_ft
=
""
;
String
_ft
=
""
;
if
(
StringUtils
.
isNotBlank
(
ft
))
{
if
(
StringUtils
.
isNotBlank
(
ft
))
{
_ft
=
ft
;
_ft
=
ft
;
}
}
if
(
s
.
getDatasourceCatecode
().
equals
(
"RDBMS"
))
{
if
(
s
.
getDatasourceCatecode
().
equals
(
"RDBMS"
))
{
sT
=
"jdbc"
;
sT
=
"jdbc"
;
}
}
return
sT
+
_ft
+
"2"
+
tT
;
return
sT
+
_ft
+
"2"
+
tT
;
...
@@ -425,13 +427,13 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -425,13 +427,13 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
String
name
=
(
String
)
m
.
get
(
"name"
);
String
name
=
(
String
)
m
.
get
(
"name"
);
String
type
=
StringUtils
.
getStringValue
(
m
.
get
(
"type"
),
""
);
//relatedTypeMap.get(StringUtils.getStringValue(m.get("type"), ""));
String
type
=
StringUtils
.
getStringValue
(
m
.
get
(
"type"
),
""
);
//relatedTypeMap.get(StringUtils.getStringValue(m.get("type"), ""));
String
isPt
=
(
String
)
m
.
get
(
"isPt"
);
String
isPt
=
(
String
)
m
.
get
(
"isPt"
);
if
(
"SQLSERVER"
.
equals
(
s
.
getDatasourceType
()))
{
//对sqlserver 修改
if
(
"SQLSERVER"
.
equals
(
s
.
getDatasourceType
()))
{
//对sqlserver 修改
if
(
"STRING"
.
equals
(
type
))
{
if
(
"STRING"
.
equals
(
type
))
{
source_table
+=
"REPLACE(REPLACE(REPLACE("
+
name
+
", CHAR(13), '<br1>'),CHAR(10),'<br2>'),',','<comma>') AS "
+
name
;
source_table
+=
"REPLACE(REPLACE(REPLACE("
+
name
+
", CHAR(13), '<br1>'),CHAR(10),'<br2>'),',','<comma>') AS "
+
name
;
}
else
{
}
else
{
source_table
+=
name
;
source_table
+=
name
;
}
}
}
else
{
}
else
{
// source_table += "replace(cast(" + name + " as char), char(10), '') as " + name;
// source_table += "replace(cast(" + name + " as char), char(10), '') as " + name;
source_table
+=
name
;
source_table
+=
name
;
}
}
...
@@ -445,11 +447,11 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -445,11 +447,11 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
}
}
}
else
{
}
else
{
source_columns
+=
name
;
source_columns
+=
name
;
if
(
"varchar"
.
equalsIgnoreCase
(
type
))
{
if
(
"varchar"
.
equalsIgnoreCase
(
type
))
{
source_sql
+=
"cast("
+
name
+
" as string) as "
;
source_sql
+=
"cast("
+
name
+
" as string) as "
;
}
else
if
(
"char"
.
equalsIgnoreCase
(
type
))
{
}
else
if
(
"char"
.
equalsIgnoreCase
(
type
))
{
source_sql
+=
"cast("
+
name
+
" as string) as "
;
source_sql
+=
"cast("
+
name
+
" as string) as "
;
}
else
if
(
"datetime"
.
equalsIgnoreCase
(
type
))
{
}
else
if
(
"datetime"
.
equalsIgnoreCase
(
type
))
{
source_sql
+=
"cast("
+
name
+
" as timestamp) as "
;
source_sql
+=
"cast("
+
name
+
" as timestamp) as "
;
}
else
{
}
else
{
source_sql
+=
"cast("
+
name
+
" as "
+
type
+
") as "
;
source_sql
+=
"cast("
+
name
+
" as "
+
type
+
") as "
;
...
@@ -524,13 +526,13 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -524,13 +526,13 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
}
}
private
String
getSourceHdfsDir
(
String
filePath
,
String
fileName
)
{
private
String
getSourceHdfsDir
(
String
filePath
,
String
fileName
)
{
if
(
StringUtils
.
isNotBlank
(
fileName
))
{
if
(
StringUtils
.
isNotBlank
(
fileName
))
{
boolean
f1
=
filePath
.
endsWith
(
"/"
);
boolean
f1
=
filePath
.
endsWith
(
"/"
);
boolean
f2
=
fileName
.
startsWith
(
"/"
);
boolean
f2
=
fileName
.
startsWith
(
"/"
);
if
(
f1
&&
f2
)
{
if
(
f1
&&
f2
)
{
fileName
=
fileName
.
substring
(
1
,
fileName
.
length
());
fileName
=
fileName
.
substring
(
1
,
fileName
.
length
());
}
}
if
(!
f1
&&
!
f2
)
{
if
(!
f1
&&
!
f2
)
{
filePath
+=
"/"
;
filePath
+=
"/"
;
}
}
return
filePath
+
fileName
;
return
filePath
+
fileName
;
...
@@ -582,7 +584,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -582,7 +584,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
target_partition_columns
+=
name
+
", "
;
target_partition_columns
+=
name
+
", "
;
target_partition_columns_hive
+=
name
+
" "
+
type
+
", "
;
target_partition_columns_hive
+=
name
+
" "
+
type
+
", "
;
}
}
String
[]
rls
=
this
.
appendRules
(
m
.
get
(
"rules"
),
name
,
clean_rules
,
check_rules
);
String
[]
rls
=
this
.
appendRules
(
m
.
get
(
"rules"
),
name
,
clean_rules
,
check_rules
);
//便利清洗规则
clean_rules
=
rls
[
0
];
clean_rules
=
rls
[
0
];
check_rules
=
rls
[
1
];
check_rules
=
rls
[
1
];
if
(
"HIVE"
.
equals
(
t
.
getDatasourceType
())
&&
"1"
.
equals
(
isPt
))
{
if
(
"HIVE"
.
equals
(
t
.
getDatasourceType
())
&&
"1"
.
equals
(
isPt
))
{
...
@@ -668,29 +670,31 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
...
@@ -668,29 +670,31 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
private
String
[]
appendRules
(
Object
obj
,
String
columnName
,
String
clean_rules
,
String
check_rules
)
{
private
String
[]
appendRules
(
Object
obj
,
String
columnName
,
String
clean_rules
,
String
check_rules
)
{
List
<
Map
<
String
,
Object
>>
rules
=
(
List
<
Map
<
String
,
Object
>>)
obj
;
List
<
Map
<
String
,
Object
>>
rules
=
(
List
<
Map
<
String
,
Object
>>)
obj
;
for
(
Map
<
String
,
Object
>
m
:
rules
)
{
if
(
rules
.
size
()
>
0
&&
rules
!=
null
)
{
for
(
Map
<
String
,
Object
>
m
:
rules
)
{
String
method
=
(
String
)
m
.
get
(
"method"
);
// 规则方法
String
method
=
(
String
)
m
.
get
(
"method"
);
// 规则方法
String
type
=
(
String
)
m
.
get
(
"type"
);
// 1:清洗规则;0:校验规则
String
type
=
(
String
)
m
.
get
(
"type"
);
// 1:清洗规则;0:校验规则
List
<
String
>
params
=
(
ArrayList
<
String
>)
m
.
get
(
"params"
);
// 规则参数列表,字符串
List
<
String
>
params
=
(
ArrayList
<
String
>)
m
.
get
(
"params"
);
// 规则参数列表,字符串
String
sub_rules
=
"\t<column_name name=\""
+
columnName
+
"\">\r\n"
;
String
sub_rules
=
"\t<column_name name=\""
+
columnName
+
"\">\r\n"
;
sub_rules
+=
"\t\t<rules_type method=\""
+
method
+
"\">\r\n"
;
sub_rules
+=
"\t\t<rules_type method=\""
+
method
+
"\">\r\n"
;
if
(
params
!=
null
&&
params
.
size
()
>
0
)
if
(
params
!=
null
&&
params
.
size
()
>
0
)
for
(
String
rv
:
params
)
{
for
(
String
rv
:
params
)
{
sub_rules
+=
"\t\t\t<rules_value>"
+
rv
+
"</rules_value>\r\n"
;
sub_rules
+=
"\t\t\t<rules_value>"
+
rv
+
"</rules_value>\r\n"
;
}
}
sub_rules
+=
"\t\t</rules_type>\r\n"
;
sub_rules
+=
"\t\t</rules_type>\r\n"
;
sub_rules
+=
"\t</column_name>\r\n"
;
sub_rules
+=
"\t</column_name>\r\n"
;
if
(
"0"
.
equals
(
type
))
{
if
(
"0"
.
equals
(
type
))
{
if
(
StringUtils
.
isBlank
(
check_rules
))
if
(
StringUtils
.
isBlank
(
check_rules
))
sub_rules
=
"\r\n"
+
sub_rules
;
sub_rules
=
"\r\n"
+
sub_rules
;
check_rules
+=
sub_rules
;
check_rules
+=
sub_rules
;
}
}
if
(
"1"
.
equals
(
type
))
{
if
(
"1"
.
equals
(
type
))
{
if
(
StringUtils
.
isBlank
(
clean_rules
))
if
(
StringUtils
.
isBlank
(
clean_rules
))
sub_rules
=
"\r\n"
+
sub_rules
;
sub_rules
=
"\r\n"
+
sub_rules
;
clean_rules
+=
sub_rules
;
clean_rules
+=
sub_rules
;
}
}
}
}
return
new
String
[]
{
clean_rules
,
check_rules
};
}
return
new
String
[]{
clean_rules
,
check_rules
};
}
}
}
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
View file @
f353dd2b
...
@@ -452,10 +452,10 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -452,10 +452,10 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
* @return
* @return
*/
*/
public
JsonResult
addSyncing
(
Map
<
String
,
Object
>
body
)
throws
Exception
{
public
JsonResult
addSyncing
(
Map
<
String
,
Object
>
body
)
throws
Exception
{
Integer
projectId
=
(
Integer
)
body
.
get
(
"projectId"
)
;
Integer
projectId
=
Integer
.
valueOf
(
body
.
get
(
"projectId"
).
toString
())
;
Integer
parentId
=
(
Integer
)
body
.
get
(
"parentId"
);
//父节点ID
Integer
parentId
=
Integer
.
valueOf
(
body
.
get
(
"parentId"
).
toString
()
);
//父节点ID
String
taskName
=
(
String
)
body
.
get
(
"taskName"
);
//任务名称 业务节点名称 一对一
String
taskName
=
(
String
)
body
.
get
(
"taskName"
);
//任务名称 业务节点名称 一对一
Integer
treeId
=
(
Integer
)
body
.
get
(
"treeId"
);
//树节点ID
Integer
treeId
=
Integer
.
valueOf
(
body
.
get
(
"treeId"
).
toString
()
);
//树节点ID
if
(
StringUtils
.
isBlank
(
taskName
))
{
if
(
StringUtils
.
isBlank
(
taskName
))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"任务名称不能为空"
);
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"任务名称不能为空"
);
...
@@ -502,6 +502,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -502,6 +502,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
task
.
setTaskDesc
(
"Syncing Task"
);
//任务描述
task
.
setTaskDesc
(
"Syncing Task"
);
//任务描述
task
.
setIsSubmit
(
"0"
);
//是否已提交
task
.
setIsSubmit
(
"0"
);
//是否已提交
task
.
setTreeId
(
treeId
);
task
.
setTreeId
(
treeId
);
task
.
setDataStatus
(
"1"
);
String
script
=
JsonMapper
.
toJsonString
(
body
);
String
script
=
JsonMapper
.
toJsonString
(
body
);
byte
[]
data
=
null
;
byte
[]
data
=
null
;
try
{
try
{
...
@@ -561,7 +562,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
...
@@ -561,7 +562,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
private
void
settRuleInfo
(
String
taskId
,
List
<
Map
>
rules
,
List
<
DvTaskRuleT
>
list
)
throws
Exception
{
private
void
settRuleInfo
(
String
taskId
,
List
<
Map
>
rules
,
List
<
DvTaskRuleT
>
list
)
throws
Exception
{
for
(
Map
rule
:
rules
)
{
for
(
Map
rule
:
rules
)
{
DvTaskRuleT
taskRuleT
=
new
DvTaskRuleT
();
DvTaskRuleT
taskRuleT
=
new
DvTaskRuleT
();
Integer
ruleId
=
(
Integer
)
rule
.
get
(
"ruleId"
);
Integer
ruleId
=
Integer
.
valueOf
(
rule
.
get
(
"ruleId"
).
toString
()
);
String
ruleValue
=
(
String
)
rule
.
get
(
"ruleValue"
);
String
ruleValue
=
(
String
)
rule
.
get
(
"ruleValue"
);
taskRuleT
.
setTaskId
(
taskId
);
//任务ID
taskRuleT
.
setTaskId
(
taskId
);
//任务ID
taskRuleT
.
setRuleId
(
ruleId
.
longValue
());
taskRuleT
.
setRuleId
(
ruleId
.
longValue
());
...
...
src/main/resources/lxTaskJson.json
View file @
f353dd2b
//
{
//
"version"
:
"1.0"
,
//
"parentId"
:
""
,
//
"mode"
:
"0"
,
//
"projectId"
:
""
,
//
"name"
:
""
,
//
"scripts"
:
{
//
"setting"
:
{
//
"extract"
:
"incremental"
,
//增量或全量
//
"extractExpression"
:
"where 1=1"
,
//增量表达式
//
"targetBucketCounts"
:
""
,
//
"errorLimitRecord"
:
"0"
,
//
"executorMemory"
:
""
,
//
"executorCores"
:
""
,
//
"totalExecutorCores"
:
""
//
},
//
"reader"
:
{
//
"dbConnection"
:
""
,
//
"fileType"
:
""
,
//
"sourceHdfsPath"
:
""
,
//
"sourceHdfsFile"
:
""
,
//
"sourceFtpDir"
:
""
,
//
"sourceFtpFile"
:
""
,
//
"sourceSkipFtpFile"
:
""
,
//
"sourceCsvDelimiter"
:
""
,
//
"sourceCsvHeader"
:
""
,
//
"sourceCsvCharset"
:
""
,
//
"sourceCsvQuote"
:
""
,
//
"sourceFtpLoadDate"
:
""
,
//
"registerTableName"
:
""
,
//
"dayByDay"
:
"false"
,
//
"column"
:
[
//
{
//
"name"
:
""
,
//
"type"
:
""
//
}
//
]
//
},
//
"writer"
:
{
//
"targetDbConnection"
:
""
,
//
"targetTable"
:
""
,
//
"targetFtpDir"
:
""
,
//
"targetFtpFile"
:
""
,
//
"targetCsvDelimiter"
:
""
,
//
"targetCsvHeader"
:
""
,
//
"targetCsvCharset"
:
""
,
//
"targetInsertMergeOverwrite"
:
""
,
//
"column"
:
[
//
{
//
"name"
:
""
,
//
"type"
:
""
,
//
"isPk"
:
""
,
//
"isPt"
:
""
,
//
"rules"
:
[]
//
}
//
]
//
}
//
},
//
"treeId"
:
0
,
//
"taskRules"
:
[
//
{
//
"ruleId"
:
""
,
//
"ruleValue"
:
{
//
"dv_fields"
:
[
//
{
//
"fieldName"
:
""
//
}
//
],
//
"dvTime"
:
{
//
"timeField"
:
""
,
//
"timeValue"
:
{
//
"startTime"
:
""
,
//
"endTime"
:
""
//
}
//
}
//
}
//
}
//
]
//
}
{
{
"version"
:
"1.0"
,
"params"
:{
"parentId"
:
""
,
"version"
:
"1.0"
,
"mode"
:
"0"
,
"parentId"
:
"509"
,
"projectId"
:
""
,
"mode"
:
"0"
,
"name"
:
""
,
"projectId"
:
"31"
,
"scripts"
:
{
"taskName"
:
"dmp_demo_dmp_azkaban_exector_server_config"
,
"setting"
:
{
"scripts"
:{
"extract"
:
"incremental"
,
//增量或全量
"setting"
:{
"extractExpression"
:
"where 1=1"
,
//增量表达式
"extract"
:
"incremental"
,
"targetBucketCounts"
:
""
,
"extractExpression"
:
"where 1=1"
,
"errorLimitRecord"
:
"0"
,
"targetInsertMergeOverwrite"
:
"insert"
,
"executorMemory"
:
""
,
"ftColumn"
:
""
,
"executorCores"
:
""
,
"ftCount"
:
"20"
,
"totalExecutorCores"
:
""
"separateMax"
:
""
,
"separateMin"
:
""
,
"primaryKey"
:
"on"
,
"partition"
:
"on"
},
},
"reader"
:
{
"reader"
:
{
"dbConnection"
:
"
"
,
"dbConnection"
:
"mysql_dmp_demo_test
"
,
"fileType"
:
""
,
"fileType"
:
""
,
"sourceHdfsPath"
:
""
,
"sourceHdfsPath"
:
""
,
"sourceHdfsFile"
:
""
,
"sourceHdfsFile"
:
""
,
"sourceFtpDir"
:
""
,
"sourceFtpDir"
:
""
,
"sourceFtpFile"
:
""
,
"sourceFtpFile"
:
""
,
"sourceSkipFtpFile"
:
""
,
"sourceSkipFtpFile"
:
""
,
"sourceCsvDelimiter"
:
""
,
"sourceCsvDelimiter"
:
""
,
"sourceCsvHeader"
:
""
,
"sourceCsvHeader"
:
""
,
"sourceCsvCharset"
:
""
,
"sourceCsvCharset"
:
""
,
"sourceCsvQuote"
:
""
,
"sourceCsvQuote"
:
""
,
"sourceFtpLoadDate"
:
""
,
"sourceFtpLoadDate"
:
""
,
"registerTableName"
:
"
"
,
"registerTableName"
:
"dmp_azkaban_exector_server_config
"
,
"dayByDay"
:
"false"
,
"dayByDay"
:
"false"
,
"column"
:
[
"column"
:
[
{
{
"name"
:
""
,
"name"
:
"host"
,
"type"
:
""
"type"
:
"VARCHAR"
},
{
"name"
:
"port"
,
"type"
:
"VARCHAR"
},
{
"name"
:
"user_name"
,
"type"
:
"VARCHAR"
},
{
"name"
:
"pass_word"
,
"type"
:
"VARCHAR"
}
}
]
]
},
},
"writer"
:
{
"writer"
:
{
"targetDbConnection"
:
"
"
,
"targetDbConnection"
:
"mysql_dmp_demo
"
,
"targetTable"
:
"
"
,
"targetTable"
:
"dmp_azkaban_exector_server_config
"
,
"targetFtpDir"
:
""
,
"targetFtpDir"
:
""
,
"targetFtpFile"
:
""
,
"targetFtpFile"
:
""
,
"targetCsvDelimiter"
:
""
,
"targetCsvDelimiter"
:
""
,
"targetCsvHeader"
:
""
,
"targetCsvHeader"
:
""
,
"targetCsvCharset"
:
""
,
"targetCsvCharset"
:
""
,
"targetInsertMergeOverwrite"
:
"
"
,
"targetInsertMergeOverwrite"
:
"insert
"
,
"column"
:
[
"column"
:
[
{
{
"name"
:
""
,
"name"
:
"host"
,
"type"
:
""
,
"type"
:
"VARCHAR"
,
"isPk"
:
""
,
"isPk"
:
"1"
,
"isPt"
:
""
,
"isPt"
:
"0"
,
"rules"
:
[]
"rules"
:[
{
"method"
:
""
,
"type"
:
""
}
}
]
]
},
{
"name"
:
"port"
,
"type"
:
"VARCHAR"
,
"isPk"
:
"0"
,
"isPt"
:
"1"
,
"rules"
:[
{
"method"
:
""
,
"type"
:
""
}
}
]
},
},
"treeId"
:
0
,
"taskRules"
:
[
{
{
"ruleId"
:
""
,
"name"
:
"user_name"
,
"ruleValue"
:
{
"type"
:
"VARCHAR"
,
"dv_fields"
:
[
"isPk"
:
"0"
,
"isPt"
:
"0"
,
"rules"
:[
{
"method"
:
""
,
"type"
:
""
}
]
},
{
"name"
:
"pass_word"
,
"type"
:
"VARCHAR"
,
"isPk"
:
"0"
,
"isPt"
:
"0"
,
"rules"
:[
{
"method"
:
""
,
"type"
:
""
}
]
}
]
}
},
"treeId"
:
669
,
"taskRules"
:[
{
{
"fieldName"
:
""
"ruleId"
:
""
,
"ruleValue"
:{
"dv_fields"
:[
{
"fieldName"
:
""
}
}
],
],
"dvTime"
:
{
"dvTime"
:
{
"timeField"
:
""
,
"timeField"
:
""
,
"timeValue"
:
{
"timeValue"
:
{
"startTime"
:
""
,
"startTime"
:
""
,
"endTime"
:
""
"endTime"
:
""
}
}
}
}
}
}
}
}
]
]
}
}
}
\ No newline at end of file
src/main/resources/mapper/dmp/DmpDevelopTaskMapper.xml
View file @
f353dd2b
...
@@ -21,7 +21,7 @@
...
@@ -21,7 +21,7 @@
delete from dmp_navigation_tree where id = #{treeId}
delete from dmp_navigation_tree where id = #{treeId}
</delete>
</delete>
<select
id=
"getDb
Type
"
resultType=
"java.lang.Integer"
>
<select
id=
"getDb
InfoByParam
"
resultType=
"java.lang.Integer"
>
SELECT
SELECT
dsd.id AS id
dsd.id AS id
FROM dmp_syncing_datasource dsd
FROM dmp_syncing_datasource dsd
...
@@ -108,7 +108,7 @@
...
@@ -108,7 +108,7 @@
where ID = #{id}
where ID = #{id}
</update>
</update>
<select
id=
"selectTaskInfoByParam"
parameterType=
"map"
>
<select
id=
"selectTaskInfoByParam"
parameterType=
"map"
resultType=
"com.jz.dmp.modules.model.DmpDevelopTask"
>
select
select
ID, datasource_id, TASK_TYPE, TYPE, SCHEDULE_TYPE, IS_SUBMIT, TASK_DESC, SCRIPT, DATA_STATUS
ID, datasource_id, TASK_TYPE, TYPE, SCHEDULE_TYPE, IS_SUBMIT, TASK_DESC, SCRIPT, DATA_STATUS
, CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, TREE_ID, CHK_RESULT, SYNC_RESULT, CHK_TIME, SYNC_TIME, FLOW_HEADER, FLOW_JSON, VERSION, IS_GZIPED
, CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, TREE_ID, CHK_RESULT, SYNC_RESULT, CHK_TIME, SYNC_TIME, FLOW_HEADER, FLOW_JSON, VERSION, IS_GZIPED
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment