Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-cmdexectool
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-cmdexectool
Commits
273a5b89
Commit
273a5b89
authored
Mar 10, 2021
by
mcb
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'master' of
http://gitlab.ioubuy.cn/yaobenzhang/jz-dmp-cmdexectool
parents
39b88358
02386609
Changes
12
Hide whitespace changes
Inline
Side-by-side
Showing
12 changed files
with
429 additions
and
70 deletions
+429
-70
pom.xml
pom.xml
+6
-0
RemoteExecuteCmdUtils.java
...z/dmp/cmdexectool/common/utils/RemoteExecuteCmdUtils.java
+50
-0
RemoteExecuteCommand.java
...jz/dmp/cmdexectool/common/utils/RemoteExecuteCommand.java
+254
-0
MyDbType.java
...m/jz/dmp/cmdexectool/scheduler/common/enums/MyDbType.java
+3
-0
SqlParameters.java
.../cmdexectool/scheduler/common/task/sql/SqlParameters.java
+50
-37
ParameterUtils.java
...mp/cmdexectool/scheduler/common/utils/ParameterUtils.java
+4
-4
BaseDataSource.java
.../cmdexectool/scheduler/dao/datasource/BaseDataSource.java
+5
-1
DatabaseUtils.java
...jz/dmp/cmdexectool/scheduler/dao/utils/DatabaseUtils.java
+1
-0
WaterdropCommandExecutor.java
...cheduler/server/worker/task/WaterdropCommandExecutor.java
+5
-2
SqlTask.java
...cmdexectool/scheduler/server/worker/task/sql/SqlTask.java
+46
-24
DmpPublicConfigInfoMapper.xml
src/main/resources/mapperconf/DmpPublicConfigInfoMapper.xml
+1
-1
sink_console.ftl
src/main/resources/templates/sink_console.ftl
+4
-1
No files found.
pom.xml
View file @
273a5b89
...
...
@@ -572,6 +572,12 @@
<version>
1.1.12
</version>
</dependency>
<dependency>
<groupId>
org.jvnet.hudson
</groupId>
<artifactId>
ganymed-ssh2
</artifactId>
<version>
build210-hudson-1
</version>
</dependency>
</dependencies>
<build>
<finalName>
jz-dmp-cmdexectool
</finalName>
...
...
src/main/java/com/jz/dmp/cmdexectool/common/utils/RemoteExecuteCmdUtils.java
0 → 100644
View file @
273a5b89
package
com
.
jz
.
dmp
.
cmdexectool
.
common
.
utils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
public
class
RemoteExecuteCmdUtils
{
private
static
Logger
logger
=
LoggerFactory
.
getLogger
(
RemoteExecuteCmdUtils
.
class
);
/**
* @Title: sshCmd
* @Description: TODO(ssh执行command)
* @param @param remoteServer
* @param @param remoteUser
* @param @param remotePassword
* @param @param cmd
* @param @return 参数
* @return boolean 返回类型
* @throws
*/
public
static
boolean
sshCmd
(
String
remoteServer
,
String
remoteUser
,
String
remotePassword
,
String
cmd
)
{
logger
.
info
(
"############################## SHELL: sshCmd starting ##############################"
);
logger
.
info
(
"remoteServer: "
+
remoteServer
);
logger
.
info
(
"remoteUser: "
+
remoteUser
);
logger
.
info
(
"remotePassword: "
+
remotePassword
);
RemoteExecuteCommand
rec
=
new
RemoteExecuteCommand
(
remoteServer
,
remoteUser
,
remotePassword
);
logger
.
info
(
"cmd【{}】"
,
cmd
);
String
exe_return
=
rec
.
execute
(
cmd
);
logger
.
info
(
"exe_return: "
+
exe_return
);
logger
.
info
(
"############################## SHELL: sshCmd end ##############################"
);
return
isExecuteSuccess
(
exe_return
);
}
private
static
boolean
isExecuteSuccess
(
String
exe_return
)
{
if
(
StringUtils
.
isNotBlank
(
exe_return
)
&&
(
exe_return
.
startsWith
(
"0#"
)
||
(
exe_return
.
startsWith
(
"null#"
)
&&
!(
exe_return
.
indexOf
(
"Exception"
)
>
-
1
||
exe_return
.
indexOf
(
"exception"
)
>
-
1
||
exe_return
.
indexOf
(
"EXCEPTION"
)
>
-
1
)))
)
{
return
true
;
}
else
{
return
false
;
}
}
}
src/main/java/com/jz/dmp/cmdexectool/common/utils/RemoteExecuteCommand.java
0 → 100644
View file @
273a5b89
package
com
.
jz
.
dmp
.
cmdexectool
.
common
.
utils
;
import
java.io.BufferedReader
;
import
java.io.IOException
;
import
java.io.InputStream
;
import
java.io.InputStreamReader
;
import
java.io.UnsupportedEncodingException
;
import
org.apache.commons.lang3.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
ch.ethz.ssh2.ChannelCondition
;
import
ch.ethz.ssh2.Connection
;
import
ch.ethz.ssh2.Session
;
import
ch.ethz.ssh2.StreamGobbler
;
/**
* 远程执行linux的shell script
* @author hht
* @since V0.1
*/
public
class
RemoteExecuteCommand
{
private
static
final
Logger
logger
=
LoggerFactory
.
getLogger
(
RemoteExecuteCommand
.
class
);
//字符编码默认是utf-8
private
static
String
DEFAULTCHART
=
"UTF-8"
;
private
Connection
conn
;
private
String
ip
;
private
String
userName
;
private
String
userPwd
;
public
RemoteExecuteCommand
(
String
ip
,
String
userName
,
String
userPwd
)
{
this
.
ip
=
ip
;
this
.
userName
=
userName
;
this
.
userPwd
=
userPwd
;
}
public
RemoteExecuteCommand
()
{
}
/**
* 远程登录linux的主机
* @author Ickes
* @since V0.1
* @return
* 登录成功返回true,否则返回false
*/
public
Boolean
login
(){
boolean
flg
=
false
;
try
{
conn
=
new
Connection
(
ip
);
conn
.
connect
();
//连接
flg
=
conn
.
authenticateWithPassword
(
userName
,
userPwd
);
//认证
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
}
return
flg
;
}
/**
* @author Ickes
* 远程执行shll脚本或者命令
* @param cmd
* 即将执行的命令
* @return
* 命令执行完后返回的结果值:0#XXX,前两位为执行返回值:0#表示执行成功,其他表示失败
* @since V0.1
*/
public
String
execute
(
String
cmd
){
String
result
=
""
;
Session
session
=
null
;
try
{
if
(
login
()){
session
=
conn
.
openSession
();
//打开一个会话
session
.
execCommand
(
cmd
);
//执行命令
result
=
processStdout
(
session
.
getStdout
(),
DEFAULTCHART
);
//如果为得到标准输出为空,说明脚本执行出错了
if
(
StringUtils
.
isBlank
(
result
)){
result
=
processStdout
(
session
.
getStderr
(),
DEFAULTCHART
);
}
result
=
session
.
getExitStatus
()+
"#"
+
result
;
}
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
}
finally
{
if
(
conn
!=
null
)
{
conn
.
close
();
}
if
(
session
!=
null
)
{
session
.
close
();
}
}
return
result
;
}
/**
* 命令是否执行成功
* @param cmd 要执行的远程命令
* @return true:执行成功 执行命令
*/
public
boolean
isExecuteSuccess
(
String
cmd
){
logger
.
info
(
"[ip="
+
this
.
ip
+
", userName="
+
this
.
userName
+
", password="
+
this
.
userPwd
+
"]开始执行cmd:"
+
cmd
);
Session
session
=
null
;
try
{
if
(
login
()){
session
=
conn
.
openSession
();
//打开一个会话
session
.
execCommand
(
cmd
);
//执行命令
session
.
waitForCondition
(
ChannelCondition
.
EXIT_STATUS
,
0
);
//获取错误信息
Integer
exitStatus
=
session
.
getExitStatus
();
if
(
exitStatus
!=
null
&&
exitStatus
==
0
)
{
//没有错误信息且退出码是0 说明执行成功
logger
.
info
(
"cmd:"
+
cmd
+
"执行成功"
);
return
true
;
}
else
{
logger
.
info
(
cmd
+
"执行错误,返回码:"
+
exitStatus
);
return
false
;
}
}
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
}
finally
{
if
(
conn
!=
null
)
{
conn
.
close
();
}
if
(
session
!=
null
)
{
session
.
close
();
}
}
logger
.
info
(
"cmd:"
+
cmd
+
"执行结束"
);
return
false
;
}
/**
* 解析脚本执行返回的结果集
* @author Ickes
* @param in 输入流对象
* @param charset 编码
* @since V0.1
* @return
* 以纯文本的格式返回
*/
private
String
processStdout
(
InputStream
in
,
String
charset
){
InputStream
stdout
=
new
StreamGobbler
(
in
);
StringBuffer
buffer
=
new
StringBuffer
();
BufferedReader
br
=
null
;
try
{
br
=
new
BufferedReader
(
new
InputStreamReader
(
stdout
,
charset
));
String
line
=
null
;
while
((
line
=
br
.
readLine
())
!=
null
){
buffer
.
append
(
line
+
"\n"
);
}
}
catch
(
UnsupportedEncodingException
e
)
{
e
.
printStackTrace
();
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
}
finally
{
if
(
null
!=
br
)
{
try
{
br
.
close
();
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
}
}
}
return
buffer
.
toString
();
}
public
static
void
setCharset
(
String
charset
)
{
DEFAULTCHART
=
charset
;
}
public
Connection
getConn
()
{
return
conn
;
}
public
void
setConn
(
Connection
conn
)
{
this
.
conn
=
conn
;
}
public
String
getIp
()
{
return
ip
;
}
public
void
setIp
(
String
ip
)
{
this
.
ip
=
ip
;
}
public
String
getUserName
()
{
return
userName
;
}
public
void
setUserName
(
String
userName
)
{
this
.
userName
=
userName
;
}
public
String
getUserPwd
()
{
return
userPwd
;
}
public
void
setUserPwd
(
String
userPwd
)
{
this
.
userPwd
=
userPwd
;
}
public
static
void
main
(
String
[]
args
)
{
/*System.out.println("execute begin ");
RemoteExecuteCommand rec=new RemoteExecuteCommand("10.0.53.56", "root",
"!QAZ2wsx3edc");
String result=rec.execute("sh /root/why/hht_test.sh 11");
System.out.println(result.startsWith("0#"));
if(result.startsWith("0#")) {
System.out.println("execute ok! result="+result);
}else {
System.out.println("execute error! result="+result);
}*/
//RemoteExecuteCommand rec=new RemoteExecuteCommand("192.168.56.101", "sml", "smlhao");
//System.err.println(rec.isExecuteSuccess("/home/sml/checkAgentRun.sh 11"));
RemoteExecuteCommand
rec
=
new
RemoteExecuteCommand
(
"192.168.204.123"
,
"root"
,
"123"
);
String
s
=
rec
.
execute
(
"sh /home/harry/test/test.sh"
);
System
.
out
.
println
(
s
);
String
state
=
"UNKNOWN"
;
String
finalState
=
"UNKNOWN"
;
if
(
StringUtils
.
isNotBlank
(
s
))
{
String
enterLine
=
"\n"
;
int
a
=
1
;
if
(
s
.
indexOf
(
"\r\n"
)
>
-
1
)
{
enterLine
=
"\r\n"
;
a
=
2
;
}
else
if
(
s
.
indexOf
(
"\r"
)
>
-
1
)
{
enterLine
=
"\r"
;
}
else
if
(
s
.
indexOf
(
"\n"
)
>
-
1
)
{
enterLine
=
"\n"
;
}
boolean
flag
=
true
;
while
(
flag
)
{
int
i1
=
s
.
indexOf
(
enterLine
);
String
tmp
=
s
.
substring
(
0
,
i1
);
if
(
tmp
.
contains
(
"State"
))
{
String
[]
ss
=
tmp
.
split
(
":"
);
state
=
ss
[
1
].
trim
();
}
else
if
(
tmp
.
contains
(
"Final-State"
))
{
String
[]
ss
=
tmp
.
split
(
":"
);
finalState
=
ss
[
1
].
trim
();
break
;
}
else
{
s
=
s
.
substring
(
i1
+
a
,
s
.
length
());
}
}
System
.
err
.
println
(
s
);
}
/*String fn = "c.b.a.txt";
int idx = fn.lastIndexOf(".");
System.out.println(fn.substring(idx+1, fn.length()));*/
}
}
\ No newline at end of file
src/main/java/com/jz/dmp/cmdexectool/scheduler/common/enums/MyDbType.java
View file @
273a5b89
...
...
@@ -52,6 +52,9 @@ public enum MyDbType {
break
;
}
}
if
(
myDbTypeMatch
==
null
)
{
throw
new
RuntimeException
(
"数据源类型没有对应的jdbc映射:"
+
idStr
);
}
return
myDbTypeMatch
;
}
...
...
src/main/java/com/jz/dmp/cmdexectool/scheduler/common/task/sql/SqlParameters.java
View file @
273a5b89
...
...
@@ -21,6 +21,7 @@ import java.util.HashMap;
import
java.util.List
;
import
java.util.Map
;
import
org.apache.commons.lang3.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer
;
...
...
@@ -187,10 +188,10 @@ public class SqlParameters extends AbstractParameters {
}
//transform
Map
<
String
,
String
>
transformSqlModel
=
new
HashMap
<
String
,
String
>();
transformSqlModel
.
put
(
"source_table_name"
,
"table_view"
);
transformSqlModel
.
put
(
"sql"
,
this
.
sqlScript
);
transform
=
FreeMarkerUtils
.
freemakerJson
(
CommConstant
.
WATERDROP_FTL_TRANSFORM_SQL
,
transformSqlModel
,
freeMarkerConfig
);
//
Map<String, String> transformSqlModel = new HashMap<String, String>();
//
transformSqlModel.put("source_table_name", "table_view");
//
transformSqlModel.put("sql", this.sqlScript);
//
transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
JSONObject
apiObj
=
scriptObj
.
getJSONObject
(
"api"
);
String
columnFieldsObj
=
apiObj
.
getString
(
"columnFields"
);
...
...
@@ -203,6 +204,7 @@ public class SqlParameters extends AbstractParameters {
//sink
Map
<
String
,
String
>
sinkApiModel
=
new
HashMap
<
String
,
String
>();
sinkApiModel
.
put
(
"source_table_name"
,
"t"
);
sinkApiModel
.
put
(
"url"
,
apiObj
.
getString
(
"apiUrl"
));
sinkApiModel
.
put
(
"apiKey"
,
apiObj
.
getString
(
"apiKey"
));
sinkApiModel
.
put
(
"method"
,
apiObj
.
getString
(
"method"
));
...
...
@@ -227,15 +229,16 @@ public class SqlParameters extends AbstractParameters {
}
//transform
Map
<
String
,
String
>
transformSqlModel
=
new
HashMap
<
String
,
String
>();
transformSqlModel
.
put
(
"source_table_name"
,
"table_view"
);
transformSqlModel
.
put
(
"sql"
,
this
.
sqlScript
);
transform
=
FreeMarkerUtils
.
freemakerJson
(
CommConstant
.
WATERDROP_FTL_TRANSFORM_SQL
,
transformSqlModel
,
freeMarkerConfig
);
//
Map<String, String> transformSqlModel = new HashMap<String, String>();
//
transformSqlModel.put("source_table_name", "table_view");
//
transformSqlModel.put("sql", this.sqlScript);
//
transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
//sink
JSONObject
topicObj
=
scriptObj
.
getJSONObject
(
"topic"
);
Map
<
String
,
String
>
kafkaModel
=
new
HashMap
<
String
,
String
>();
kafkaModel
.
put
(
"source_table_name"
,
"t"
);
kafkaModel
.
put
(
"topic"
,
topicObj
.
getString
(
"topic"
));
kafkaModel
.
put
(
"broker"
,
topicObj
.
getString
(
"server"
));
sink
=
FreeMarkerUtils
.
freemakerJson
(
CommConstant
.
WATERDROP_FTL_SINK_KAFKA
,
kafkaModel
,
freeMarkerConfig
);
...
...
@@ -262,9 +265,13 @@ public class SqlParameters extends AbstractParameters {
String
postImportStatement
=
tableObj
.
getString
(
"postImportStatement"
);
preStatements
=
new
ArrayList
<
String
>();
preStatements
.
add
(
preImportStatement
);
if
(
StringUtils
.
isNotEmpty
(
preImportStatement
))
{
preStatements
.
add
(
preImportStatement
);
}
posStatements
=
new
ArrayList
<
String
>();
posStatements
.
add
(
postImportStatement
);
if
(
StringUtils
.
isNotEmpty
(
postImportStatement
))
{
posStatements
.
add
(
postImportStatement
);
}
//设置目标执行前导后导语句目标数据源
Integer
targetSourceId
=
tableObj
.
getInteger
(
"targetSourceId"
);
...
...
@@ -273,7 +280,7 @@ public class SqlParameters extends AbstractParameters {
String
jdbcUrl
=
targetSource
.
getJdbcUrl
();
String
user
=
targetSource
.
getUserName
();
String
password
=
EncryptionUtils
.
decode
(
targetSource
.
getPassword
(),
publicKey
);
MyDbType
myDbType
=
MyDbType
.
obtainByIdStr
(
targetSource
.
get
Id
().
toString
());
MyDbType
myDbType
=
MyDbType
.
obtainByIdStr
(
targetSource
.
get
DatasourceType
().
toString
());
targetBaseDataSource
=
new
MyBaseDataSource
();
targetBaseDataSource
.
setJdbcUrlDirect
(
jdbcUrl
);
...
...
@@ -287,16 +294,16 @@ public class SqlParameters extends AbstractParameters {
}
//transform
Map
<
String
,
String
>
transformSqlModel
=
new
HashMap
<
String
,
String
>();
transformSqlModel
.
put
(
"source_table_name"
,
"table_view"
);
transformSqlModel
.
put
(
"sql"
,
this
.
sqlScript
);
transform
=
FreeMarkerUtils
.
freemakerJson
(
CommConstant
.
WATERDROP_FTL_TRANSFORM_SQL
,
transformSqlModel
,
freeMarkerConfig
);
//
Map<String, String> transformSqlModel = new HashMap<String, String>();
//
transformSqlModel.put("source_table_name", "table_view");
//
transformSqlModel.put("sql", this.sqlScript);
//
transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
String
tableFieldsObj
=
tableObj
.
getString
(
"tableFields"
);
String
sqlStr
=
ParameterUtils
.
columnMappingHandler
(
tableFieldsObj
);
Map
<
String
,
String
>
transformMappingSqlModel
=
new
HashMap
<
String
,
String
>();
transformMappingSqlModel
.
put
(
"source_table_name"
,
"t
able_view
"
);
transformMappingSqlModel
.
put
(
"source_table_name"
,
"t"
);
transformMappingSqlModel
.
put
(
"sql"
,
sqlStr
);
transform
=
transform
+
FreeMarkerUtils
.
freemakerJson
(
CommConstant
.
WATERDROP_FTL_TRANSFORM_SQL
,
transformMappingSqlModel
,
freeMarkerConfig
);
...
...
@@ -310,14 +317,17 @@ public class SqlParameters extends AbstractParameters {
||
this
.
targetBaseDataSource
.
getMyDbType
()
==
MyDbType
.
DB2
||
this
.
targetBaseDataSource
.
getMyDbType
()
==
MyDbType
.
INFORMIX
)
{
String
targetTableName
=
tableObj
.
getString
(
"targetTableName"
);
Map
<
String
,
String
>
sinkJdbcModel
=
new
HashMap
<
String
,
String
>();
sinkJdbcModel
.
put
(
"source_table_name"
,
"t"
);
sinkJdbcModel
.
put
(
"save_mode"
,
"overwrite"
);
sinkJdbcModel
.
put
(
"truncate"
,
"true"
);
sinkJdbcModel
.
put
(
"url"
,
targetSource
.
getJdbcUrl
()
);
sinkJdbcModel
.
put
(
"url"
,
jdbcUrl
);
sinkJdbcModel
.
put
(
"driver"
,
targetSource
.
getDriverClassName
());
sinkJdbcModel
.
put
(
"user"
,
targetSource
.
getUserName
()
);
sinkJdbcModel
.
put
(
"password"
,
targetSource
.
getPassword
()
);
sinkJdbcModel
.
put
(
"dbtable"
,
target
Source
.
getDbName
()
);
sinkJdbcModel
.
put
(
"user"
,
user
);
sinkJdbcModel
.
put
(
"password"
,
password
);
sinkJdbcModel
.
put
(
"dbtable"
,
target
TableName
);
sink
=
FreeMarkerUtils
.
freemakerJson
(
CommConstant
.
WATERDROP_FTL_SINK_JDBC
,
sinkJdbcModel
,
freeMarkerConfig
);
}
}
...
...
@@ -338,15 +348,16 @@ public class SqlParameters extends AbstractParameters {
}
//transform
Map
<
String
,
String
>
transformSqlModel
=
new
HashMap
<
String
,
String
>();
transformSqlModel
.
put
(
"source_table_name"
,
"table_view"
);
transformSqlModel
.
put
(
"sql"
,
this
.
sqlScript
);
transform
=
FreeMarkerUtils
.
freemakerJson
(
CommConstant
.
WATERDROP_FTL_TRANSFORM_SQL
,
transformSqlModel
,
freeMarkerConfig
);
//
Map<String, String> transformSqlModel = new HashMap<String, String>();
//
transformSqlModel.put("source_table_name", "table_view");
//
transformSqlModel.put("sql", this.sqlScript);
//
transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
//sink
JSONObject
hdfsObj
=
scriptObj
.
getJSONObject
(
"hdfs"
);
String
hdfsDir
=
hdfsObj
.
getString
(
"hdfsDir"
);
Map
<
String
,
String
>
hdfsModel
=
new
HashMap
<
String
,
String
>();
hdfsModel
.
put
(
"source_table_name"
,
"t"
);
hdfsModel
.
put
(
"path"
,
hdfsDir
);
sink
=
FreeMarkerUtils
.
freemakerJson
(
CommConstant
.
WATERDROP_FTL_SINK_HDFS
,
hdfsModel
,
freeMarkerConfig
);
}
...
...
@@ -365,12 +376,13 @@ public class SqlParameters extends AbstractParameters {
return
;
}
//transform
Map
<
String
,
String
>
transformSqlModel
=
new
HashMap
<
String
,
String
>();
transformSqlModel
.
put
(
"source_table_name"
,
"table_view"
);
transformSqlModel
.
put
(
"sql"
,
this
.
sqlScript
);
transform
=
FreeMarkerUtils
.
freemakerJson
(
CommConstant
.
WATERDROP_FTL_TRANSFORM_SQL
,
transformSqlModel
,
freeMarkerConfig
);
//
Map<String, String> transformSqlModel = new HashMap<String, String>();
//
transformSqlModel.put("source_table_name", "table_view");
//
transformSqlModel.put("sql", this.sqlScript);
//
transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
//sink
Map
<
String
,
String
>
stdoutModel
=
new
HashMap
<
String
,
String
>();
stdoutModel
.
put
(
"source_table_name"
,
"t"
);
sink
=
FreeMarkerUtils
.
freemakerJson
(
CommConstant
.
WATERDROP_FTL_SINK_CONSOLE
,
stdoutModel
,
freeMarkerConfig
);
}
...
...
@@ -392,16 +404,17 @@ public class SqlParameters extends AbstractParameters {
String
jdbcUrl
=
dmpSyncingDatasource
.
getJdbcUrl
();
String
user
=
dmpSyncingDatasource
.
getUserName
();
String
password
=
EncryptionUtils
.
decode
(
dmpSyncingDatasource
.
getPassword
(),
publicKey
);
MyDbType
myDbType
=
MyDbType
.
obtainByIdStr
(
dmpSyncingDatasource
.
getId
().
toString
());
MyDbType
myDbType
=
MyDbType
.
obtainByIdStr
(
dmpSyncingDatasource
.
getDatasourceType
().
toString
());
sourceBaseDataSource
=
new
MyBaseDataSource
();
sourceBaseDataSource
.
setJdbcUrlDirect
(
jdbcUrl
);
sourceBaseDataSource
.
setUser
(
user
);
sourceBaseDataSource
.
setPassword
(
password
);
sourceBaseDataSource
.
setMyDbType
(
myDbType
);
// 如果执行引擎选择的事jdbc,不用生成waterdrop source
if
(
this
.
executioEngine
.
equals
(
CommConstant
.
EXECUTION_ENGINE_JDBC
))
{
sourceBaseDataSource
=
new
MyBaseDataSource
();
sourceBaseDataSource
.
setJdbcUrlDirect
(
jdbcUrl
);
sourceBaseDataSource
.
setUser
(
user
);
sourceBaseDataSource
.
setPassword
(
password
);
sourceBaseDataSource
.
setMyDbType
(
myDbType
);
return
;
}
...
...
@@ -416,8 +429,8 @@ public class SqlParameters extends AbstractParameters {
Map
<
String
,
String
>
jdbcModel
=
new
HashMap
<
String
,
String
>();
jdbcModel
.
put
(
"driver"
,
dmpSyncingDatasource
.
getDriverClassName
());
jdbcModel
.
put
(
"url"
,
jdbcUrl
);
jdbcModel
.
put
(
"table"
,
this
.
sqlScript
);
jdbcModel
.
put
(
"result_table_name"
,
"t
able_view
"
);
jdbcModel
.
put
(
"table"
,
"("
+
this
.
sqlScript
+
") AS t"
);
jdbcModel
.
put
(
"result_table_name"
,
"t"
);
jdbcModel
.
put
(
"user"
,
user
);
jdbcModel
.
put
(
"password"
,
password
);
this
.
source
=
FreeMarkerUtils
.
freemakerJson
(
CommConstant
.
WATERDROP_FTL_SOURCE_JDBC
,
jdbcModel
,
...
...
src/main/java/com/jz/dmp/cmdexectool/scheduler/common/utils/ParameterUtils.java
View file @
273a5b89
...
...
@@ -270,7 +270,7 @@ public class ParameterUtils {
Map
<
String
,
JSONObject
>
sourceMap
=
new
HashMap
<
String
,
JSONObject
>();
for
(
int
index
=
0
;
index
<
sourceArray
.
size
();
index
++)
{
JSONObject
sourceObj
=
sourceArray
.
getJSONObject
(
index
);
sourceMap
.
put
(
sourceObj
.
getString
(
"customSoruceFi
le
dId"
),
sourceObj
);
sourceMap
.
put
(
sourceObj
.
getString
(
"customSoruceFi
el
dId"
),
sourceObj
);
}
JSONArray
targetArray
=
jsonObject
.
getJSONArray
(
"targetFields"
);
Map
<
String
,
JSONObject
>
targetMap
=
new
HashMap
<
String
,
JSONObject
>();
...
...
@@ -285,8 +285,8 @@ public class ParameterUtils {
Integer
size
=
mappingArray
.
size
();
for
(
int
index
=
0
;
index
<
size
;
index
++)
{
JSONObject
mappingObj
=
mappingArray
.
getJSONObject
(
index
);
JSONObject
sourceObj
=
sourceMap
.
get
(
mappingObj
.
getString
(
"customSoruceFi
le
dId"
));
JSONObject
targetObj
=
sourceMap
.
get
(
mappingObj
.
getString
(
"customTargetFile
dId"
));
JSONObject
sourceObj
=
sourceMap
.
get
(
mappingObj
.
getString
(
"customSoruceFi
el
dId"
));
JSONObject
targetObj
=
targetMap
.
get
(
mappingObj
.
getString
(
"customTargetFiel
dId"
));
//String customSoruceFiledId = sourceObj.getString("customSoruceFiledId");
String
sourceFieldName
=
sourceObj
.
getString
(
"sourceFieldName"
);
...
...
@@ -302,7 +302,7 @@ public class ParameterUtils {
sb
.
append
(
" "
);
}
}
sb
.
append
(
"FROM t
_view
"
);
sb
.
append
(
"FROM t"
);
return
sb
.
toString
();
}
...
...
src/main/java/com/jz/dmp/cmdexectool/scheduler/dao/datasource/BaseDataSource.java
View file @
273a5b89
...
...
@@ -90,7 +90,11 @@ public abstract class BaseDataSource {
* @return getJdbcUrl
*/
public
String
getJdbcUrl
()
{
StringBuilder
jdbcUrl
=
new
StringBuilder
(
getAddress
());
String
addressStr
=
getAddress
();
if
(
addressStr
==
null
)
{
return
null
;
}
StringBuilder
jdbcUrl
=
new
StringBuilder
(
addressStr
);
appendDatabase
(
jdbcUrl
);
appendPrincipal
(
jdbcUrl
);
...
...
src/main/java/com/jz/dmp/cmdexectool/scheduler/dao/utils/DatabaseUtils.java
View file @
273a5b89
...
...
@@ -288,6 +288,7 @@ public class DatabaseUtils {
* @return SqlBinds
*/
public
static
SqlBinds
getSqlAndSqlParamsMap
(
String
sql
)
{
Map
<
Integer
,
Property
>
sqlParamsMap
=
new
HashMap
<>();
StringBuilder
sqlBuilder
=
new
StringBuilder
();
...
...
src/main/java/com/jz/dmp/cmdexectool/scheduler/server/worker/task/WaterdropCommandExecutor.java
View file @
273a5b89
...
...
@@ -95,7 +95,8 @@ public class WaterdropCommandExecutor extends AbstractCommandExecutor {
if
(
OSUtils
.
isWindows
())
{
//sb.append("@echo off\n");
//sb.append("cd /d %~dp0\n");
sb
.
append
(
"./bin/start-waterdrop-spark.sh --master local[4] --deploy-mode client --config ./config/application.conf"
);
sb
.
append
(
"/app/waterdrop/bin/start-waterdrop-spark.sh --master yarn --deploy-mode client --config "
);
//sb.append("scp "+execCommand+" root@192.168.1.221:/opt");
if
(
taskExecutionContext
.
getEnvFile
()
!=
null
)
{
sb
.
append
(
"call "
).
append
(
taskExecutionContext
.
getEnvFile
()).
append
(
"\n"
);
}
...
...
@@ -103,13 +104,15 @@ public class WaterdropCommandExecutor extends AbstractCommandExecutor {
//sb.append("#!/bin/sh\n");
//sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
//sb.append("cd $BASEDIR\n");
sb
.
append
(
"./bin/start-waterdrop-spark.sh --master local[4] --deploy-mode client --config ./config/application.conf"
);
sb
.
append
(
"/app/waterdrop/bin/start-waterdrop-spark.sh --master yarn --deploy-mode client --config "
);
//sb.append("scp "+execCommand+" root@192.168.1.221:/opt");
if
(
taskExecutionContext
.
getEnvFile
()
!=
null
)
{
sb
.
append
(
"source "
).
append
(
taskExecutionContext
.
getEnvFile
()).
append
(
"\n"
);
}
}
sb
.
append
(
execCommand
);
//sb.append("/application.conf");
logger
.
info
(
"command : {}"
,
sb
.
toString
());
// write data to file
...
...
src/main/java/com/jz/dmp/cmdexectool/scheduler/server/worker/task/sql/SqlTask.java
View file @
273a5b89
...
...
@@ -36,7 +36,9 @@ import org.apache.commons.lang3.StringUtils;
import
org.slf4j.Logger
;
import
org.springframework.util.CollectionUtils
;
import
com.alibaba.fastjson.JSONObject
;
import
com.jz.dmp.cmdexectool.common.constant.CommConstant
;
import
com.jz.dmp.cmdexectool.common.utils.RemoteExecuteCmdUtils
;
import
com.jz.dmp.cmdexectool.scheduler.common.Constants
;
import
com.jz.dmp.cmdexectool.scheduler.common.enums.DbType
;
import
com.jz.dmp.cmdexectool.scheduler.common.process.Property
;
...
...
@@ -104,25 +106,22 @@ public class SqlTask extends AbstractTask {
try
{
MyBaseDataSource
targetBaseDataSource
=
sqlParameters
.
getTargetBaseDataSource
();
List
<
SqlBinds
>
preStatementSqlBinds
=
Optional
.
ofNullable
(
sqlParameters
.
getPreStatements
())
.
orElse
(
new
ArrayList
<>())
.
stream
()
.
map
(
DatabaseUtils:
:
getSqlAndSqlParamsMap
)
.
collect
(
Collectors
.
toList
());
List
<
SqlBinds
>
postStatementSqlBinds
=
Optional
.
ofNullable
(
sqlParameters
.
getPosStatements
())
.
orElse
(
new
ArrayList
<>())
.
stream
()
.
map
(
DatabaseUtils:
:
getSqlAndSqlParamsMap
)
.
collect
(
Collectors
.
toList
());
//判断是否需要运行前置sql
if
(!
CollectionUtils
.
isEmpty
(
preStatementSqlBinds
))
{
DatabaseUtils
.
executeUpdateSql
(
preStatementSqlBinds
,
targetBaseDataSource
);
//System.out.println(JSONObject.toJSONString(sqlParameters.getPreStatements()));
//System.out.println(CollectionUtils.isEmpty(sqlParameters.getPreStatements()));
if
(!
CollectionUtils
.
isEmpty
(
sqlParameters
.
getPreStatements
()))
{
List
<
SqlBinds
>
preStatementSqlBinds
=
Optional
.
ofNullable
(
sqlParameters
.
getPreStatements
())
.
orElse
(
new
ArrayList
<>())
.
stream
()
.
map
(
DatabaseUtils:
:
getSqlAndSqlParamsMap
)
.
collect
(
Collectors
.
toList
());
//判断是否需要运行前置sql
if
(!
CollectionUtils
.
isEmpty
(
preStatementSqlBinds
))
{
DatabaseUtils
.
executeUpdateSql
(
preStatementSqlBinds
,
targetBaseDataSource
);
}
}
if
(
sqlParameters
.
getExecutioEngine
().
equals
(
CommConstant
.
EXECUTION_ENGINE_JDBC
))
{
List
<
String
>
mainSqlScript
=
new
ArrayList
<
String
>();
...
...
@@ -138,14 +137,29 @@ public class SqlTask extends AbstractTask {
}
else
{
// construct process
CommandExecuteResult
commandExecuteResult
=
waterdropCommandExecutor
.
run
(
buildCommand
());
//String cmd = "/app/waterdrop/bin/start-waterdrop-spark.sh --master yarn --deploy-mode client --config /opt/application.conf";
//boolean flag = RemoteExecuteCmdUtils.sshCmd("192.168.1.221", "root", "123", cmd);
//if (!flag) {
// throw new RuntimeException("执行waterdrop失败!");
//}
setExitStatusCode
(
commandExecuteResult
.
getExitStatusCode
());
setAppIds
(
commandExecuteResult
.
getAppIds
());
setProcessId
(
commandExecuteResult
.
getProcessId
());
}
//判断是否运行后置sql
if
(!
CollectionUtils
.
isEmpty
(
postStatementSqlBinds
))
{
DatabaseUtils
.
executeUpdateSql
(
postStatementSqlBinds
,
targetBaseDataSource
);
if
(!
CollectionUtils
.
isEmpty
(
sqlParameters
.
getPosStatements
()))
{
List
<
SqlBinds
>
postStatementSqlBinds
=
Optional
.
ofNullable
(
sqlParameters
.
getPosStatements
())
.
orElse
(
new
ArrayList
<>())
.
stream
()
.
map
(
DatabaseUtils:
:
getSqlAndSqlParamsMap
)
.
collect
(
Collectors
.
toList
());
//判断是否运行后置sql
if
(!
CollectionUtils
.
isEmpty
(
postStatementSqlBinds
))
{
DatabaseUtils
.
executeUpdateSql
(
postStatementSqlBinds
,
targetBaseDataSource
);
}
}
}
catch
(
Exception
e
)
{
...
...
@@ -170,8 +184,11 @@ public class SqlTask extends AbstractTask {
*/
private
String
buildCommand
()
throws
Exception
{
// generate scripts
String
fileName
=
String
.
format
(
"%s/%s_node.%s"
,
taskExecutionContext
.
getExecutePath
(),
taskExecutionContext
.
getTaskAppId
(),
OSUtils
.
isWindows
()
?
"bat"
:
"sh"
);
//String fileName = String.format("%s/%s_node.%s", taskExecutionContext.getExecutePath(),
// taskExecutionContext.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
String
fileName
=
String
.
format
(
"%s/%s.%s"
,
taskExecutionContext
.
getExecutePath
(),
"application"
,
"conf"
);
Path
path
=
new
File
(
fileName
).
toPath
();
...
...
@@ -206,5 +223,10 @@ public class SqlTask extends AbstractTask {
public
AbstractParameters
getParameters
()
{
return
sqlParameters
;
}
public
static
void
main
(
String
[]
args
)
{
List
<
String
>
list
=
new
ArrayList
<
String
>();
System
.
out
.
println
(
CollectionUtils
.
isEmpty
(
list
));
}
}
src/main/resources/mapperconf/DmpPublicConfigInfoMapper.xml
View file @
273a5b89
...
...
@@ -56,7 +56,7 @@
<include
refid=
"Base_Column_List"
/>
<!-- /*$BaseDtoColumnListContent$*/ -->
</sql>
<!-- 根据主键查询公共配置表 -->
<select
id=
"selectByPrimaryKey"
resultMap=
"BaseResultMap"
parameterType=
"java.lang.String"
>
select
...
...
src/main/resources/templates/sink_console.ftl
View file @
273a5b89
stdout {
console {
<#if source_table_name??>
source_table_name = ${source_table_name!}
</#if>
<#if limit??>
# 限制输出条数,范围[-1, 2147483647]
limit = ${limit!}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment