Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-cmdexectool
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-cmdexectool
Commits
31b9fcab
Commit
31b9fcab
authored
Mar 11, 2021
by
sml
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
代码提交(主键冲突)
parent
f3ac310c
Changes
4
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
129 additions
and
3 deletions
+129
-3
SqlParameters.java
.../cmdexectool/scheduler/common/task/sql/SqlParameters.java
+14
-2
ParameterUtils.java
...mp/cmdexectool/scheduler/common/utils/ParameterUtils.java
+111
-0
AbstractCommandExecutor.java
...scheduler/server/worker/task/AbstractCommandExecutor.java
+1
-1
sink_jdbc.ftl
src/main/resources/templates/sink_jdbc.ftl
+3
-0
No files found.
src/main/java/com/jz/dmp/cmdexectool/scheduler/common/task/sql/SqlParameters.java
View file @
31b9fcab
...
...
@@ -25,7 +25,6 @@ import org.apache.commons.lang3.StringUtils;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean
;
import
org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer
;
import
com.alibaba.fastjson.JSONObject
;
import
com.jz.dmp.cmdexectool.common.constant.CommConstant
;
import
com.jz.dmp.cmdexectool.common.utils.EncryptionUtils
;
...
...
@@ -319,17 +318,30 @@ public class SqlParameters extends AbstractParameters {
||
this
.
targetBaseDataSource
.
getMyDbType
()
==
MyDbType
.
DB2
||
this
.
targetBaseDataSource
.
getMyDbType
()
==
MyDbType
.
INFORMIX
)
{
String
saveMode
=
"append"
;
String
targetTableName
=
tableObj
.
getString
(
"targetTableName"
);
String
primaryKeyConflict
=
tableObj
.
getString
(
"primaryKeyConflict"
);
if
(
CommConstant
.
PRIMARY_KEY_CONFLICT_REPLACE
.
equals
(
primaryKeyConflict
)
||
CommConstant
.
PRIMARY_KEY_CONFLICT_UPDATE
.
equals
(
primaryKeyConflict
))
{
saveMode
=
"update"
;
}
Map
<
String
,
String
>
sinkJdbcModel
=
new
HashMap
<
String
,
String
>();
sinkJdbcModel
.
put
(
"source_table_name"
,
"t"
);
sinkJdbcModel
.
put
(
"save_mode"
,
"overwrite"
);
sinkJdbcModel
.
put
(
"save_mode"
,
saveMode
);
sinkJdbcModel
.
put
(
"truncate"
,
"true"
);
sinkJdbcModel
.
put
(
"url"
,
jdbcUrl
);
sinkJdbcModel
.
put
(
"driver"
,
targetSource
.
getDriverClassName
());
sinkJdbcModel
.
put
(
"user"
,
user
);
sinkJdbcModel
.
put
(
"password"
,
password
);
sinkJdbcModel
.
put
(
"dbtable"
,
targetTableName
);
if
(
"update"
.
equals
(
saveMode
))
{
String
customUpdateStmt
=
ParameterUtils
.
columnMappingHandlerConflict
(
tableFieldsObj
,
targetTableName
,
this
.
targetBaseDataSource
.
getMyDbType
());
if
(
StringUtils
.
isNotEmpty
(
customUpdateStmt
))
{
sinkJdbcModel
.
put
(
"customUpdateStmt"
,
customUpdateStmt
);
}
}
sink
=
FreeMarkerUtils
.
freemakerNoneWebJson
(
CommConstant
.
WATERDROP_FTL_SINK_JDBC
,
sinkJdbcModel
,
freeMarkerConfigurationFactoryBean
);
}
}
...
...
src/main/java/com/jz/dmp/cmdexectool/scheduler/common/utils/ParameterUtils.java
View file @
31b9fcab
...
...
@@ -33,9 +33,11 @@ import org.slf4j.LoggerFactory;
import
com.alibaba.fastjson.JSONArray
;
import
com.alibaba.fastjson.JSONObject
;
import
com.jz.dmp.cmdexectool.common.constant.CommConstant
;
import
com.jz.dmp.cmdexectool.scheduler.common.Constants
;
import
com.jz.dmp.cmdexectool.scheduler.common.enums.CommandType
;
import
com.jz.dmp.cmdexectool.scheduler.common.enums.DataType
;
import
com.jz.dmp.cmdexectool.scheduler.common.enums.MyDbType
;
import
com.jz.dmp.cmdexectool.scheduler.common.process.Property
;
import
com.jz.dmp.cmdexectool.scheduler.common.utils.placeholder.BusinessTimeUtils
;
import
com.jz.dmp.cmdexectool.scheduler.common.utils.placeholder.PlaceholderUtils
;
...
...
@@ -307,4 +309,113 @@ public class ParameterUtils {
return
sb
.
toString
();
}
/**
* @Title: columnMappingHandler
* @Description: TODO(开发任务SQL:生成主键冲突语句)
* @param @param jsonStr
* @param @return 参数
* @return List<Map<String,String>> 返回类型
* @throws
*/
public
static
String
columnMappingHandlerConflict
(
String
jsonStr
,
String
tableName
,
MyDbType
myDbType
)
{
JSONObject
jsonObject
=
JSONObject
.
parseObject
(
jsonStr
);
JSONArray
sourceArray
=
jsonObject
.
getJSONArray
(
"sourceFields"
);
Map
<
String
,
JSONObject
>
sourceMap
=
new
HashMap
<
String
,
JSONObject
>();
for
(
int
index
=
0
;
index
<
sourceArray
.
size
();
index
++)
{
JSONObject
sourceObj
=
sourceArray
.
getJSONObject
(
index
);
sourceMap
.
put
(
sourceObj
.
getString
(
"customSoruceFieldId"
),
sourceObj
);
}
JSONArray
targetArray
=
jsonObject
.
getJSONArray
(
"targetFields"
);
Map
<
String
,
JSONObject
>
targetMap
=
new
HashMap
<
String
,
JSONObject
>();
for
(
int
index
=
0
;
index
<
targetArray
.
size
();
index
++)
{
JSONObject
targetObj
=
targetArray
.
getJSONObject
(
index
);
targetMap
.
put
(
targetObj
.
getString
(
"customTargetFieldId"
),
targetObj
);
}
StringBuilder
valuesSqlSb
=
new
StringBuilder
();
StringBuilder
primaryKeySqlSb
=
new
StringBuilder
();
StringBuilder
updateSqlSb
=
new
StringBuilder
();
JSONArray
mappingArray
=
jsonObject
.
getJSONArray
(
"columnMapping"
);
Integer
size
=
mappingArray
.
size
();
for
(
int
index
=
0
;
index
<
size
;
index
++)
{
JSONObject
mappingObj
=
mappingArray
.
getJSONObject
(
index
);
JSONObject
sourceObj
=
sourceMap
.
get
(
mappingObj
.
getString
(
"customSoruceFieldId"
));
JSONObject
targetObj
=
targetMap
.
get
(
mappingObj
.
getString
(
"customTargetFieldId"
));
//String customSoruceFiledId = sourceObj.getString("customSoruceFiledId");
String
sourceFieldName
=
sourceObj
.
getString
(
"sourceFieldName"
);
//String sourceFieldType = sourceObj.getString("sourceFieldType");
//String customTargetFieldId = targetObj.getString("customTargetFieldId");
String
targetFieldName
=
targetObj
.
getString
(
"targetFieldName"
);
//String targetFieldType = targetObj.getString("targetFieldType");
String
isPrimaryKey
=
targetObj
.
getString
(
"isPrimaryKey"
);
//设置values语句
valuesSqlSb
.
append
(
"?"
);
if
(
index
!=
size
-
1
)
{
valuesSqlSb
.
append
(
","
);
}
if
(
"0"
.
equals
(
isPrimaryKey
)
&&
myDbType
.
MySQL
==
myDbType
)
{
//设置update语句
updateSqlSb
.
append
(
targetFieldName
);
updateSqlSb
.
append
(
"=VALUES("
);
updateSqlSb
.
append
(
targetFieldName
);
updateSqlSb
.
append
(
")"
);
updateSqlSb
.
append
(
","
);
}
else
if
(
myDbType
.
PostgreSQL
==
myDbType
)
{
if
(
"1"
.
equals
(
isPrimaryKey
))
{
primaryKeySqlSb
.
append
(
"targetFieldName"
);
primaryKeySqlSb
.
append
(
","
);
}
else
{
//设置update语句
updateSqlSb
.
append
(
targetFieldName
);
updateSqlSb
.
append
(
"=EXCLUDED."
);
updateSqlSb
.
append
(
targetFieldName
);
updateSqlSb
.
append
(
","
);
}
}
}
String
primaryKeySqlStr
=
""
;
if
(
org
.
apache
.
commons
.
lang3
.
StringUtils
.
isNotEmpty
(
primaryKeySqlSb
))
{
primaryKeySqlStr
=
primaryKeySqlSb
.
substring
(
0
,
primaryKeySqlSb
.
length
()-
1
);
}
String
updateSqlStr
=
""
;
if
(
org
.
apache
.
commons
.
lang3
.
StringUtils
.
isNotEmpty
(
updateSqlSb
))
{
updateSqlStr
=
updateSqlSb
.
substring
(
0
,
updateSqlSb
.
length
()-
1
);
}
StringBuilder
sb
=
new
StringBuilder
();
if
(
myDbType
.
MySQL
==
myDbType
)
{
sb
.
append
(
"INSERT INTO "
);
sb
.
append
(
tableName
+
" "
);
sb
.
append
(
"VALUES("
);
sb
.
append
(
valuesSqlSb
);
sb
.
append
(
") "
);
sb
.
append
(
"ON DUPLICATE KEY UPDATE "
);
sb
.
append
(
updateSqlStr
);
}
else
if
(
myDbType
.
PostgreSQL
==
myDbType
)
{
sb
.
append
(
"INSERT INTO "
);
sb
.
append
(
tableName
+
" "
);
sb
.
append
(
"VALUES("
);
sb
.
append
(
valuesSqlSb
);
sb
.
append
(
") "
);
sb
.
append
(
"ON CONFLICT("
);
sb
.
append
(
primaryKeySqlStr
);
sb
.
append
(
") do update "
);
sb
.
append
(
updateSqlStr
);
}
return
sb
.
toString
();
}
}
src/main/java/com/jz/dmp/cmdexectool/scheduler/server/worker/task/AbstractCommandExecutor.java
View file @
31b9fcab
...
...
@@ -204,7 +204,7 @@ public abstract class AbstractCommandExecutor {
// if timeout occurs, exit directly
//long remainTime = getRemaintime();
long
remainTime
=
6
0
;
long
remainTime
=
30
0
;
// waiting for the run to finish
boolean
status
=
process
.
waitFor
(
remainTime
,
TimeUnit
.
SECONDS
);
...
...
src/main/resources/templates/sink_jdbc.ftl
View file @
31b9fcab
...
...
@@ -23,4 +23,7 @@ Jdbc {
<#if dbtable??>
dbtable = "${dbtable!}"
</#if>
<#if customUpdateStmt??>
customUpdateStmt = "${customUpdateStmt!}"
</#if>
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment