Commit a8b57ada authored by sml's avatar sml

Merge branch 'master' of

http://gitlab.ioubuy.cn/yaobenzhang/jz-dmp-cmdexectool.git

Conflicts:
	src/main/java/com/jz/dmp/cmdexectool/scheduler/service/process/ProcessService.java
parents 7b6cc6a6 e36dc75c
......@@ -17,7 +17,7 @@
<java.version>1.8</java.version>
<shiro.version>1.2.3</shiro.version>
<mybatis-pagehelper.version>4.2.0</mybatis-pagehelper.version>
<start-class>com.jz.Application</start-class>
<start-class>com.jz.dmp.cmdexectool.ApiApplication</start-class>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
......@@ -94,11 +94,6 @@
</exclusions>
</dependency>
<!-- swagger -->
<!--<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.4.0</version>
</dependency>-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
......
......@@ -2,14 +2,13 @@ package com.jz.dmp.cmdexectool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import com.jz.dmp.cmdexectool.common.utils.ApplicationContextUtil;
import com.jz.dmp.cmdexectool.scheduler.service.process.ProcessService;
......@@ -23,7 +22,7 @@ public class ApiApplication implements HealthIndicator {
public static void main(String[] args) {
long start = System.currentTimeMillis();
SpringApplication springApplication = new SpringApplication(ApiApplication.class);
springApplication.run(args);
ConfigurableApplicationContext context = springApplication.run(args);
Integer taskId = Integer.parseInt(args[0]);
if (taskId==null) {
......@@ -40,6 +39,9 @@ public class ApiApplication implements HealthIndicator {
long cost = System.currentTimeMillis() - start;
logger.info(" started status: {}, cost: {}", "SUCCESS!", cost);
springApplication.exit(context);
}
@Override
......
......@@ -57,5 +57,11 @@ public class CommConstant {
public static final String WATERDROP_FTL_SINK_HDFS = "sink_hdfs.ftl";
public static final String WATERDROP_FTL_SINK_JDBC = "sink_jdbc.ftl";
public static final String WATERDROP_FTL_SINK_KAFKA = "sink_kafka.ftl";
// Other (non-waterdrop) command-script templates rendered via FreeMarkerUtils.
public static final String FTL_SFTP_DOWNLOAD = "sftp_download.ftl";//sftp download script
public static final String FTL_UNZIPFILE = "unzipfile.ftl";//archive extraction script
public static final String FTL_DOCTRANS = "doctrans.ftl";//file transcoding (iconv) script
public static final String FTL_HDFS_UPLOAD = "hdfs_upload.ftl";//HDFS upload script
}
......@@ -2,16 +2,16 @@ package com.jz.dmp.cmdexectool.scheduler.common.enums;
public enum JobType {
START("start","开始任务"),
SHELL("shell","shell任务"),
SQL("sql","sql任务"),
SYNC("sync","离线任务"),
SUBPROCESS("subprocess","子流程任务"),
FTP("ftp","ftp下载任务"),
UNZIPFILE("unzipFile","解压文件任务"),
DOCTRANS("docTrans","文件转码任务"),
HDFS("hdfs","hdfs上传任务"),
STOP("stop","停止任务");
start("start","开始任务"),
shell("shell","shell任务"),
sql("sql","sql任务"),
sync("sync","离线任务"),
subprocess("subprocess","子流程任务"),
ftp("ftp","ftp下载任务"),
unzipFile("unzipFile","解压文件任务"),
docTrans("docTrans","文件转码任务"),
hdfs("hdfs","hdfs上传任务"),
stop("stop","停止任务");
//job类型
private String jobTypeStr;
......
......@@ -16,9 +16,17 @@
*/
package com.jz.dmp.cmdexectool.scheduler.common.task.docTrans;
import com.alibaba.fastjson.JSONObject;
import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.common.utils.FreeMarkerUtils;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
/**
* Sql/Hql parameter
......@@ -36,10 +44,39 @@ public class DoctransParameters extends AbstractParameters {
* resource list
*/
private List<ResourceInfo> resourceList;
/**
* command需要执行的脚本
*/
private String cmdScript;
public DoctransParameters() {
}
/**
 * Parse the raw task-definition JSON and render the transcoding (doctrans)
 * command script from the doctrans.ftl template.
 *
 * @param script           raw task definition JSON
 * @param freeMarkerConfig template engine configuration used for rendering
 */
public DoctransParameters(String script, FreeMarkerConfigurer freeMarkerConfig) {
    this.script = script;
    JSONObject scriptObj = JSONObject.parseObject(script);
    Map<String, String> doctransModel = new HashMap<String, String>();

    String srcDir = scriptObj.getString("ftpUrl");
    // BUG FIX: String.split takes a regex — split(".") matches every character
    // and yields an empty array, so srcDirSuffix[1] always threw
    // ArrayIndexOutOfBoundsException. Escape the dot and guard the length.
    String[] srcDirSuffix = srcDir.split("\\.");
    String fileSuffix = srcDirSuffix.length > 1 ? srcDirSuffix[1] : "";

    // NOTE(review): desDir / sourceConvert / sinkConvert all read the same
    // "ftpUrl" key — this looks like a copy-paste slip; confirm the intended
    // JSON keys against the task-definition producer before changing them.
    String desDir = scriptObj.getString("ftpUrl");
    String sourceConvert = scriptObj.getString("ftpUrl");
    String sinkConvert = scriptObj.getString("ftpUrl");

    doctransModel.put("src_dir", srcDir);
    doctransModel.put("des_dir", desDir);
    doctransModel.put("source_convert", sourceConvert);
    doctransModel.put("sink_convert", sinkConvert);
    doctransModel.put("file_suffix", fileSuffix);

    this.cmdScript = FreeMarkerUtils.freemakerJson(CommConstant.FTL_DOCTRANS, doctransModel, freeMarkerConfig);
}
public String getScript() {
return script;
......@@ -65,9 +102,17 @@ public class DoctransParameters extends AbstractParameters {
this.resourceList = resourceList;
}
public String getCmdScript() {
return cmdScript;
}
public void setCmdScript(String cmdScript) {
this.cmdScript = cmdScript;
}
@Override
public boolean checkParameters() {
return script != null && !script.isEmpty();
return cmdScript != null && !cmdScript.isEmpty();
}
@Override
......
......@@ -16,15 +16,27 @@
*/
package com.jz.dmp.cmdexectool.scheduler.common.task.ftp;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import com.alibaba.fastjson.JSONObject;
import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.common.utils.FreeMarkerUtils;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import java.util.List;
/**
* Sql/Hql parameter
*/
public class FtpParameters extends AbstractParameters {
private static Logger logger = LoggerFactory.getLogger(FtpParameters.class);
/**
* shell script
*/
......@@ -36,11 +48,56 @@ public class FtpParameters extends AbstractParameters {
* resource list
*/
private List<ResourceInfo> resourceList;
/**
* command需要执行的脚本
*/
private String cmdScript;
public FtpParameters() {
}
/**
 * Parse the raw task-definition JSON and render the sftp-download command
 * script from the sftp_download.ftl template.
 *
 * @param script           raw task definition JSON (ftpUrl, ftpUsername,
 *                         ftpPassword, ftpSourceFileDir, ftpSaveDestDir)
 * @param freeMarkerConfig template engine configuration used for rendering
 */
public FtpParameters(String script, FreeMarkerConfigurer freeMarkerConfig) {
    this.script = script;
    JSONObject scriptObj = JSONObject.parseObject(script);

    // "host[:port]" — fall back to the standard sftp port when none is given
    String ftpUrl = scriptObj.getString("ftpUrl");
    String[] hostAndPort = ftpUrl.split(":");
    String ip = hostAndPort[0];
    String port = hostAndPort.length > 1 ? hostAndPort[1] : "22";

    // "dir[.suffix]" — an optional file-suffix filter rides on the dest dir
    String ftpSaveDestDir = scriptObj.getString("ftpSaveDestDir");
    String[] dirAndSuffix = ftpSaveDestDir.split("\\.");
    String desDir = dirAndSuffix[0];
    String fileSuffix = dirAndSuffix.length > 1 ? dirAndSuffix[1] : "";

    Map<String, String> ftpModel = new HashMap<String, String>();
    ftpModel.put("ip", ip);
    ftpModel.put("port", port);
    ftpModel.put("username", scriptObj.getString("ftpUsername"));
    ftpModel.put("password", scriptObj.getString("ftpPassword"));
    ftpModel.put("src_dir", scriptObj.getString("ftpSourceFileDir"));
    ftpModel.put("des_dir", desDir);
    ftpModel.put("file_suffix", fileSuffix);
    logger.info("调用ftp command模板参数【{}】", JSONObject.toJSONString(ftpModel));

    this.cmdScript = FreeMarkerUtils.freemakerJson(CommConstant.FTL_SFTP_DOWNLOAD, ftpModel, freeMarkerConfig);
}
public String getScript() {
return script;
}
......@@ -65,9 +122,17 @@ public class FtpParameters extends AbstractParameters {
this.resourceList = resourceList;
}
public String getCmdScript() {
return cmdScript;
}
public void setCmdScript(String cmdScript) {
this.cmdScript = cmdScript;
}
@Override
public boolean checkParameters() {
return script != null && !script.isEmpty();
return cmdScript != null && !cmdScript.isEmpty();
}
@Override
......
......@@ -16,9 +16,17 @@
*/
package com.jz.dmp.cmdexectool.scheduler.common.task.hdfs;
import com.alibaba.fastjson.JSONObject;
import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.common.utils.FreeMarkerUtils;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
/**
* Sql/Hql parameter
......@@ -36,10 +44,33 @@ public class HdfsParameters extends AbstractParameters {
* resource list
*/
private List<ResourceInfo> resourceList;
/**
* command需要执行的脚本
*/
private String cmdScript;
public HdfsParameters() {
}
/**
 * Parse the raw task-definition JSON and render the HDFS-upload command
 * script from the hdfs_upload.ftl template.
 *
 * @param script           raw task definition JSON (localUploadFileDir,
 *                         hdfsUploadFileFilter, hdfsUploadSaveDir)
 * @param freeMarkerConfig template engine configuration used for rendering
 */
public HdfsParameters(String script, FreeMarkerConfigurer freeMarkerConfig) {
    this.script = script;
    JSONObject scriptObj = JSONObject.parseObject(script);
    // renamed from "HdfsModel": locals use lowerCamelCase by Java convention
    Map<String, String> hdfsModel = new HashMap<String, String>();
    String srcDir = scriptObj.getString("localUploadFileDir");
    String fileSuffix = scriptObj.getString("hdfsUploadFileFilter");
    String desDir = scriptObj.getString("hdfsUploadSaveDir");
    hdfsModel.put("src_dir", srcDir);
    hdfsModel.put("file_suffix", fileSuffix);
    hdfsModel.put("des_dir", desDir);
    this.cmdScript = FreeMarkerUtils.freemakerJson(CommConstant.FTL_HDFS_UPLOAD, hdfsModel, freeMarkerConfig);
}
public String getScript() {
return script;
......@@ -65,9 +96,17 @@ public class HdfsParameters extends AbstractParameters {
this.resourceList = resourceList;
}
public String getCmdScript() {
return cmdScript;
}
public void setCmdScript(String cmdScript) {
this.cmdScript = cmdScript;
}
@Override
public boolean checkParameters() {
return script != null && !script.isEmpty();
return cmdScript != null && !cmdScript.isEmpty();
}
@Override
......
package com.jz.dmp.cmdexectool.scheduler.common.task.sync;
import com.jz.dmp.cmdexectool.controller.bean.DmpProjectConfigInfoDto;
import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import java.util.List;
/**
* @ClassName: SyncParameters
* @Description: sync parameters
* @Author : Bellamy
* @Date 2021/2/25
* @Version 1.0
*/
/**
 * Parameter bean for an offline data-sync task. Holds the raw task script,
 * the env/source/transform/sink config fragments, and the rendered waterdrop
 * script that SyncTask ultimately writes to disk and executes.
 */
public class SyncParameters extends AbstractParameters {
    /**
     * shell script (raw task definition as received)
     */
    private String script;

    // task app id; TaskExecutionContext uses it to name the generated script file
    private String taskAppId;

    /**
     * data-source related configuration
     */
    private String source;

    /**
     * sink/target related configuration
     */
    private String sink;

    /**
     * environment related configuration
     */
    private String env;

    /**
     * ETL/transform related configuration
     */
    private String transform;

    /**
     * rendered waterdrop script — the content SyncTask.buildCommand() writes out
     */
    private String waterdropScript;

    /**
     * resource list
     */
    private List<ResourceInfo> resourceList;

    // NOTE(review): the constructor body is empty, so waterdropScript is never
    // populated here and checkParameters() can only fail. This looks like
    // content lost in the merge (this file appears in the merge's conflict
    // list) — confirm against ProcessService, which invokes this constructor.
    public SyncParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurer freeMarkerConfig) {
    }

    /** A sync task is valid only when the rendered waterdrop script is non-empty. */
    @Override
    public boolean checkParameters() {
        return waterdropScript != null && !waterdropScript.isEmpty();
    }

    /**
     * get project resource files list
     *
     * @return resource files list
     */
    @Override
    public List<ResourceInfo> getResourceFilesList() {
        return resourceList;
    }

    public String getScript() {
        return script;
    }

    public void setScript(String script) {
        this.script = script;
    }

    public String getTaskAppId() {
        return taskAppId;
    }

    public void setTaskAppId(String taskAppId) {
        this.taskAppId = taskAppId;
    }

    public String getSource() {
        return source;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public String getSink() {
        return sink;
    }

    public void setSink(String sink) {
        this.sink = sink;
    }

    public String getEnv() {
        return env;
    }

    public void setEnv(String env) {
        this.env = env;
    }

    public String getTransform() {
        return transform;
    }

    public void setTransform(String transform) {
        this.transform = transform;
    }

    public String getWaterdropScript() {
        return waterdropScript;
    }

    public void setWaterdropScript(String waterdropScript) {
        this.waterdropScript = waterdropScript;
    }

    public List<ResourceInfo> getResourceList() {
        return resourceList;
    }

    public void setResourceList(List<ResourceInfo> resourceList) {
        this.resourceList = resourceList;
    }
}
......@@ -16,9 +16,17 @@
*/
package com.jz.dmp.cmdexectool.scheduler.common.task.unzipfile;
import com.alibaba.fastjson.JSONObject;
import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.common.utils.FreeMarkerUtils;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
/**
* Sql/Hql parameter
......@@ -36,10 +44,34 @@ public class UnzipfileParameters extends AbstractParameters {
* resource list
*/
private List<ResourceInfo> resourceList;
/**
* command需要执行的脚本
*/
private String cmdScript;
public UnzipfileParameters() {
}
/**
 * Parse the raw task-definition JSON and render the archive-extraction
 * command script from the unzipfile.ftl template.
 *
 * @param script           raw task definition JSON (compressedFileDir,
 *                         zipOutputDir, decompressFormat)
 * @param freeMarkerConfig template engine configuration used for rendering
 */
public UnzipfileParameters(String script, FreeMarkerConfigurer freeMarkerConfig) {
    this.script = script;
    JSONObject cfg = JSONObject.parseObject(script);

    Map<String, String> unzipModel = new HashMap<String, String>();
    unzipModel.put("src_dir", cfg.getString("compressedFileDir"));
    unzipModel.put("des_dir", cfg.getString("zipOutputDir"));
    unzipModel.put("type", cfg.getString("decompressFormat"));

    this.cmdScript = FreeMarkerUtils.freemakerJson(CommConstant.FTL_UNZIPFILE, unzipModel, freeMarkerConfig);
}
public String getScript() {
return script;
......@@ -65,9 +97,17 @@ public class UnzipfileParameters extends AbstractParameters {
this.resourceList = resourceList;
}
public String getCmdScript() {
return cmdScript;
}
public void setCmdScript(String cmdScript) {
this.cmdScript = cmdScript;
}
@Override
public boolean checkParameters() {
return script != null && !script.isEmpty();
return cmdScript != null && !cmdScript.isEmpty();
}
@Override
......
......@@ -26,6 +26,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.task.ftp.FtpParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.hdfs.HdfsParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.shell.ShellParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.sync.SyncParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.unzipfile.UnzipfileParameters;
import com.jz.dmp.cmdexectool.scheduler.remote.command.Command;
import com.jz.dmp.cmdexectool.scheduler.remote.command.TaskExecuteRequestCommand;
......@@ -284,6 +285,14 @@ public class TaskExecutionContext implements Serializable {
this.setParameters(hdfsParameters);
}
/**
 * Build an execution context for an offline sync (waterdrop) task.
 *
 * @param syncParameters sync task parameters (also serialized into taskParams)
 * @param configInfoDto  project config supplying the executor shell path
 */
public TaskExecutionContext(SyncParameters syncParameters, DmpProjectConfigInfoDto configInfoDto) {
    this.setTaskType(CommConstant.WORK_TYPE_SYNC);
    // generated script files are written under the Azkaban executor shell path
    this.setExecutePath(configInfoDto.getDmpPublicConfigInfoDto().getAzkabanExectorShellPath());
    this.setTaskParams(JSONObject.toJSONString(syncParameters));
    this.setTaskAppId(syncParameters.getTaskAppId());
    this.setParameters(syncParameters);
}
public int getTaskInstanceId() {
return taskInstanceId;
}
......
......@@ -34,6 +34,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.ShellCommandExecutor;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor;
/**
......@@ -47,9 +48,9 @@ public class DoctransTask extends AbstractTask {
private DoctransParameters doctransParameters;
/**
* waterdrop command executor
* shell command executor
*/
private WaterdropCommandExecutor waterdropCommandExecutor;
private ShellCommandExecutor shellCommandExecutor;
/**
* taskExecutionContext
......@@ -66,7 +67,7 @@ public class DoctransTask extends AbstractTask {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
this.waterdropCommandExecutor = new WaterdropCommandExecutor(this::logHandle, taskExecutionContext, logger);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, logger);
}
@Override
......@@ -85,7 +86,7 @@ public class DoctransTask extends AbstractTask {
public void handle() throws Exception {
try {
// construct process
CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand());
CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
......@@ -99,7 +100,7 @@ public class DoctransTask extends AbstractTask {
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
waterdropCommandExecutor.cancelApplication();
shellCommandExecutor.cancelApplication();
}
/**
......@@ -137,7 +138,7 @@ public class DoctransTask extends AbstractTask {
Files.createFile(path, attr);
}
Files.write(path, doctransParameters.getScript().getBytes(), StandardOpenOption.APPEND);
Files.write(path, doctransParameters.getCmdScript().getBytes(), StandardOpenOption.APPEND);
return fileName;
}
......
......@@ -24,9 +24,7 @@ import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;
import org.slf4j.Logger;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.ftp.FtpParameters;
......@@ -34,7 +32,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.ShellCommandExecutor;
/**
* sql task
......@@ -49,7 +47,7 @@ public class FtpTask extends AbstractTask {
/**
* waterdrop command executor
*/
private WaterdropCommandExecutor waterdropCommandExecutor;
private ShellCommandExecutor shellCommandExecutor;
/**
* taskExecutionContext
......@@ -66,7 +64,7 @@ public class FtpTask extends AbstractTask {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
this.waterdropCommandExecutor = new WaterdropCommandExecutor(this::logHandle, taskExecutionContext, logger);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, logger);
}
@Override
......@@ -85,7 +83,7 @@ public class FtpTask extends AbstractTask {
public void handle() throws Exception {
try {
// construct process
CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand());
CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
......@@ -99,7 +97,7 @@ public class FtpTask extends AbstractTask {
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
waterdropCommandExecutor.cancelApplication();
shellCommandExecutor.cancelApplication();
}
/**
......@@ -137,7 +135,7 @@ public class FtpTask extends AbstractTask {
Files.createFile(path, attr);
}
Files.write(path, ftpParameters.getScript().getBytes(), StandardOpenOption.APPEND);
Files.write(path, ftpParameters.getCmdScript().getBytes(), StandardOpenOption.APPEND);
return fileName;
}
......
......@@ -34,6 +34,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.ShellCommandExecutor;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor;
/**
......@@ -47,9 +48,9 @@ public class HdfsTask extends AbstractTask {
private HdfsParameters hdfsParameters;
/**
* waterdrop command executor
* shell command executor
*/
private WaterdropCommandExecutor waterdropCommandExecutor;
private ShellCommandExecutor shellCommandExecutor;
/**
* taskExecutionContext
......@@ -66,7 +67,7 @@ public class HdfsTask extends AbstractTask {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
this.waterdropCommandExecutor = new WaterdropCommandExecutor(this::logHandle, taskExecutionContext, logger);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, logger);
}
@Override
......@@ -85,7 +86,7 @@ public class HdfsTask extends AbstractTask {
public void handle() throws Exception {
try {
// construct process
CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand());
CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
......@@ -99,7 +100,7 @@ public class HdfsTask extends AbstractTask {
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
waterdropCommandExecutor.cancelApplication();
shellCommandExecutor.cancelApplication();
}
/**
......@@ -137,7 +138,7 @@ public class HdfsTask extends AbstractTask {
Files.createFile(path, attr);
}
Files.write(path, hdfsParameters.getScript().getBytes(), StandardOpenOption.APPEND);
Files.write(path, hdfsParameters.getCmdScript().getBytes(), StandardOpenOption.APPEND);
return fileName;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.server.worker.task.sync;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.sync.SyncParameters;
import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor;
import org.slf4j.Logger;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;
/**
 * SyncTask — executes an offline data-sync job by writing the rendered
 * waterdrop script to a shell file and running it via WaterdropCommandExecutor.
 */
public class SyncTask extends AbstractTask {

    /**
     * sync parameters
     */
    private SyncParameters syncParameters;

    /**
     * waterdrop command executor
     */
    private WaterdropCommandExecutor waterdropCommandExecutor;

    /**
     * taskExecutionContext
     */
    private TaskExecutionContext taskExecutionContext;

    /**
     * constructor
     *
     * @param taskExecutionContext taskExecutionContext
     * @param logger               logger
     */
    public SyncTask(TaskExecutionContext taskExecutionContext, Logger logger) {
        super(taskExecutionContext, logger);
        this.taskExecutionContext = taskExecutionContext;
        this.waterdropCommandExecutor = new WaterdropCommandExecutor(this::logHandle, taskExecutionContext, logger);
    }

    @Override
    public void init() {
        logger.info("sync task params {}", taskExecutionContext.getTaskParams());
        syncParameters = (SyncParameters) taskExecutionContext.getParameters();
        if (!syncParameters.checkParameters()) {
            // BUG FIX: message previously said "sql task" — copy-paste from SqlTask.
            throw new RuntimeException("sync task params is not valid");
        }
    }

    @Override
    public void handle() throws Exception {
        try {
            // construct process
            CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand());
            setExitStatusCode(commandExecuteResult.getExitStatusCode());
            setAppIds(commandExecuteResult.getAppIds());
            setProcessId(commandExecuteResult.getProcessId());
        } catch (Exception e) {
            logger.error("sync task error", e);
            setExitStatusCode(Constants.EXIT_CODE_FAILURE);
            throw e;
        }
    }

    @Override
    public void cancelApplication(boolean cancelApplication) throws Exception {
        // cancel process
        waterdropCommandExecutor.cancelApplication();
    }

    /**
     * Create the command script file containing the waterdrop script.
     *
     * @return file name of the generated script
     * @throws Exception exception
     */
    private String buildCommand() throws Exception {
        // generate scripts
        String fileName = String.format("%s/%s_node.%s", taskExecutionContext.getExecutePath(),
                taskExecutionContext.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
        Path path = new File(fileName).toPath();
        if (Files.exists(path)) {
            // script already generated for this task instance — reuse it
            return fileName;
        }
        logger.info("raw script : {}", syncParameters.getScript());
        logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
        Set<PosixFilePermission> perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X);
        FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
        if (OSUtils.isWindows()) {
            // POSIX permissions are unsupported on Windows — create parents + file manually
            File file = path.toFile();
            File parentFile = file.getParentFile();
            if (!parentFile.exists()) {
                parentFile.mkdirs();
            }
            file.createNewFile();
        } else {
            Files.createFile(path, attr);
        }
        Files.write(path, syncParameters.getWaterdropScript().getBytes(), StandardOpenOption.APPEND);
        return fileName;
    }

    @Override
    public AbstractParameters getParameters() {
        return syncParameters;
    }
}
......@@ -34,6 +34,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.ShellCommandExecutor;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor;
/**
......@@ -47,9 +48,9 @@ public class UnzipfileTask extends AbstractTask {
private UnzipfileParameters unzipfileParameters;
/**
* waterdrop command executor
* shell command executor
*/
private WaterdropCommandExecutor waterdropCommandExecutor;
private ShellCommandExecutor shellCommandExecutor;
/**
* taskExecutionContext
......@@ -66,7 +67,7 @@ public class UnzipfileTask extends AbstractTask {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
this.waterdropCommandExecutor = new WaterdropCommandExecutor(this::logHandle, taskExecutionContext, logger);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, logger);
}
@Override
......@@ -85,7 +86,7 @@ public class UnzipfileTask extends AbstractTask {
public void handle() throws Exception {
try {
// construct process
CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand());
CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
......@@ -99,7 +100,7 @@ public class UnzipfileTask extends AbstractTask {
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
waterdropCommandExecutor.cancelApplication();
shellCommandExecutor.cancelApplication();
}
/**
......@@ -137,7 +138,7 @@ public class UnzipfileTask extends AbstractTask {
Files.createFile(path, attr);
}
Files.write(path, unzipfileParameters.getScript().getBytes(), StandardOpenOption.APPEND);
Files.write(path, unzipfileParameters.getCmdScript().getBytes(), StandardOpenOption.APPEND);
return fileName;
}
......
......@@ -209,7 +209,6 @@ public class ProcessService {
taskExecutionContext = new TaskExecutionContext(sync, projectConfigInfoDto);
break;
case subprocess:
break;
case ftp:
FtpParameters ftpParameters = new FtpParameters(script, freeMarkerConfigurer);
......
#!/bin/bash
# Transcode every matching file under src_dir from source_convert to
# sink_convert encoding (iconv), writing output to des_dir/<timestamp>/.
src_dir=${src_dir!}
des_dir=${des_dir!}
source_convert=${source_convert!}
sink_convert=${sink_convert!}
file_suffix=${file_suffix!}
# list regular files under src_dir whose names match .file_suffix
cd ${src_dir}
filename=`ls -l | grep ^- | awk '{print $9}' | grep .${file_suffix}`
# NOTE(review): the two $filename references below were garbled in the page
# scrape ("$(unknown)"); restored as brace-less shell expansions so the
# template engine's ${...} processing leaves them intact — confirm against
# the original template in the repository.
echo $filename
# current system time, used as the per-run output sub-directory
time=`date +%Y-%m-%d_%H:%M:%S`
for file in $filename
do
echo "正在转码的文件是: ${file}"
mkdir -p ${des_dir}/${time}
iconv -f ${source_convert} -t ${sink_convert} $src_dir/${file} -o ${des_dir}/${time}/${file}
done
\ No newline at end of file
#!/bin/bash
# Upload files from a local directory to HDFS (as the hdfs user), creating
# the target HDFS directory first when it does not exist.
# local source directory to upload from
src_dir=${src_dir!}
# file type (suffix) to upload; empty means "all files"
file_suffix="${file_suffix!}"
# destination directory on HDFS
des_dir=${des_dir!}
# BUG FIX: $file_suffix was unquoted — with an empty suffix the test became
# `[ = "" ]`, a shell syntax error, so the fallback branch never ran.
if [ "$file_suffix" = "" ]; then
echo "没有后缀"
filename="*"
else
filename="*.${file_suffix}"
fi
# check whether the target HDFS directory exists; create it when missing
su hdfs -c "hdfs dfs -test -e ${des_dir}"
if [ $? -ne 0 ]; then
echo "====================hdfs 文件夹不存在, 自动创建======================"
su hdfs -c "hdfs dfs -mkdir -p ${des_dir}"
fi
# NOTE(review): "$(unknown)" here was scrape-garbled; restored as the
# brace-less $filename expansion — confirm against the original template.
su hdfs -c "hdfs dfs -put ${src_dir}/$filename ${des_dir}/"
\ No newline at end of file
# sftp server IP address
ip=${ip!}
# sftp server port
port=${port!}
# sftp user name
username=${username!}
# sftp password
password=${password!}
# remote source directory to download from
src_dir=${src_dir!}
# local destination directory
des_dir=${des_dir!}
# suffix of the files to download
<#if file_suffix??>
file_suffix="${file_suffix!}"
<#else>
file_suffix=""
</#if>
if [ "$file_suffix" = "" ]; then
# no suffix given — download every file in the selected directory
file_suffix="*"
fi
# current system time, used as a per-run destination sub-directory
time=`date +%Y-%m-%d_%H:%M:%S`
des_dir=${r'${des_dir}/${time}'}
# create the destination directory
mkdir -p ${r'${des_dir}'}
# download via lftp heredoc
# BUG FIX: the lftp quit command was misspelled "by"; lftp expects "bye".
lftp -u ${r'${username},${password} sftp://${ip}:${port}<<EOF'}
cd ${r'${src_dir}'}
lcd ${r'${des_dir}/'}
echo "正在下载的文件是:" ${r'${src_dir}/*.${file_suffix}'}
# batch-download the matching files
mget ${r'*.${file_suffix}'}
bye
EOF
\ No newline at end of file
jdbc {
    <#if driver??>
    # JDBC driver class name
    driver = "${driver!}"
    </#if>
    <#if url??>
    # JDBC connection url
    url = "${url!}"
    </#if>
    <#if table??>
    # table to load
    table = "${table!}"
    </#if>
    <#if result_table_name??>
    # spark temporary table name
    result_table_name = "${result_table_name!}"
    </#if>
    <#if user??>
    # database user name
    user = "${user!}"
    </#if>
    <#if password??>
    # database password
    password = "${password!}"
    </#if>
    <#if partitionColumn??>
    # partition column
    jdbc.partitionColumn = "${partitionColumn!}"
    </#if>
    <#if numPartitions??>
    # number of partitions
    jdbc.numPartitions = "${numPartitions!}"
    </#if>
    <#if lowerBound??>
    # partition lower bound
    jdbc.lowerBound = ${lowerBound!}
    </#if>
    <#if upperBound??>
    # partition upper bound
    jdbc.upperBound = ${upperBound!}
    </#if>
}
\ No newline at end of file
#!/bin/bash
# Extract archives found under src_dir into des_dir. $type selects the
# archive kind (zip / tar / rar / gzip); any other value extracts all kinds.
# NOTE(review): shell expansions such as ${file} and ${#file} below are also
# valid template ${...} syntax — confirm the template engine leaves them
# intact when rendering this file.
src_dir=${src_dir!}
des_dir=${des_dir!}
type="${type!}"
cd ${src_dir}
# unpack every .zip file into des_dir/unzip/<archive>/
type_zip()
{
# list the .zip regular files under src_dir
filenames=`ls -l | grep ^- | grep -i .zip | awk '{print $9}'`
for file in ${filenames}
do
echo "正在解压zip文件: ${file}"
mkdir -p ${des_dir}/unzip/${file}/
unzip ${file} -d ${des_dir}/unzip/${file}/
done
}
# unpack every .tar.gz file into des_dir/tar/<archive>/
type_tar_gz()
{
# list the .tar.gz regular files under src_dir
filenames=`ls -l | grep ^- | grep -i .tar.gz | awk '{print $9}'`
for file in ${filenames}
do
echo "正在解压tar.gz文件: ${file}"
mkdir -p ${des_dir}/tar/${file}/
tar zxvf ${file} -C ${des_dir}/tar/${file}/
done
}
# unpack every .rar file into des_dir/rar/<archive>/
type_rar()
{
# list the .rar regular files under src_dir
filenames=`ls -l | grep ^- | grep -i .rar | awk '{print $9}'`
for file in ${filenames}
do
echo "正在解压rar文件: ${file}"
mkdir -p ${des_dir}/rar/${file}
unrar -x ${file} ${des_dir}/rar/${file}/
done
}
# decompress every plain .gz (excluding .tar.gz) into des_dir/gzip/<archive>/
type_gzip()
{
# list .gz regular files, skipping tar archives
filenames=`ls -l | grep ^- | grep -i .gz | grep -v .tar | awk '{print $9}'`
for file in ${filenames}
do
echo "正在解压gzip文件: ${file}"
mkdir -p ${des_dir}/gzip/${file}
# strip the trailing ".gz" (3 chars) to recover the original file name
# NOTE(review): $[...] arithmetic is obsolete shell syntax; $((...)) is the
# modern form (doc-only observation, code left unchanged)
length=$[${#file}-3]
gzip -cd ${file} > ${des_dir}/gzip/${file}/${file:0:length}
done
}
# dispatch on the requested archive type; default branch tries everything
case $type in
'zip')
type_zip
;;
'tar')
type_tar_gz
;;
'rar')
type_rar
;;
'gzip')
type_gzip
;;
*)
echo "解压 $src_dir 目录下的所有文件"
type_zip
type_tar_gz
type_rar
type_gzip
esac
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.