jz-dmp-cmdexectool / Commits / 02386609

Commit 02386609 authored Mar 10, 2021 by sml
Commit message: 代码提交 (code commit)
Parent: 4859e430

Showing 12 changed files with 429 additions and 70 deletions (+429 / -70)

  pom.xml                                                         +6    -0
  ...z/dmp/cmdexectool/common/utils/RemoteExecuteCmdUtils.java   +50    -0
  ...jz/dmp/cmdexectool/common/utils/RemoteExecuteCommand.java  +254    -0
  ...m/jz/dmp/cmdexectool/scheduler/common/enums/MyDbType.java    +3    -0
  .../cmdexectool/scheduler/common/task/sql/SqlParameters.java  +50   -37
  ...mp/cmdexectool/scheduler/common/utils/ParameterUtils.java    +4    -4
  .../cmdexectool/scheduler/dao/datasource/BaseDataSource.java    +5    -1
  ...jz/dmp/cmdexectool/scheduler/dao/utils/DatabaseUtils.java    +1    -0
  ...cheduler/server/worker/task/WaterdropCommandExecutor.java    +5    -2
  ...cmdexectool/scheduler/server/worker/task/sql/SqlTask.java  +46   -24
  src/main/resources/mapperconf/DmpPublicConfigInfoMapper.xml     +1    -1
  src/main/resources/templates/sink_console.ftl                   +4    -1
pom.xml

@@ -572,6 +572,12 @@
             <version>1.1.12</version>
         </dependency>
+        <dependency>
+            <groupId>org.jvnet.hudson</groupId>
+            <artifactId>ganymed-ssh2</artifactId>
+            <version>build210-hudson-1</version>
+        </dependency>
     </dependencies>
     <build>
         <finalName>jz-dmp-cmdexectool</finalName>
src/main/java/com/jz/dmp/cmdexectool/common/utils/RemoteExecuteCmdUtils.java  (new file, 0 → 100644)

package com.jz.dmp.cmdexectool.common.utils;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteExecuteCmdUtils {

    private static Logger logger = LoggerFactory.getLogger(RemoteExecuteCmdUtils.class);

    /**
     * @Title: sshCmd
     * @Description: execute a command on a remote host over SSH
     * @param remoteServer
     * @param remoteUser
     * @param remotePassword
     * @param cmd
     * @return boolean whether the command succeeded
     */
    public static boolean sshCmd(String remoteServer, String remoteUser, String remotePassword, String cmd) {
        logger.info("############################## SHELL: sshCmd starting ##############################");
        logger.info("remoteServer: " + remoteServer);
        logger.info("remoteUser: " + remoteUser);
        logger.info("remotePassword: " + remotePassword);
        RemoteExecuteCommand rec = new RemoteExecuteCommand(remoteServer, remoteUser, remotePassword);
        logger.info("cmd【{}】", cmd);
        String exe_return = rec.execute(cmd);
        logger.info("exe_return: " + exe_return);
        logger.info("############################## SHELL: sshCmd end ##############################");
        return isExecuteSuccess(exe_return);
    }

    private static boolean isExecuteSuccess(String exe_return) {
        if (StringUtils.isNotBlank(exe_return)
                && (exe_return.startsWith("0#")
                        || (exe_return.startsWith("null#")
                                && !(exe_return.indexOf("Exception") > -1
                                        || exe_return.indexOf("exception") > -1
                                        || exe_return.indexOf("EXCEPTION") > -1)))) {
            return true;
        } else {
            return false;
        }
    }
}
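For reference, a minimal usage sketch of the new utility. The host, credentials and script path below are placeholders for illustration, not values taken from this commit:

package com.jz.dmp.cmdexectool.common.utils;

public class RemoteExecuteCmdUtilsSketch {
    public static void main(String[] args) {
        // Placeholder connection details; replace with a real host before running.
        String host = "10.0.0.1";
        String user = "deploy";
        String password = "secret";

        // sshCmd returns true only when the captured output matches the success convention
        // checked by isExecuteSuccess: it starts with "0#", or with "null#" and contains no exception text.
        boolean ok = RemoteExecuteCmdUtils.sshCmd(host, user, password, "sh /opt/scripts/check.sh");
        System.out.println("remote command succeeded: " + ok);
    }
}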
src/main/java/com/jz/dmp/cmdexectool/common/utils/RemoteExecuteCommand.java  (new file, 0 → 100644)

package com.jz.dmp.cmdexectool.common.utils;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import ch.ethz.ssh2.ChannelCondition;
import ch.ethz.ssh2.Connection;
import ch.ethz.ssh2.Session;
import ch.ethz.ssh2.StreamGobbler;

/**
 * Execute a shell script on a remote Linux host
 * @author hht
 * @since V0.1
 */
public class RemoteExecuteCommand {

    private static final Logger logger = LoggerFactory.getLogger(RemoteExecuteCommand.class);

    // default character encoding is UTF-8
    private static String DEFAULTCHART = "UTF-8";

    private Connection conn;
    private String ip;
    private String userName;
    private String userPwd;

    public RemoteExecuteCommand(String ip, String userName, String userPwd) {
        this.ip = ip;
        this.userName = userName;
        this.userPwd = userPwd;
    }

    public RemoteExecuteCommand() {
    }

    /**
     * Log in to the remote Linux host
     * @author Ickes
     * @since V0.1
     * @return true if the login succeeds, otherwise false
     */
    public Boolean login() {
        boolean flg = false;
        try {
            conn = new Connection(ip);
            conn.connect(); // connect
            flg = conn.authenticateWithPassword(userName, userPwd); // authenticate
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flg;
    }

    /**
     * @author Ickes
     * Execute a shell script or command remotely
     * @param cmd the command to execute
     * @return the result returned after the command finishes, in the form 0#XXX;
     *         the leading exit status "0#" means success, anything else means failure
     * @since V0.1
     */
    public String execute(String cmd) {
        String result = "";
        Session session = null;
        try {
            if (login()) {
                session = conn.openSession(); // open a session
                session.execCommand(cmd);     // execute the command
                result = processStdout(session.getStdout(), DEFAULTCHART);
                // if stdout is empty, the script failed; read stderr instead
                if (StringUtils.isBlank(result)) {
                    result = processStdout(session.getStderr(), DEFAULTCHART);
                }
                result = session.getExitStatus() + "#" + result;
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
            }
            if (session != null) {
                session.close();
            }
        }
        return result;
    }

    /**
     * Whether the command executed successfully
     * @param cmd the remote command to execute
     * @return true if the command executed successfully
     */
    public boolean isExecuteSuccess(String cmd) {
        logger.info("[ip=" + this.ip + ", userName=" + this.userName + ", password=" + this.userPwd + "]开始执行cmd:" + cmd);
        Session session = null;
        try {
            if (login()) {
                session = conn.openSession(); // open a session
                session.execCommand(cmd);     // execute the command
                session.waitForCondition(ChannelCondition.EXIT_STATUS, 0);
                // read the exit status
                Integer exitStatus = session.getExitStatus();
                if (exitStatus != null && exitStatus == 0) {
                    // no error and an exit code of 0 means the command succeeded
                    logger.info("cmd:" + cmd + "执行成功");
                    return true;
                } else {
                    logger.info(cmd + "执行错误,返回码:" + exitStatus);
                    return false;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
            }
            if (session != null) {
                session.close();
            }
        }
        logger.info("cmd:" + cmd + "执行结束");
        return false;
    }

    /**
     * Parse the result returned by the script
     * @author Ickes
     * @param in the input stream
     * @param charset the character encoding
     * @since V0.1
     * @return the result as plain text
     */
    private String processStdout(InputStream in, String charset) {
        InputStream stdout = new StreamGobbler(in);
        StringBuffer buffer = new StringBuffer();
        BufferedReader br = null;
        try {
            br = new BufferedReader(new InputStreamReader(stdout, charset));
            String line = null;
            while ((line = br.readLine()) != null) {
                buffer.append(line + "\n");
            }
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != br) {
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return buffer.toString();
    }

    public static void setCharset(String charset) {
        DEFAULTCHART = charset;
    }

    public Connection getConn() {
        return conn;
    }

    public void setConn(Connection conn) {
        this.conn = conn;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getUserPwd() {
        return userPwd;
    }

    public void setUserPwd(String userPwd) {
        this.userPwd = userPwd;
    }

    public static void main(String[] args) {
        /*System.out.println("execute begin ");
        RemoteExecuteCommand rec=new RemoteExecuteCommand("10.0.53.56", "root",
                "!QAZ2wsx3edc");
        String result=rec.execute("sh /root/why/hht_test.sh 11");
        System.out.println(result.startsWith("0#"));
        if(result.startsWith("0#")) {
            System.out.println("execute ok! result="+result);
        }else {
            System.out.println("execute error! result="+result);
        }*/
        //RemoteExecuteCommand rec=new RemoteExecuteCommand("192.168.56.101", "sml", "smlhao");
        //System.err.println(rec.isExecuteSuccess("/home/sml/checkAgentRun.sh 11"));
        RemoteExecuteCommand rec = new RemoteExecuteCommand("192.168.204.123", "root", "123");
        String s = rec.execute("sh /home/harry/test/test.sh");
        System.out.println(s);
        String state = "UNKNOWN";
        String finalState = "UNKNOWN";
        if (StringUtils.isNotBlank(s)) {
            String enterLine = "\n";
            int a = 1;
            if (s.indexOf("\r\n") > -1) {
                enterLine = "\r\n";
                a = 2;
            } else if (s.indexOf("\r") > -1) {
                enterLine = "\r";
            } else if (s.indexOf("\n") > -1) {
                enterLine = "\n";
            }
            boolean flag = true;
            while (flag) {
                int i1 = s.indexOf(enterLine);
                String tmp = s.substring(0, i1);
                if (tmp.contains("State")) {
                    String[] ss = tmp.split(":");
                    state = ss[1].trim();
                } else if (tmp.contains("Final-State")) {
                    String[] ss = tmp.split(":");
                    finalState = ss[1].trim();
                    break;
                } else {
                    s = s.substring(i1 + a, s.length());
                }
            }
            System.err.println(s);
        }
        /*String fn = "c.b.a.txt";
        int idx = fn.lastIndexOf(".");
        System.out.println(fn.substring(idx+1, fn.length()));*/
    }
}
\ No newline at end of file
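The execute() contract above packs the remote exit status and the captured output into a single "status#output" string. A small caller-side sketch of how that value might be unpacked; the host, credentials and script path are placeholders, not values from this commit:

package com.jz.dmp.cmdexectool.common.utils;

public class RemoteExecuteResultSketch {
    public static void main(String[] args) {
        // Placeholder host, credentials and script path.
        String result = new RemoteExecuteCommand("10.0.0.1", "deploy", "secret")
                .execute("sh /opt/scripts/check.sh");

        // Split the "exitStatus#output" value produced by execute().
        int sep = result.indexOf('#');
        String exitStatus = sep > -1 ? result.substring(0, sep) : "";
        String output = sep > -1 ? result.substring(sep + 1) : result;
        if ("0".equals(exitStatus)) {
            System.out.println("success, output:\n" + output);
        } else {
            System.err.println("failed, exit status=" + exitStatus + ", output:\n" + output);
        }
    }
}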
src/main/java/com/jz/dmp/cmdexectool/scheduler/common/enums/MyDbType.java

@@ -52,6 +52,9 @@ public enum MyDbType {
                 break;
             }
         }
+        if (myDbTypeMatch == null) {
+            throw new RuntimeException("数据源类型没有对应的jdbc映射:" + idStr);
+        }
         return myDbTypeMatch;
     }
src/main/java/com/jz/dmp/cmdexectool/scheduler/common/task/sql/SqlParameters.java

@@ -21,6 +21,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;

@@ -187,10 +188,10 @@ public class SqlParameters extends AbstractParameters {
         }
         //transform
-        Map<String, String> transformSqlModel = new HashMap<String, String>();
-        transformSqlModel.put("source_table_name", "table_view");
-        transformSqlModel.put("sql", this.sqlScript);
-        transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
+//        Map<String, String> transformSqlModel = new HashMap<String, String>();
+//        transformSqlModel.put("source_table_name", "table_view");
+//        transformSqlModel.put("sql", this.sqlScript);
+//        transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
         JSONObject apiObj = scriptObj.getJSONObject("api");
         String columnFieldsObj = apiObj.getString("columnFields");

@@ -203,6 +204,7 @@ public class SqlParameters extends AbstractParameters {
         //sink
         Map<String, String> sinkApiModel = new HashMap<String, String>();
+        sinkApiModel.put("source_table_name", "t");
         sinkApiModel.put("url", apiObj.getString("apiUrl"));
         sinkApiModel.put("apiKey", apiObj.getString("apiKey"));
         sinkApiModel.put("method", apiObj.getString("method"));

@@ -227,15 +229,16 @@ public class SqlParameters extends AbstractParameters {
         }
         //transform
-        Map<String, String> transformSqlModel = new HashMap<String, String>();
-        transformSqlModel.put("source_table_name", "table_view");
-        transformSqlModel.put("sql", this.sqlScript);
-        transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
+//        Map<String, String> transformSqlModel = new HashMap<String, String>();
+//        transformSqlModel.put("source_table_name", "table_view");
+//        transformSqlModel.put("sql", this.sqlScript);
+//        transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
         //sink
         JSONObject topicObj = scriptObj.getJSONObject("topic");
         Map<String, String> kafkaModel = new HashMap<String, String>();
+        kafkaModel.put("source_table_name", "t");
         kafkaModel.put("topic", topicObj.getString("topic"));
         kafkaModel.put("broker", topicObj.getString("server"));
         sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_KAFKA, kafkaModel, freeMarkerConfig);

@@ -262,9 +265,13 @@ public class SqlParameters extends AbstractParameters {
         String postImportStatement = tableObj.getString("postImportStatement");
+        preStatements = new ArrayList<String>();
+        if (StringUtils.isNotEmpty(preImportStatement)) {
+            preStatements.add(preImportStatement);
+        }
         posStatements = new ArrayList<String>();
         if (StringUtils.isNotEmpty(postImportStatement)) {
             posStatements.add(postImportStatement);
         }
         // set the target data source used to run the pre/post import statements
         Integer targetSourceId = tableObj.getInteger("targetSourceId");

@@ -273,7 +280,7 @@ public class SqlParameters extends AbstractParameters {
         String jdbcUrl = targetSource.getJdbcUrl();
         String user = targetSource.getUserName();
         String password = EncryptionUtils.decode(targetSource.getPassword(), publicKey);
-        MyDbType myDbType = MyDbType.obtainByIdStr(targetSource.getId().toString());
+        MyDbType myDbType = MyDbType.obtainByIdStr(targetSource.getDatasourceType().toString());
         targetBaseDataSource = new MyBaseDataSource();
         targetBaseDataSource.setJdbcUrlDirect(jdbcUrl);

@@ -287,16 +294,16 @@ public class SqlParameters extends AbstractParameters {
         }
         //transform
-        Map<String, String> transformSqlModel = new HashMap<String, String>();
-        transformSqlModel.put("source_table_name", "table_view");
-        transformSqlModel.put("sql", this.sqlScript);
-        transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
+//        Map<String, String> transformSqlModel = new HashMap<String, String>();
+//        transformSqlModel.put("source_table_name", "table_view");
+//        transformSqlModel.put("sql", this.sqlScript);
+//        transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
         String tableFieldsObj = tableObj.getString("tableFields");
         String sqlStr = ParameterUtils.columnMappingHandler(tableFieldsObj);
         Map<String, String> transformMappingSqlModel = new HashMap<String, String>();
-        transformMappingSqlModel.put("source_table_name", "table_view");
+        transformMappingSqlModel.put("source_table_name", "t");
         transformMappingSqlModel.put("sql", sqlStr);
         transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformMappingSqlModel, freeMarkerConfig);

@@ -310,14 +317,17 @@ public class SqlParameters extends AbstractParameters {
                 || this.targetBaseDataSource.getMyDbType() == MyDbType.DB2
                 || this.targetBaseDataSource.getMyDbType() == MyDbType.INFORMIX) {
+            String targetTableName = tableObj.getString("targetTableName");
             Map<String, String> sinkJdbcModel = new HashMap<String, String>();
+            sinkJdbcModel.put("source_table_name", "t");
             sinkJdbcModel.put("save_mode", "overwrite");
             sinkJdbcModel.put("truncate", "true");
-            sinkJdbcModel.put("url", targetSource.getJdbcUrl());
+            sinkJdbcModel.put("url", jdbcUrl);
             sinkJdbcModel.put("driver", targetSource.getDriverClassName());
-            sinkJdbcModel.put("user", targetSource.getUserName());
-            sinkJdbcModel.put("password", targetSource.getPassword());
-            sinkJdbcModel.put("dbtable", targetSource.getDbName());
+            sinkJdbcModel.put("user", user);
+            sinkJdbcModel.put("password", password);
+            sinkJdbcModel.put("dbtable", targetTableName);
             sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_JDBC, sinkJdbcModel, freeMarkerConfig);
         }
     }

@@ -338,15 +348,16 @@ public class SqlParameters extends AbstractParameters {
         }
         //transform
-        Map<String, String> transformSqlModel = new HashMap<String, String>();
-        transformSqlModel.put("source_table_name", "table_view");
-        transformSqlModel.put("sql", this.sqlScript);
-        transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
+//        Map<String, String> transformSqlModel = new HashMap<String, String>();
+//        transformSqlModel.put("source_table_name", "table_view");
+//        transformSqlModel.put("sql", this.sqlScript);
+//        transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
         //sink
         JSONObject hdfsObj = scriptObj.getJSONObject("hdfs");
         String hdfsDir = hdfsObj.getString("hdfsDir");
         Map<String, String> hdfsModel = new HashMap<String, String>();
+        hdfsModel.put("source_table_name", "t");
         hdfsModel.put("path", hdfsDir);
         sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_HDFS, hdfsModel, freeMarkerConfig);
     }

@@ -365,12 +376,13 @@ public class SqlParameters extends AbstractParameters {
         return;
     }
         //transform
-        Map<String, String> transformSqlModel = new HashMap<String, String>();
-        transformSqlModel.put("source_table_name", "table_view");
-        transformSqlModel.put("sql", this.sqlScript);
-        transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
+//        Map<String, String> transformSqlModel = new HashMap<String, String>();
+//        transformSqlModel.put("source_table_name", "table_view");
+//        transformSqlModel.put("sql", this.sqlScript);
+//        transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
         //sink
         Map<String, String> stdoutModel = new HashMap<String, String>();
+        stdoutModel.put("source_table_name", "t");
         sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_CONSOLE, stdoutModel, freeMarkerConfig);
     }

@@ -392,10 +404,8 @@ public class SqlParameters extends AbstractParameters {
         String jdbcUrl = dmpSyncingDatasource.getJdbcUrl();
         String user = dmpSyncingDatasource.getUserName();
         String password = EncryptionUtils.decode(dmpSyncingDatasource.getPassword(), publicKey);
-        MyDbType myDbType = MyDbType.obtainByIdStr(dmpSyncingDatasource.getId().toString());
+        MyDbType myDbType = MyDbType.obtainByIdStr(dmpSyncingDatasource.getDatasourceType().toString());
-        // if the chosen execution engine is jdbc, there is no need to generate a waterdrop source
-        if (this.executioEngine.equals(CommConstant.EXECUTION_ENGINE_JDBC)) {
         sourceBaseDataSource = new MyBaseDataSource();
         sourceBaseDataSource.setJdbcUrlDirect(jdbcUrl);

@@ -403,6 +413,9 @@ public class SqlParameters extends AbstractParameters {
         sourceBaseDataSource.setPassword(password);
         sourceBaseDataSource.setMyDbType(myDbType);
+        // if the chosen execution engine is jdbc, there is no need to generate a waterdrop source
+        if (this.executioEngine.equals(CommConstant.EXECUTION_ENGINE_JDBC)) {
+            return;
+        }

@@ -416,8 +429,8 @@ public class SqlParameters extends AbstractParameters {
         Map<String, String> jdbcModel = new HashMap<String, String>();
         jdbcModel.put("driver", dmpSyncingDatasource.getDriverClassName());
         jdbcModel.put("url", jdbcUrl);
-        jdbcModel.put("table", this.sqlScript);
-        jdbcModel.put("result_table_name", "table_view");
+        jdbcModel.put("table", "(" + this.sqlScript + ") AS t");
+        jdbcModel.put("result_table_name", "t");
         jdbcModel.put("user", user);
         jdbcModel.put("password", password);
         this.source = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel,
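For illustration, a minimal sketch of the reworked jdbc source model above: the user SQL is wrapped as an inline subquery aliased to t, and result_table_name becomes t, the source_table_name the sinks now reference. The SQL text, driver and URL below are made up for the example, not values from this commit:

import java.util.HashMap;
import java.util.Map;

public class JdbcSourceModelSketch {
    public static void main(String[] args) {
        String sqlScript = "SELECT id, name FROM demo_table"; // placeholder user SQL

        Map<String, String> jdbcModel = new HashMap<String, String>();
        jdbcModel.put("driver", "com.mysql.jdbc.Driver");         // placeholder driver
        jdbcModel.put("url", "jdbc:mysql://127.0.0.1:3306/demo"); // placeholder url
        // The commit wraps the script as an inline subquery aliased to "t"
        // and renames result_table_name from "table_view" to "t".
        jdbcModel.put("table", "(" + sqlScript + ") AS t");
        jdbcModel.put("result_table_name", "t");

        jdbcModel.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}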
src/main/java/com/jz/dmp/cmdexectool/scheduler/common/utils/ParameterUtils.java

@@ -270,7 +270,7 @@ public class ParameterUtils {
         Map<String, JSONObject> sourceMap = new HashMap<String, JSONObject>();
         for (int index = 0; index < sourceArray.size(); index++) {
             JSONObject sourceObj = sourceArray.getJSONObject(index);
-            sourceMap.put(sourceObj.getString("customSoruceFiledId"), sourceObj);
+            sourceMap.put(sourceObj.getString("customSoruceFieldId"), sourceObj);
         }
         JSONArray targetArray = jsonObject.getJSONArray("targetFields");
         Map<String, JSONObject> targetMap = new HashMap<String, JSONObject>();

@@ -285,8 +285,8 @@ public class ParameterUtils {
         Integer size = mappingArray.size();
         for (int index = 0; index < size; index++) {
             JSONObject mappingObj = mappingArray.getJSONObject(index);
-            JSONObject sourceObj = sourceMap.get(mappingObj.getString("customSoruceFiledId"));
-            JSONObject targetObj = sourceMap.get(mappingObj.getString("customTargetFiledId"));
+            JSONObject sourceObj = sourceMap.get(mappingObj.getString("customSoruceFieldId"));
+            JSONObject targetObj = targetMap.get(mappingObj.getString("customTargetFieldId"));
             //String customSoruceFiledId = sourceObj.getString("customSoruceFiledId");
             String sourceFieldName = sourceObj.getString("sourceFieldName");

@@ -302,7 +302,7 @@ public class ParameterUtils {
                 sb.append(" ");
             }
         }
-        sb.append("FROM t_view");
+        sb.append("FROM t");
         return sb.toString();
     }
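The renamed lookup keys matter because a map lookup with a mismatched key silently returns null and only fails later with a NullPointerException. A minimal sketch with made-up field data; the JSON shape is assumed from the hunk above, not taken from the full file:

import java.util.HashMap;
import java.util.Map;

import com.alibaba.fastjson.JSONObject;

public class FieldIdKeySketch {
    public static void main(String[] args) {
        // Made-up source field entry; the key spelling follows the corrected code.
        JSONObject sourceObj = new JSONObject();
        sourceObj.put("customSoruceFieldId", "f1");
        sourceObj.put("sourceFieldName", "user_name");

        Map<String, JSONObject> sourceMap = new HashMap<String, JSONObject>();
        sourceMap.put(sourceObj.getString("customSoruceFieldId"), sourceObj);

        JSONObject mappingObj = new JSONObject();
        mappingObj.put("customSoruceFieldId", "f1");

        // A lookup with the old key spelling finds nothing, which is the failure the rename avoids.
        System.out.println(sourceMap.get(mappingObj.getString("customSoruceFiledId"))); // null
        System.out.println(sourceMap.get(mappingObj.getString("customSoruceFieldId"))); // the entry
    }
}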
src/main/java/com/jz/dmp/cmdexectool/scheduler/dao/datasource/BaseDataSource.java

@@ -90,7 +90,11 @@ public abstract class BaseDataSource {
      * @return getJdbcUrl
      */
     public String getJdbcUrl() {
-        StringBuilder jdbcUrl = new StringBuilder(getAddress());
+        String addressStr = getAddress();
+        if (addressStr == null) {
+            return null;
+        }
+        StringBuilder jdbcUrl = new StringBuilder(addressStr);
         appendDatabase(jdbcUrl);
         appendPrincipal(jdbcUrl);
src/main/java/com/jz/dmp/cmdexectool/scheduler/dao/utils/DatabaseUtils.java

@@ -288,6 +288,7 @@ public class DatabaseUtils {
      * @return SqlBinds
      */
     public static SqlBinds getSqlAndSqlParamsMap(String sql) {
         Map<Integer, Property> sqlParamsMap = new HashMap<>();
         StringBuilder sqlBuilder = new StringBuilder();
src/main/java/com/jz/dmp/cmdexectool/scheduler/server/worker/task/WaterdropCommandExecutor.java

@@ -95,7 +95,8 @@ public class WaterdropCommandExecutor extends AbstractCommandExecutor {
         if (OSUtils.isWindows()) {
             //sb.append("@echo off\n");
             //sb.append("cd /d %~dp0\n");
-            sb.append("./bin/start-waterdrop-spark.sh --master local[4] --deploy-mode client --config ./config/application.conf");
+            sb.append("/app/waterdrop/bin/start-waterdrop-spark.sh --master yarn --deploy-mode client --config ");
+            //sb.append("scp "+execCommand+" root@192.168.1.221:/opt");
             if (taskExecutionContext.getEnvFile() != null) {
                 sb.append("call ").append(taskExecutionContext.getEnvFile()).append("\n");
             }

@@ -103,13 +104,15 @@ public class WaterdropCommandExecutor extends AbstractCommandExecutor {
             //sb.append("#!/bin/sh\n");
             //sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
             //sb.append("cd $BASEDIR\n");
-            sb.append("./bin/start-waterdrop-spark.sh --master local[4] --deploy-mode client --config ./config/application.conf");
+            sb.append("/app/waterdrop/bin/start-waterdrop-spark.sh --master yarn --deploy-mode client --config ");
+            //sb.append("scp "+execCommand+" root@192.168.1.221:/opt");
             if (taskExecutionContext.getEnvFile() != null) {
                 sb.append("source ").append(taskExecutionContext.getEnvFile()).append("\n");
             }
         }
         sb.append(execCommand);
+        //sb.append("/application.conf");
         logger.info("command : {}", sb.toString());
         // write data to file
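For reference, a sketch of the launch command string the executor now assembles. The config path standing in for execCommand is a placeholder, not a value from this commit:

public class WaterdropCommandSketch {
    public static void main(String[] args) {
        // Placeholder for the generated config path that buildCommand() normally supplies as execCommand.
        String execCommand = "/tmp/exec/123/application.conf";

        StringBuilder sb = new StringBuilder();
        // The commit switches from the relative local[4] launcher to the installed script in yarn client mode,
        // leaving the config path to be appended afterwards.
        sb.append("/app/waterdrop/bin/start-waterdrop-spark.sh --master yarn --deploy-mode client --config ");
        sb.append(execCommand);

        System.out.println(sb.toString());
        // prints: /app/waterdrop/bin/start-waterdrop-spark.sh --master yarn --deploy-mode client --config /tmp/exec/123/application.conf
    }
}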
src/main/java/com/jz/dmp/cmdexectool/scheduler/server/worker/task/sql/SqlTask.java

@@ -36,7 +36,9 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.springframework.util.CollectionUtils;
+import com.alibaba.fastjson.JSONObject;
 import com.jz.dmp.cmdexectool.common.constant.CommConstant;
+import com.jz.dmp.cmdexectool.common.utils.RemoteExecuteCmdUtils;
 import com.jz.dmp.cmdexectool.scheduler.common.Constants;
 import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
 import com.jz.dmp.cmdexectool.scheduler.common.process.Property;

@@ -104,25 +106,22 @@ public class SqlTask extends AbstractTask {
         try {
             MyBaseDataSource targetBaseDataSource = sqlParameters.getTargetBaseDataSource();
+            //System.out.println(JSONObject.toJSONString(sqlParameters.getPreStatements()));
+            //System.out.println(CollectionUtils.isEmpty(sqlParameters.getPreStatements()));
+            if (!CollectionUtils.isEmpty(sqlParameters.getPreStatements())) {
                 List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements())
                         .orElse(new ArrayList<>())
                         .stream()
                         .map(DatabaseUtils::getSqlAndSqlParamsMap)
                         .collect(Collectors.toList());
-            List<SqlBinds> postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPosStatements())
-                    .orElse(new ArrayList<>())
-                    .stream()
-                    .map(DatabaseUtils::getSqlAndSqlParamsMap)
-                    .collect(Collectors.toList());
                 // check whether the pre-SQL needs to run
                 if (!CollectionUtils.isEmpty(preStatementSqlBinds)) {
                     DatabaseUtils.executeUpdateSql(preStatementSqlBinds, targetBaseDataSource);
                 }
+            }
             if (sqlParameters.getExecutioEngine().equals(CommConstant.EXECUTION_ENGINE_JDBC)) {
                 List<String> mainSqlScript = new ArrayList<String>();

@@ -138,16 +137,31 @@ public class SqlTask extends AbstractTask {
             } else {
                 // construct process
                 CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand());
+                //String cmd = "/app/waterdrop/bin/start-waterdrop-spark.sh --master yarn --deploy-mode client --config /opt/application.conf";
+                //boolean flag = RemoteExecuteCmdUtils.sshCmd("192.168.1.221", "root", "123", cmd);
+                //if (!flag) {
+                //    throw new RuntimeException("执行waterdrop失败!");
+                //}
                 setExitStatusCode(commandExecuteResult.getExitStatusCode());
                 setAppIds(commandExecuteResult.getAppIds());
                 setProcessId(commandExecuteResult.getProcessId());
             }
+            if (!CollectionUtils.isEmpty(sqlParameters.getPosStatements())) {
+                List<SqlBinds> postStatementSqlBinds = Optional.ofNullable(sqlParameters.getPosStatements())
+                        .orElse(new ArrayList<>())
+                        .stream()
+                        .map(DatabaseUtils::getSqlAndSqlParamsMap)
+                        .collect(Collectors.toList());
                 // check whether the post-SQL should run
                 if (!CollectionUtils.isEmpty(postStatementSqlBinds)) {
                     DatabaseUtils.executeUpdateSql(postStatementSqlBinds, targetBaseDataSource);
                 }
+            }
         } catch (Exception e) {
             logger.error("sql task error", e);
             setExitStatusCode(Constants.EXIT_CODE_FAILURE);

@@ -170,8 +184,11 @@ public class SqlTask extends AbstractTask {
      */
     private String buildCommand() throws Exception {
         // generate scripts
-        String fileName = String.format("%s/%s_node.%s", taskExecutionContext.getExecutePath(),
-                taskExecutionContext.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
+        //String fileName = String.format("%s/%s_node.%s", taskExecutionContext.getExecutePath(),
+        //        taskExecutionContext.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
+        String fileName = String.format("%s/%s.%s", taskExecutionContext.getExecutePath(), "application", "conf");
         Path path = new File(fileName).toPath();

@@ -207,4 +224,9 @@ public class SqlTask extends AbstractTask {
         return sqlParameters;
     }
+
+    public static void main(String[] args) {
+        List<String> list = new ArrayList<String>();
+        System.out.println(CollectionUtils.isEmpty(list));
+    }
 }
src/main/resources/mapperconf/DmpPublicConfigInfoMapper.xml  (+1 / -1)
src/main/resources/templates/sink_console.ftl

-stdout {
+console {
+    <#if source_table_name??>
+    source_table_name = ${source_table_name!}
+    </#if>
     <#if limit??>
         # limit the number of output rows, range [-1, 2147483647]
         limit = ${limit!}