Commit 2a9c340d authored by DESKTOP-5KITOLR\Administrator's avatar DESKTOP-5KITOLR\Administrator
parents a9d4fcb9 a8b57ada
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
<java.version>1.8</java.version> <java.version>1.8</java.version>
<shiro.version>1.2.3</shiro.version> <shiro.version>1.2.3</shiro.version>
<mybatis-pagehelper.version>4.2.0</mybatis-pagehelper.version> <mybatis-pagehelper.version>4.2.0</mybatis-pagehelper.version>
<start-class>com.jz.Application</start-class> <start-class>com.jz.dmp.cmdexectool.ApiApplication</start-class>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties> </properties>
...@@ -94,11 +94,6 @@ ...@@ -94,11 +94,6 @@
</exclusions> </exclusions>
</dependency> </dependency>
<!-- swagger --> <!-- swagger -->
<!--<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.4.0</version>
</dependency>-->
<dependency> <dependency>
<groupId>io.springfox</groupId> <groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId> <artifactId>springfox-swagger2</artifactId>
......
...@@ -2,14 +2,13 @@ package com.jz.dmp.cmdexectool; ...@@ -2,14 +2,13 @@ package com.jz.dmp.cmdexectool;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator; import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching; import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.annotation.EnableTransactionManagement;
import com.jz.dmp.cmdexectool.common.utils.ApplicationContextUtil; import com.jz.dmp.cmdexectool.common.utils.ApplicationContextUtil;
import com.jz.dmp.cmdexectool.scheduler.service.process.ProcessService; import com.jz.dmp.cmdexectool.scheduler.service.process.ProcessService;
...@@ -23,7 +22,7 @@ public class ApiApplication implements HealthIndicator { ...@@ -23,7 +22,7 @@ public class ApiApplication implements HealthIndicator {
public static void main(String[] args) { public static void main(String[] args) {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
SpringApplication springApplication = new SpringApplication(ApiApplication.class); SpringApplication springApplication = new SpringApplication(ApiApplication.class);
springApplication.run(args); ConfigurableApplicationContext context = springApplication.run(args);
Integer taskId = Integer.parseInt(args[0]); Integer taskId = Integer.parseInt(args[0]);
if (taskId==null) { if (taskId==null) {
...@@ -40,6 +39,9 @@ public class ApiApplication implements HealthIndicator { ...@@ -40,6 +39,9 @@ public class ApiApplication implements HealthIndicator {
long cost = System.currentTimeMillis() - start; long cost = System.currentTimeMillis() - start;
logger.info(" started status: {}, cost: {}", "SUCCESS!", cost); logger.info(" started status: {}, cost: {}", "SUCCESS!", cost);
springApplication.exit(context);
} }
@Override @Override
......
...@@ -58,4 +58,10 @@ public class CommConstant { ...@@ -58,4 +58,10 @@ public class CommConstant {
public static final String WATERDROP_FTL_SINK_JDBC = "sink_jdbc.ftl"; public static final String WATERDROP_FTL_SINK_JDBC = "sink_jdbc.ftl";
public static final String WATERDROP_FTL_SINK_KAFKA = "sink_kafka.ftl"; public static final String WATERDROP_FTL_SINK_KAFKA = "sink_kafka.ftl";
//其他script模板
public static final String FTL_SFTP_DOWNLOAD = "sftp_download.ftl";//ftp下载
public static final String FTL_UNZIPFILE = "unzipfile.ftl";//文件解压
public static final String FTL_DOCTRANS = "doctrans.ftl";//文件转码
public static final String FTL_HDFS_UPLOAD = "hdfs_upload.ftl";//HDFS上传
} }
...@@ -2,16 +2,16 @@ package com.jz.dmp.cmdexectool.scheduler.common.enums; ...@@ -2,16 +2,16 @@ package com.jz.dmp.cmdexectool.scheduler.common.enums;
public enum JobType { public enum JobType {
START("start","开始任务"), start("start","开始任务"),
SHELL("shell","shell任务"), shell("shell","shell任务"),
SQL("sql","sql任务"), sql("sql","sql任务"),
SYNC("sync","离线任务"), sync("sync","离线任务"),
SUBPROCESS("subprocess","子流程任务"), subprocess("subprocess","子流程任务"),
FTP("ftp","ftp下载任务"), ftp("ftp","ftp下载任务"),
UNZIPFILE("unzipFile","解压文件任务"), unzipFile("unzipFile","解压文件任务"),
DOCTRANS("docTrans","文件转码任务"), docTrans("docTrans","文件转码任务"),
HDFS("hdfs","hdfs上传任务"), hdfs("hdfs","hdfs上传任务"),
STOP("stop","停止任务"); stop("stop","停止任务");
//job类型 //job类型
private String jobTypeStr; private String jobTypeStr;
......
...@@ -16,9 +16,17 @@ ...@@ -16,9 +16,17 @@
*/ */
package com.jz.dmp.cmdexectool.scheduler.common.task.docTrans; package com.jz.dmp.cmdexectool.scheduler.common.task.docTrans;
import com.alibaba.fastjson.JSONObject;
import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.common.utils.FreeMarkerUtils;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo; import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
/** /**
* Sql/Hql parameter * Sql/Hql parameter
...@@ -37,10 +45,39 @@ public class DoctransParameters extends AbstractParameters { ...@@ -37,10 +45,39 @@ public class DoctransParameters extends AbstractParameters {
*/ */
private List<ResourceInfo> resourceList; private List<ResourceInfo> resourceList;
/**
* command需要执行的脚本
*/
private String cmdScript;
public DoctransParameters() { public DoctransParameters() {
} }
public DoctransParameters(String script, FreeMarkerConfigurer freeMarkerConfig) {
this.script = script;
JSONObject scriptObj = JSONObject.parseObject(script);
Map<String, String> doctransModel = new HashMap<String, String>();
String srcDir = scriptObj.getString("ftpUrl");
String[] srcDirSuffix = srcDir.split(".");
String fileSuffix = srcDirSuffix[1];
String desDir = scriptObj.getString("ftpUrl");
String sourceConvert = scriptObj.getString("ftpUrl");
String sinkConvert = scriptObj.getString("ftpUrl");
doctransModel.put("src_dir", srcDir);
doctransModel.put("des_dir", desDir);
doctransModel.put("source_convert", sourceConvert);
doctransModel.put("sink_convert", sinkConvert);
doctransModel.put("file_suffix", fileSuffix);
this.cmdScript = FreeMarkerUtils.freemakerJson(CommConstant.FTL_DOCTRANS, doctransModel, freeMarkerConfig);
}
public String getScript() { public String getScript() {
return script; return script;
} }
...@@ -65,9 +102,17 @@ public class DoctransParameters extends AbstractParameters { ...@@ -65,9 +102,17 @@ public class DoctransParameters extends AbstractParameters {
this.resourceList = resourceList; this.resourceList = resourceList;
} }
public String getCmdScript() {
return cmdScript;
}
public void setCmdScript(String cmdScript) {
this.cmdScript = cmdScript;
}
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
return script != null && !script.isEmpty(); return cmdScript != null && !cmdScript.isEmpty();
} }
@Override @Override
......
...@@ -16,15 +16,27 @@ ...@@ -16,15 +16,27 @@
*/ */
package com.jz.dmp.cmdexectool.scheduler.common.task.ftp; package com.jz.dmp.cmdexectool.scheduler.common.task.ftp;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import com.alibaba.fastjson.JSONObject;
import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.common.utils.FreeMarkerUtils;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo; import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import java.util.List;
/** /**
* Sql/Hql parameter * Sql/Hql parameter
*/ */
public class FtpParameters extends AbstractParameters { public class FtpParameters extends AbstractParameters {
private static Logger logger = LoggerFactory.getLogger(FtpParameters.class);
/** /**
* shell script * shell script
*/ */
...@@ -37,10 +49,55 @@ public class FtpParameters extends AbstractParameters { ...@@ -37,10 +49,55 @@ public class FtpParameters extends AbstractParameters {
*/ */
private List<ResourceInfo> resourceList; private List<ResourceInfo> resourceList;
/**
* command需要执行的脚本
*/
private String cmdScript;
public FtpParameters() { public FtpParameters() {
} }
public FtpParameters(String script, FreeMarkerConfigurer freeMarkerConfig) {
this.script = script;
JSONObject scriptObj = JSONObject.parseObject(script);
Map<String, String> ftpModel = new HashMap<String, String>();
String ftpUrl = scriptObj.getString("ftpUrl");
String[] ftpUrlPort = ftpUrl.split(":");
String ip = ftpUrlPort[0];
String port = "22";
if (ftpUrlPort.length>1) {
port = ftpUrlPort[1];
}
String username = scriptObj.getString("ftpUsername");
String password = scriptObj.getString("ftpPassword");
String srcDir = scriptObj.getString("ftpSourceFileDir");
String ftpSaveDestDir = scriptObj.getString("ftpSaveDestDir");
String[] desDirFileSuffix = ftpSaveDestDir.split("\\.");
String desDir = desDirFileSuffix[0];
String fileSuffix = "";
if (desDirFileSuffix.length>1) {
fileSuffix = desDirFileSuffix[1];
}
ftpModel.put("ip", ip);
ftpModel.put("port", port);
ftpModel.put("username", username);
ftpModel.put("password", password);
ftpModel.put("src_dir", srcDir);
ftpModel.put("des_dir", desDir);
ftpModel.put("file_suffix", fileSuffix);
logger.info("调用ftp command模板参数【{}】", JSONObject.toJSONString(ftpModel));
this.cmdScript = FreeMarkerUtils.freemakerJson(CommConstant.FTL_SFTP_DOWNLOAD, ftpModel, freeMarkerConfig);
}
public String getScript() { public String getScript() {
return script; return script;
} }
...@@ -65,9 +122,17 @@ public class FtpParameters extends AbstractParameters { ...@@ -65,9 +122,17 @@ public class FtpParameters extends AbstractParameters {
this.resourceList = resourceList; this.resourceList = resourceList;
} }
public String getCmdScript() {
return cmdScript;
}
public void setCmdScript(String cmdScript) {
this.cmdScript = cmdScript;
}
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
return script != null && !script.isEmpty(); return cmdScript != null && !cmdScript.isEmpty();
} }
@Override @Override
......
...@@ -16,9 +16,17 @@ ...@@ -16,9 +16,17 @@
*/ */
package com.jz.dmp.cmdexectool.scheduler.common.task.hdfs; package com.jz.dmp.cmdexectool.scheduler.common.task.hdfs;
import com.alibaba.fastjson.JSONObject;
import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.common.utils.FreeMarkerUtils;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo; import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
/** /**
* Sql/Hql parameter * Sql/Hql parameter
...@@ -37,10 +45,33 @@ public class HdfsParameters extends AbstractParameters { ...@@ -37,10 +45,33 @@ public class HdfsParameters extends AbstractParameters {
*/ */
private List<ResourceInfo> resourceList; private List<ResourceInfo> resourceList;
/**
* command需要执行的脚本
*/
private String cmdScript;
public HdfsParameters() { public HdfsParameters() {
} }
public HdfsParameters(String script, FreeMarkerConfigurer freeMarkerConfig) {
this.script = script;
JSONObject scriptObj = JSONObject.parseObject(script);
Map<String, String> HdfsModel = new HashMap<String, String>();
String srcDir = scriptObj.getString("localUploadFileDir");
String fileSuffix = scriptObj.getString("hdfsUploadFileFilter");
String desDir = scriptObj.getString("hdfsUploadSaveDir");
HdfsModel.put("src_dir", srcDir);
HdfsModel.put("file_suffix", fileSuffix);
HdfsModel.put("des_dir", desDir);
this.cmdScript = FreeMarkerUtils.freemakerJson(CommConstant.FTL_HDFS_UPLOAD, HdfsModel, freeMarkerConfig);
}
public String getScript() { public String getScript() {
return script; return script;
} }
...@@ -65,9 +96,17 @@ public class HdfsParameters extends AbstractParameters { ...@@ -65,9 +96,17 @@ public class HdfsParameters extends AbstractParameters {
this.resourceList = resourceList; this.resourceList = resourceList;
} }
public String getCmdScript() {
return cmdScript;
}
public void setCmdScript(String cmdScript) {
this.cmdScript = cmdScript;
}
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
return script != null && !script.isEmpty(); return cmdScript != null && !cmdScript.isEmpty();
} }
@Override @Override
......
...@@ -16,9 +16,17 @@ ...@@ -16,9 +16,17 @@
*/ */
package com.jz.dmp.cmdexectool.scheduler.common.task.unzipfile; package com.jz.dmp.cmdexectool.scheduler.common.task.unzipfile;
import com.alibaba.fastjson.JSONObject;
import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.common.utils.FreeMarkerUtils;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo; import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
/** /**
* Sql/Hql parameter * Sql/Hql parameter
...@@ -37,10 +45,34 @@ public class UnzipfileParameters extends AbstractParameters { ...@@ -37,10 +45,34 @@ public class UnzipfileParameters extends AbstractParameters {
*/ */
private List<ResourceInfo> resourceList; private List<ResourceInfo> resourceList;
/**
* command需要执行的脚本
*/
private String cmdScript;
public UnzipfileParameters() { public UnzipfileParameters() {
} }
public UnzipfileParameters(String script, FreeMarkerConfigurer freeMarkerConfig) {
this.script = script;
JSONObject scriptObj = JSONObject.parseObject(script);
Map<String, String> unzipfileModel = new HashMap<String, String>();
String srcDir = scriptObj.getString("compressedFileDir");
String desDir = scriptObj.getString("zipOutputDir");
String type = scriptObj.getString("decompressFormat");
unzipfileModel.put("src_dir", srcDir);
unzipfileModel.put("des_dir", desDir);
unzipfileModel.put("type", type);
this.cmdScript = FreeMarkerUtils.freemakerJson(CommConstant.FTL_UNZIPFILE, unzipfileModel, freeMarkerConfig);
}
public String getScript() { public String getScript() {
return script; return script;
} }
...@@ -65,9 +97,17 @@ public class UnzipfileParameters extends AbstractParameters { ...@@ -65,9 +97,17 @@ public class UnzipfileParameters extends AbstractParameters {
this.resourceList = resourceList; this.resourceList = resourceList;
} }
public String getCmdScript() {
return cmdScript;
}
public void setCmdScript(String cmdScript) {
this.cmdScript = cmdScript;
}
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
return script != null && !script.isEmpty(); return cmdScript != null && !cmdScript.isEmpty();
} }
@Override @Override
......
...@@ -34,6 +34,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils; ...@@ -34,6 +34,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext; import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.ShellCommandExecutor;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor;
/** /**
...@@ -47,9 +48,9 @@ public class DoctransTask extends AbstractTask { ...@@ -47,9 +48,9 @@ public class DoctransTask extends AbstractTask {
private DoctransParameters doctransParameters; private DoctransParameters doctransParameters;
/** /**
* waterdrop command executor * shell command executor
*/ */
private WaterdropCommandExecutor waterdropCommandExecutor; private ShellCommandExecutor shellCommandExecutor;
/** /**
* taskExecutionContext * taskExecutionContext
...@@ -66,7 +67,7 @@ public class DoctransTask extends AbstractTask { ...@@ -66,7 +67,7 @@ public class DoctransTask extends AbstractTask {
super(taskExecutionContext, logger); super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext; this.taskExecutionContext = taskExecutionContext;
this.waterdropCommandExecutor = new WaterdropCommandExecutor(this::logHandle, taskExecutionContext, logger); this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, logger);
} }
@Override @Override
...@@ -85,7 +86,7 @@ public class DoctransTask extends AbstractTask { ...@@ -85,7 +86,7 @@ public class DoctransTask extends AbstractTask {
public void handle() throws Exception { public void handle() throws Exception {
try { try {
// construct process // construct process
CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand()); CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds()); setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());
...@@ -99,7 +100,7 @@ public class DoctransTask extends AbstractTask { ...@@ -99,7 +100,7 @@ public class DoctransTask extends AbstractTask {
@Override @Override
public void cancelApplication(boolean cancelApplication) throws Exception { public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process // cancel process
waterdropCommandExecutor.cancelApplication(); shellCommandExecutor.cancelApplication();
} }
/** /**
...@@ -137,7 +138,7 @@ public class DoctransTask extends AbstractTask { ...@@ -137,7 +138,7 @@ public class DoctransTask extends AbstractTask {
Files.createFile(path, attr); Files.createFile(path, attr);
} }
Files.write(path, doctransParameters.getScript().getBytes(), StandardOpenOption.APPEND); Files.write(path, doctransParameters.getCmdScript().getBytes(), StandardOpenOption.APPEND);
return fileName; return fileName;
} }
......
...@@ -24,9 +24,7 @@ import java.nio.file.attribute.FileAttribute; ...@@ -24,9 +24,7 @@ import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions; import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set; import java.util.Set;
import org.slf4j.Logger; import org.slf4j.Logger;
import com.jz.dmp.cmdexectool.scheduler.common.Constants; import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.ftp.FtpParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.ftp.FtpParameters;
...@@ -34,7 +32,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils; ...@@ -34,7 +32,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext; import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.ShellCommandExecutor;
/** /**
* sql task * sql task
...@@ -49,7 +47,7 @@ public class FtpTask extends AbstractTask { ...@@ -49,7 +47,7 @@ public class FtpTask extends AbstractTask {
/** /**
* waterdrop command executor * waterdrop command executor
*/ */
private WaterdropCommandExecutor waterdropCommandExecutor; private ShellCommandExecutor shellCommandExecutor;
/** /**
* taskExecutionContext * taskExecutionContext
...@@ -66,7 +64,7 @@ public class FtpTask extends AbstractTask { ...@@ -66,7 +64,7 @@ public class FtpTask extends AbstractTask {
super(taskExecutionContext, logger); super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext; this.taskExecutionContext = taskExecutionContext;
this.waterdropCommandExecutor = new WaterdropCommandExecutor(this::logHandle, taskExecutionContext, logger); this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, logger);
} }
@Override @Override
...@@ -85,7 +83,7 @@ public class FtpTask extends AbstractTask { ...@@ -85,7 +83,7 @@ public class FtpTask extends AbstractTask {
public void handle() throws Exception { public void handle() throws Exception {
try { try {
// construct process // construct process
CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand()); CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds()); setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());
...@@ -99,7 +97,7 @@ public class FtpTask extends AbstractTask { ...@@ -99,7 +97,7 @@ public class FtpTask extends AbstractTask {
@Override @Override
public void cancelApplication(boolean cancelApplication) throws Exception { public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process // cancel process
waterdropCommandExecutor.cancelApplication(); shellCommandExecutor.cancelApplication();
} }
/** /**
...@@ -137,7 +135,7 @@ public class FtpTask extends AbstractTask { ...@@ -137,7 +135,7 @@ public class FtpTask extends AbstractTask {
Files.createFile(path, attr); Files.createFile(path, attr);
} }
Files.write(path, ftpParameters.getScript().getBytes(), StandardOpenOption.APPEND); Files.write(path, ftpParameters.getCmdScript().getBytes(), StandardOpenOption.APPEND);
return fileName; return fileName;
} }
......
...@@ -34,6 +34,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils; ...@@ -34,6 +34,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext; import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.ShellCommandExecutor;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor;
/** /**
...@@ -47,9 +48,9 @@ public class HdfsTask extends AbstractTask { ...@@ -47,9 +48,9 @@ public class HdfsTask extends AbstractTask {
private HdfsParameters hdfsParameters; private HdfsParameters hdfsParameters;
/** /**
* waterdrop command executor * shell command executor
*/ */
private WaterdropCommandExecutor waterdropCommandExecutor; private ShellCommandExecutor shellCommandExecutor;
/** /**
* taskExecutionContext * taskExecutionContext
...@@ -66,7 +67,7 @@ public class HdfsTask extends AbstractTask { ...@@ -66,7 +67,7 @@ public class HdfsTask extends AbstractTask {
super(taskExecutionContext, logger); super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext; this.taskExecutionContext = taskExecutionContext;
this.waterdropCommandExecutor = new WaterdropCommandExecutor(this::logHandle, taskExecutionContext, logger); this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, logger);
} }
@Override @Override
...@@ -85,7 +86,7 @@ public class HdfsTask extends AbstractTask { ...@@ -85,7 +86,7 @@ public class HdfsTask extends AbstractTask {
public void handle() throws Exception { public void handle() throws Exception {
try { try {
// construct process // construct process
CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand()); CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds()); setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());
...@@ -99,7 +100,7 @@ public class HdfsTask extends AbstractTask { ...@@ -99,7 +100,7 @@ public class HdfsTask extends AbstractTask {
@Override @Override
public void cancelApplication(boolean cancelApplication) throws Exception { public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process // cancel process
waterdropCommandExecutor.cancelApplication(); shellCommandExecutor.cancelApplication();
} }
/** /**
...@@ -137,7 +138,7 @@ public class HdfsTask extends AbstractTask { ...@@ -137,7 +138,7 @@ public class HdfsTask extends AbstractTask {
Files.createFile(path, attr); Files.createFile(path, attr);
} }
Files.write(path, hdfsParameters.getScript().getBytes(), StandardOpenOption.APPEND); Files.write(path, hdfsParameters.getCmdScript().getBytes(), StandardOpenOption.APPEND);
return fileName; return fileName;
} }
......
...@@ -34,6 +34,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils; ...@@ -34,6 +34,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext; import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.ShellCommandExecutor;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor;
/** /**
...@@ -47,9 +48,9 @@ public class UnzipfileTask extends AbstractTask { ...@@ -47,9 +48,9 @@ public class UnzipfileTask extends AbstractTask {
private UnzipfileParameters unzipfileParameters; private UnzipfileParameters unzipfileParameters;
/** /**
* waterdrop command executor * shell command executor
*/ */
private WaterdropCommandExecutor waterdropCommandExecutor; private ShellCommandExecutor shellCommandExecutor;
/** /**
* taskExecutionContext * taskExecutionContext
...@@ -66,7 +67,7 @@ public class UnzipfileTask extends AbstractTask { ...@@ -66,7 +67,7 @@ public class UnzipfileTask extends AbstractTask {
super(taskExecutionContext, logger); super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext; this.taskExecutionContext = taskExecutionContext;
this.waterdropCommandExecutor = new WaterdropCommandExecutor(this::logHandle, taskExecutionContext, logger); this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, logger);
} }
@Override @Override
...@@ -85,7 +86,7 @@ public class UnzipfileTask extends AbstractTask { ...@@ -85,7 +86,7 @@ public class UnzipfileTask extends AbstractTask {
public void handle() throws Exception { public void handle() throws Exception {
try { try {
// construct process // construct process
CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand()); CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds()); setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());
...@@ -99,7 +100,7 @@ public class UnzipfileTask extends AbstractTask { ...@@ -99,7 +100,7 @@ public class UnzipfileTask extends AbstractTask {
@Override @Override
public void cancelApplication(boolean cancelApplication) throws Exception { public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process // cancel process
waterdropCommandExecutor.cancelApplication(); shellCommandExecutor.cancelApplication();
} }
/** /**
...@@ -137,7 +138,7 @@ public class UnzipfileTask extends AbstractTask { ...@@ -137,7 +138,7 @@ public class UnzipfileTask extends AbstractTask {
Files.createFile(path, attr); Files.createFile(path, attr);
} }
Files.write(path, unzipfileParameters.getScript().getBytes(), StandardOpenOption.APPEND); Files.write(path, unzipfileParameters.getCmdScript().getBytes(), StandardOpenOption.APPEND);
return fileName; return fileName;
} }
......
...@@ -162,15 +162,15 @@ public class ProcessService { ...@@ -162,15 +162,15 @@ public class ProcessService {
String taskType = dmpDevelopTaskDto.getTaskType();//任务类型 String taskType = dmpDevelopTaskDto.getTaskType();//任务类型
if (taskType.equals(CommConstant.TASK_TYPE_DEVSHELL)) { if (taskType.equals(CommConstant.TASK_TYPE_DEVSHELL)) {
jobType = JobType.SHELL; jobType = JobType.shell;
script = dmpDevelopTaskDto.getScript(); script = dmpDevelopTaskDto.getScript();
taskAppId = dmpDevelopTaskDto.getName(); taskAppId = dmpDevelopTaskDto.getName();
}else if (taskType.equals(CommConstant.TASK_TYPE_DEVSQL)) { }else if (taskType.equals(CommConstant.TASK_TYPE_DEVSQL)) {
jobType = JobType.SQL; jobType = JobType.sql;
script = dmpDevelopTaskDto.getScript(); script = dmpDevelopTaskDto.getScript();
taskAppId = dmpDevelopTaskDto.getName(); taskAppId = dmpDevelopTaskDto.getName();
}else if (taskType.equals(CommConstant.TASK_TYPE_OFFLINE)) { }else if (taskType.equals(CommConstant.TASK_TYPE_OFFLINE)) {
jobType = JobType.SYNC; jobType = JobType.sync;
script = dmpDevelopTaskDto.getScript(); script = dmpDevelopTaskDto.getScript();
taskAppId = dmpDevelopTaskDto.getName(); taskAppId = dmpDevelopTaskDto.getName();
}else if (taskType.equals(CommConstant.TASK_TYPE_DEVELOP)) { }else if (taskType.equals(CommConstant.TASK_TYPE_DEVELOP)) {
...@@ -190,47 +190,45 @@ public class ProcessService { ...@@ -190,47 +190,45 @@ public class ProcessService {
} }
switch (jobType) { switch (jobType) {
case SHELL: case shell:
ShellParameters shellParameters = new ShellParameters(); ShellParameters shellParameters = new ShellParameters();
shellParameters.setScript(script); shellParameters.setScript(script);
shellParameters.setTaskAppId(taskAppId); shellParameters.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(shellParameters, projectConfigInfoDto); taskExecutionContext = new TaskExecutionContext(shellParameters, projectConfigInfoDto);
break; break;
case SQL: case sql:
SqlParameters sqlParameters = new SqlParameters(script, projectConfigInfoDto, dmpSyncingDatasourceDao, freeMarkerConfigurer); SqlParameters sqlParameters = new SqlParameters(script, projectConfigInfoDto, dmpSyncingDatasourceDao, freeMarkerConfigurer);
sqlParameters.setTaskAppId(taskAppId); sqlParameters.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(sqlParameters, projectConfigInfoDto); taskExecutionContext = new TaskExecutionContext(sqlParameters, projectConfigInfoDto);
break; break;
case SYNC: case sync:
SyncParameters sync = new SyncParameters(script, projectConfigInfoDto, dmpSyncingDatasourceDao, freeMarkerConfigurer); SyncParameters sync = new SyncParameters(script, projectConfigInfoDto, dmpSyncingDatasourceDao, freeMarkerConfigurer);
sync.setTaskAppId(taskAppId); sync.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(sync, projectConfigInfoDto); taskExecutionContext = new TaskExecutionContext(sync, projectConfigInfoDto);
break; break;
case SUBPROCESS: case subprocess:
break; break;
case FTP: case ftp:
FtpParameters ftpParameters = new FtpParameters(); FtpParameters ftpParameters = new FtpParameters(script, freeMarkerConfigurer);
ftpParameters.setScript(script);
ftpParameters.setTaskAppId(taskAppId); ftpParameters.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(ftpParameters, projectConfigInfoDto); taskExecutionContext = new TaskExecutionContext(ftpParameters, projectConfigInfoDto);
break; break;
case UNZIPFILE: case unzipFile:
UnzipfileParameters unzipfileParameters = new UnzipfileParameters(); UnzipfileParameters unzipfileParameters = new UnzipfileParameters(script, freeMarkerConfigurer);
unzipfileParameters.setScript(script); unzipfileParameters.setScript(script);
unzipfileParameters.setTaskAppId(taskAppId); unzipfileParameters.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(unzipfileParameters, projectConfigInfoDto); taskExecutionContext = new TaskExecutionContext(unzipfileParameters, projectConfigInfoDto);
break; break;
case DOCTRANS: case docTrans:
DoctransParameters doctransParameters = new DoctransParameters(); DoctransParameters doctransParameters = new DoctransParameters(script, freeMarkerConfigurer);
doctransParameters.setScript(script); doctransParameters.setScript(script);
doctransParameters.setTaskAppId(taskAppId); doctransParameters.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(doctransParameters, projectConfigInfoDto); taskExecutionContext = new TaskExecutionContext(doctransParameters, projectConfigInfoDto);
break; break;
case HDFS: case hdfs:
HdfsParameters hdfsParameters = new HdfsParameters(); HdfsParameters hdfsParameters = new HdfsParameters(script, freeMarkerConfigurer);
hdfsParameters.setScript(script); hdfsParameters.setScript(script);
hdfsParameters.setTaskAppId(taskAppId); hdfsParameters.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(hdfsParameters, projectConfigInfoDto); taskExecutionContext = new TaskExecutionContext(hdfsParameters, projectConfigInfoDto);
......
#!/bin/bash
src_dir=${src_dir!}
des_dir=${des_dir!}
source_convert=${source_convert!}
sink_convert=${sink_convert!}
file_suffix=${file_suffix!}
# 获取 src_dir 下所有文件为 file_suffix 的文件
cd ${src_dir}
filename=`ls -l | grep ^- | awk '{print $9}' | grep .${file_suffix}`
echo ${filename}
# 获取当前系统时间
time=`date +%Y-%m-%d_%H:%M:%S`
for file in ${filename}
do
echo "正在转码的文件是: ${file}"
mkdir -p ${des_dir}/${time}
iconv -f ${source_convert} -t ${sink_convert} $src_dir/${file} -o ${des_dir}/${time}/${file}
done
\ No newline at end of file
#!/bin/bash
# 要上传到hdfs的来源目录
src_dir=${src_dir!}
# 要上传的文件类型
file_suffix="${file_suffix!}"
# 要上传到hdfs的目录
des_dir=${des_dir!}
if [ $file_suffix = "" ]; then
echo "没有后缀"
filename="*"
else
filename="*.${file_suffix}"
fi
# 判断 hdfs 文件夹是否存在
su hdfs -c "hdfs dfs -test -e ${des_dir}"
if [ $? -ne 0 ]; then
echo "====================hdfs 文件夹不存在, 自动创建======================"
su hdfs -c "hdfs dfs -mkdir -p ${des_dir}"
fi
su hdfs -c "hdfs dfs -put ${src_dir}/${filename} ${des_dir}/"
\ No newline at end of file
# sftp 服务器IP地址
ip=${ip!}
# sftp 服务器端口号
port=${port!}
# sftp 服务器用户名
username=${username!}
# sftp 服务器密码
password=${password!}
# sftp 下载源目录
src_dir=${src_dir!}
# sftp 下载目标目录
des_dir=${des_dir!}
# 被下载的文件后缀
<#if file_suffix??>
file_suffix="${file_suffix!}"
<#else>
file_suffix=""
</#if>
if [ "$file_suffix" = "" ]; then
# 没有后缀, 下载选中目录下所有文件
file_suffix="*"
fi
# 获取当前系统的时间
time=`date +%Y-%m-%d_%H:%M:%S`
des_dir=${r'${des_dir}/${time}'}
# 创建目标文件夹
mkdir -p ${r'${des_dir}'}
# 下载
lftp -u ${r'${username},${password} sftp://${ip}:${port}<<EOF'}
cd ${r'${src_dir}'}
lcd ${r'${des_dir}/'}
echo "正在下载的文件是:" ${r'${src_dir}/*.${file_suffix}'}
# 批量下载文件
mget ${r'*.${file_suffix}'}
by
EOF
\ No newline at end of file
jdbc { jdbc {
<#if driver??> <#if driver??>
# 驱动名 # 驱动名
driver = "com.mysql.jdbc.Driver" driver = "${driver!}"
</#if> </#if>
<#if url??> <#if url??>
# JDBC的url # JDBC的url
url = "jdbc:mysql://localhost:3306/info" url = "${url!}"
</#if> </#if>
<#if table??> <#if table??>
# 要加载的表名 # 要加载的表名
table = "access" table = "${table!}"
</#if> </#if>
<#if result_table_name??> <#if result_table_name??>
# spark 临时表名称 # spark 临时表名称
result_table_name = "access_log" result_table_name = "${result_table_name!}"
</#if> </#if>
<#if user??> <#if user??>
# 连接数据库的名称 # 连接数据库的名称
user = "username" user = "${user!}"
</#if> </#if>
<#if password??> <#if password??>
# 连接数据库的密码 # 连接数据库的密码
password = "password" password = "${password!}"
</#if> </#if>
<#if partitionColumn??> <#if partitionColumn??>
# 分区字段 # 分区字段
jdbc.partitionColumn = "item_id" jdbc.partitionColumn = "${partitionColumn!}"
</#if> </#if>
<#if numPartitions??> <#if numPartitions??>
# 分区个数 # 分区个数
jdbc.numPartitions = "10" jdbc.numPartitions = "${numPartitions!}"
</#if> </#if>
<#if lowerBound??> <#if lowerBound??>
# 分区上下限 # 分区上下限
jdbc.lowerBound = 0 jdbc.lowerBound = ${lowerBound!}
</#if> </#if>
<#if upperBound??> <#if upperBound??>
jdbc.upperBound = 100 jdbc.upperBound = ${upperBound!}
</#if> </#if>
} }
\ No newline at end of file
#!/bin/bash
src_dir=${src_dir!}
des_dir=${des_dir!}
type="${type!}"
cd ${src_dir}
type_zip()
{
# 获取 src_dir 下所有的 zip 文件
filenames=`ls -l | grep ^- | grep -i .zip | awk '{print $9}'`
for file in ${filenames}
do
echo "正在解压zip文件: ${file}"
mkdir -p ${des_dir}/unzip/${file}/
unzip ${file} -d ${des_dir}/unzip/${file}/
done
}
type_tar_gz()
{
# 获取 src_dir 下所有的 tar.gz 文件
filenames=`ls -l | grep ^- | grep -i .tar.gz | awk '{print $9}'`
for file in ${filenames}
do
echo "正在解压tar.gz文件: ${file}"
mkdir -p ${des_dir}/tar/${file}/
tar zxvf ${file} -C ${des_dir}/tar/${file}/
done
}
type_rar()
{
# 获取 src_dir 下所有的 rar 文件
filenames=`ls -l | grep ^- | grep -i .rar | awk '{print $9}'`
for file in ${filenames}
do
echo "正在解压rar文件: ${file}"
mkdir -p ${des_dir}/rar/${file}
unrar -x ${file} ${des_dir}/rar/${file}/
done
}
type_gzip()
{
# 获取 src_dir 下所有的 gzip 文件
filenames=`ls -l | grep ^- | grep -i .gz | grep -v .tar | awk '{print $9}'`
for file in ${filenames}
do
echo "正在解压gzip文件: ${file}"
mkdir -p ${des_dir}/gzip/${file}
length=$[${#file}-3]
gzip -cd ${file} > ${des_dir}/gzip/${file}/${file:0:length}
done
}
case $type in
'zip')
type_zip
;;
'tar')
type_tar_gz
;;
'rar')
type_rar
;;
'gzip')
type_gzip
;;
*)
echo "解压 $src_dir 目录下的所有文件"
type_zip
type_tar_gz
type_rar
type_gzip
esac
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment