Commit 25fad8a5 authored by sml's avatar sml

代码提交

parent 439de867
......@@ -49,7 +49,11 @@ public enum TaskType {
DATAX(10, "datax"),
CONDITIONS(11, "conditions"),
SQOOP(12, "sqoop"),
WATERDROP(13, "waterdrop");
WATERDROP(13, "waterdrop"),
FTP(14, "ftp"),
UNZIPFILE(15, "unzipfile"),
DOCTRANS(16, "docTrans"),
HDFS(17, "hdfs");
TaskType(int code, String descp){
this.code = code;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.common.task.docTrans;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import java.util.List;
/**
 * Parameters carried by a docTrans task: the generated script to execute,
 * the task's application id, and any attached resource files.
 */
public class DoctransParameters extends AbstractParameters {

    /** content of the generated script executed for this task */
    private String script;

    /** application id assigned to this task instance */
    private String taskAppId;

    /** resource files attached to the task */
    private List<ResourceInfo> resourceList;

    public DoctransParameters() {
    }

    /**
     * A docTrans task is runnable only when a non-empty script is present.
     */
    @Override
    public boolean checkParameters() {
        return !(script == null || script.isEmpty());
    }

    @Override
    public List<ResourceInfo> getResourceFilesList() {
        return resourceList;
    }

    public String getScript() {
        return script;
    }

    public void setScript(String script) {
        this.script = script;
    }

    public String getTaskAppId() {
        return taskAppId;
    }

    public void setTaskAppId(String taskAppId) {
        this.taskAppId = taskAppId;
    }

    public List<ResourceInfo> getResourceList() {
        return resourceList;
    }

    public void setResourceList(List<ResourceInfo> resourceList) {
        this.resourceList = resourceList;
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.common.task.ftp;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import java.util.List;
/**
 * Parameters carried by an FTP task: the generated script to execute,
 * the task's application id, and any attached resource files.
 */
public class FtpParameters extends AbstractParameters {

    /** content of the generated script executed for this task */
    private String script;

    /** application id assigned to this task instance */
    private String taskAppId;

    /** resource files attached to the task */
    private List<ResourceInfo> resourceList;

    public FtpParameters() {
    }

    /**
     * An FTP task is runnable only when a non-empty script is present.
     */
    @Override
    public boolean checkParameters() {
        return !(script == null || script.isEmpty());
    }

    @Override
    public List<ResourceInfo> getResourceFilesList() {
        return resourceList;
    }

    public String getScript() {
        return script;
    }

    public void setScript(String script) {
        this.script = script;
    }

    public String getTaskAppId() {
        return taskAppId;
    }

    public void setTaskAppId(String taskAppId) {
        this.taskAppId = taskAppId;
    }

    public List<ResourceInfo> getResourceList() {
        return resourceList;
    }

    public void setResourceList(List<ResourceInfo> resourceList) {
        this.resourceList = resourceList;
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.common.task.hdfs;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import java.util.List;
/**
 * Parameters carried by an HDFS task: the generated script to execute,
 * the task's application id, and any attached resource files.
 */
public class HdfsParameters extends AbstractParameters {

    /** content of the generated script executed for this task */
    private String script;

    /** application id assigned to this task instance */
    private String taskAppId;

    /** resource files attached to the task */
    private List<ResourceInfo> resourceList;

    public HdfsParameters() {
    }

    /**
     * An HDFS task is runnable only when a non-empty script is present.
     */
    @Override
    public boolean checkParameters() {
        return !(script == null || script.isEmpty());
    }

    @Override
    public List<ResourceInfo> getResourceFilesList() {
        return resourceList;
    }

    public String getScript() {
        return script;
    }

    public void setScript(String script) {
        this.script = script;
    }

    public String getTaskAppId() {
        return taskAppId;
    }

    public void setTaskAppId(String taskAppId) {
        this.taskAppId = taskAppId;
    }

    public List<ResourceInfo> getResourceList() {
        return resourceList;
    }

    public void setResourceList(List<ResourceInfo> resourceList) {
        this.resourceList = resourceList;
    }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.common.task.unzipfile;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import java.util.List;
/**
 * Parameters carried by an unzipfile task: the generated script to execute,
 * the task's application id, and any attached resource files.
 */
public class UnzipfileParameters extends AbstractParameters {

    /** content of the generated script executed for this task */
    private String script;

    /** application id assigned to this task instance */
    private String taskAppId;

    /** resource files attached to the task */
    private List<ResourceInfo> resourceList;

    public UnzipfileParameters() {
    }

    /**
     * An unzipfile task is runnable only when a non-empty script is present.
     */
    @Override
    public boolean checkParameters() {
        return !(script == null || script.isEmpty());
    }

    @Override
    public List<ResourceInfo> getResourceFilesList() {
        return resourceList;
    }

    public String getScript() {
        return script;
    }

    public void setScript(String script) {
        this.script = script;
    }

    public String getTaskAppId() {
        return taskAppId;
    }

    public void setTaskAppId(String taskAppId) {
        this.taskAppId = taskAppId;
    }

    public List<ResourceInfo> getResourceList() {
        return resourceList;
    }

    public void setResourceList(List<ResourceInfo> resourceList) {
        this.resourceList = resourceList;
    }
}
......@@ -62,6 +62,14 @@ public class EnumUtils {
return TaskType.SHELL;
}else if (taskType.equals(CommConstant.WORK_TYPE_SQL)) {
return TaskType.SQL;
}else if (taskType.equals(CommConstant.WORK_TYPE_FTP)) {
return TaskType.FTP;
}else if (taskType.equals(CommConstant.WORK_TYPE_UNZIPFILE)) {
return TaskType.UNZIPFILE;
}else if (taskType.equals(CommConstant.WORK_TYPE_DOCTRANS)) {
return TaskType.DOCTRANS;
}else if (taskType.equals(CommConstant.WORK_TYPE_HDFS)) {
return TaskType.HDFS;
}
return null;
}
......
......@@ -21,8 +21,12 @@ import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.controller.bean.DmpProjectConfigInfoDto;
import com.jz.dmp.cmdexectool.scheduler.common.enums.ExecutionStatus;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.docTrans.DoctransParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.ftp.FtpParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.hdfs.HdfsParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.shell.ShellParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.unzipfile.UnzipfileParameters;
import com.jz.dmp.cmdexectool.scheduler.remote.command.Command;
import com.jz.dmp.cmdexectool.scheduler.remote.command.TaskExecuteRequestCommand;
import com.jz.dmp.cmdexectool.scheduler.remote.utils.JsonSerializer;
......@@ -248,7 +252,39 @@ public class TaskExecutionContext implements Serializable {
this.setParameters(sqlParameters);
}
public int getTaskInstanceId() {
/**
 * Builds an execution context for an FTP task.
 *
 * @param ftpParameters ftp task parameters (script content and task app id)
 * @param configInfoDto project configuration supplying the Azkaban executor shell path
 */
public TaskExecutionContext(FtpParameters ftpParameters, DmpProjectConfigInfoDto configInfoDto) {
this.setTaskType(CommConstant.WORK_TYPE_FTP);
// execute path comes from the project's public Azkaban executor configuration
this.setExecutePath(configInfoDto.getDmpPublicConfigInfoDto().getAzkabanExectorShellPath());
// serialize the full parameter object so the worker can log/inspect it
this.setTaskParams(JSONObject.toJSONString(ftpParameters));
this.setTaskAppId(ftpParameters.getTaskAppId());
this.setParameters(ftpParameters);
}
/**
 * Builds an execution context for an unzipfile task.
 *
 * @param unzipfileParameters unzipfile task parameters (script content and task app id)
 * @param configInfoDto       project configuration supplying the Azkaban executor shell path
 */
public TaskExecutionContext(UnzipfileParameters unzipfileParameters, DmpProjectConfigInfoDto configInfoDto) {
this.setTaskType(CommConstant.WORK_TYPE_UNZIPFILE);
// execute path comes from the project's public Azkaban executor configuration
this.setExecutePath(configInfoDto.getDmpPublicConfigInfoDto().getAzkabanExectorShellPath());
// serialize the full parameter object so the worker can log/inspect it
this.setTaskParams(JSONObject.toJSONString(unzipfileParameters));
this.setTaskAppId(unzipfileParameters.getTaskAppId());
this.setParameters(unzipfileParameters);
}
/**
 * Builds an execution context for a docTrans task.
 *
 * @param doctransParameters docTrans task parameters (script content and task app id)
 * @param configInfoDto      project configuration supplying the Azkaban executor shell path
 */
public TaskExecutionContext(DoctransParameters doctransParameters, DmpProjectConfigInfoDto configInfoDto) {
this.setTaskType(CommConstant.WORK_TYPE_DOCTRANS);
// execute path comes from the project's public Azkaban executor configuration
this.setExecutePath(configInfoDto.getDmpPublicConfigInfoDto().getAzkabanExectorShellPath());
// serialize the full parameter object so the worker can log/inspect it
this.setTaskParams(JSONObject.toJSONString(doctransParameters));
this.setTaskAppId(doctransParameters.getTaskAppId());
this.setParameters(doctransParameters);
}
/**
 * Builds an execution context for an HDFS task.
 *
 * @param hdfsParameters hdfs task parameters (script content and task app id)
 * @param configInfoDto  project configuration supplying the Azkaban executor shell path
 */
public TaskExecutionContext(HdfsParameters hdfsParameters, DmpProjectConfigInfoDto configInfoDto) {
this.setTaskType(CommConstant.WORK_TYPE_HDFS);
// execute path comes from the project's public Azkaban executor configuration
this.setExecutePath(configInfoDto.getDmpPublicConfigInfoDto().getAzkabanExectorShellPath());
// serialize the full parameter object so the worker can log/inspect it
this.setTaskParams(JSONObject.toJSONString(hdfsParameters));
this.setTaskAppId(hdfsParameters.getTaskAppId());
this.setParameters(hdfsParameters);
}
public int getTaskInstanceId() {
return taskInstanceId;
}
......
......@@ -17,23 +17,18 @@
package com.jz.dmp.cmdexectool.scheduler.server.worker.task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.jz.dmp.cmdexectool.scheduler.common.enums.TaskType;
import com.jz.dmp.cmdexectool.scheduler.common.utils.EnumUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
//import com.jz.dmp.cmdexectool.scheduler.server.worker.task.datax.DataxTask;
//import com.jz.dmp.cmdexectool.scheduler.server.worker.task.flink.FlinkTask;
//import com.jz.dmp.cmdexectool.scheduler.server.worker.task.http.HttpTask;
//import com.jz.dmp.cmdexectool.scheduler.server.worker.task.mr.MapReduceTask;
//import com.jz.dmp.cmdexectool.scheduler.server.worker.task.processdure.ProcedureTask;
//import com.jz.dmp.cmdexectool.scheduler.server.worker.task.python.PythonTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.doctrans.DoctransTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.ftp.FtpTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.hdfs.HdfsTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.shell.ShellTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.sql.SqlTask;
//import com.jz.dmp.cmdexectool.scheduler.server.worker.task.spark.SparkTask;
//import com.jz.dmp.cmdexectool.scheduler.server.worker.task.sql.SqlTask;
//import com.jz.dmp.cmdexectool.scheduler.server.worker.task.sqoop.SqoopTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.unzipfile.UnzipfileTask;
/**
* task manager
......@@ -57,26 +52,20 @@ public class TaskManager {
}
switch (anEnum) {
case SHELL:
case WATERDROP:
return new ShellTask(taskExecutionContext, logger);
//case PROCEDURE:
// return new ProcedureTask(taskExecutionContext, logger);
case SQL:
return new SqlTask(taskExecutionContext, logger);
//case MR:
// return new MapReduceTask(taskExecutionContext, logger);
//case SPARK:
// return new SparkTask(taskExecutionContext, logger);
//case FLINK:
// return new FlinkTask(taskExecutionContext, logger);
//case PYTHON:
// return new PythonTask(taskExecutionContext, logger);
//case HTTP:
// return new HttpTask(taskExecutionContext, logger);
//case DATAX:
// return new DataxTask(taskExecutionContext, logger);
//case SQOOP:
// return new SqoopTask(taskExecutionContext, logger);
case FTP:
return new FtpTask(taskExecutionContext, logger);
case UNZIPFILE:
return new UnzipfileTask(taskExecutionContext, logger);
case DOCTRANS:
return new DoctransTask(taskExecutionContext, logger);
case HDFS:
return new HdfsTask(taskExecutionContext, logger);
default:
logger.error("not support task type: {}", taskExecutionContext.getTaskType());
throw new IllegalArgumentException("not support task type");
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.server.worker.task.doctrans;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;
import org.slf4j.Logger;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.docTrans.DoctransParameters;
import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor;
/**
* sql task
*/
public class DoctransTask extends AbstractTask {
/**
* sql parameters
*/
private DoctransParameters doctransParameters;
/**
* waterdrop command executor
*/
private WaterdropCommandExecutor waterdropCommandExecutor;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*/
public DoctransTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
this.waterdropCommandExecutor = new WaterdropCommandExecutor(this::logHandle, taskExecutionContext, logger);
}
@Override
public void init() {
logger.info("sql task params {}", taskExecutionContext.getTaskParams());
//sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class);
doctransParameters = (DoctransParameters) taskExecutionContext.getParameters();
if (!doctransParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid");
}
}
@Override
public void handle() throws Exception {
try {
// construct process
CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
} catch (Exception e) {
logger.error("sql task error", e);
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
throw e;
}
}
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
waterdropCommandExecutor.cancelApplication();
}
/**
* create command
*
* @return file name
* @throws Exception exception
*/
private String buildCommand() throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.%s", taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
logger.info("raw script : {}", doctransParameters.getScript());
logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
if (OSUtils.isWindows()) {
// Files.createFile(path);
File file = path.toFile();
File parentFile = file.getParentFile();
if (!parentFile.exists()) {
parentFile.mkdirs();
}
file.createNewFile();
} else {
Files.createFile(path, attr);
}
Files.write(path, doctransParameters.getScript().getBytes(), StandardOpenOption.APPEND);
return fileName;
}
@Override
public AbstractParameters getParameters() {
return doctransParameters;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.server.worker.task.ftp;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;
import org.slf4j.Logger;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.ftp.FtpParameters;
import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor;
/**
* sql task
*/
public class FtpTask extends AbstractTask {
/**
* sql parameters
*/
private FtpParameters ftpParameters;
/**
* waterdrop command executor
*/
private WaterdropCommandExecutor waterdropCommandExecutor;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*/
public FtpTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
this.waterdropCommandExecutor = new WaterdropCommandExecutor(this::logHandle, taskExecutionContext, logger);
}
@Override
public void init() {
logger.info("sql task params {}", taskExecutionContext.getTaskParams());
//sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class);
ftpParameters = (FtpParameters) taskExecutionContext.getParameters();
if (!ftpParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid");
}
}
@Override
public void handle() throws Exception {
try {
// construct process
CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
} catch (Exception e) {
logger.error("sql task error", e);
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
throw e;
}
}
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
waterdropCommandExecutor.cancelApplication();
}
/**
* create command
*
* @return file name
* @throws Exception exception
*/
private String buildCommand() throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.%s", taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
logger.info("raw script : {}", ftpParameters.getScript());
logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
if (OSUtils.isWindows()) {
// Files.createFile(path);
File file = path.toFile();
File parentFile = file.getParentFile();
if (!parentFile.exists()) {
parentFile.mkdirs();
}
file.createNewFile();
} else {
Files.createFile(path, attr);
}
Files.write(path, ftpParameters.getScript().getBytes(), StandardOpenOption.APPEND);
return fileName;
}
@Override
public AbstractParameters getParameters() {
return ftpParameters;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.server.worker.task.hdfs;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;
import org.slf4j.Logger;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.hdfs.HdfsParameters;
import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor;
/**
* sql task
*/
public class HdfsTask extends AbstractTask {
/**
* sql parameters
*/
private HdfsParameters hdfsParameters;
/**
* waterdrop command executor
*/
private WaterdropCommandExecutor waterdropCommandExecutor;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*/
public HdfsTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
this.waterdropCommandExecutor = new WaterdropCommandExecutor(this::logHandle, taskExecutionContext, logger);
}
@Override
public void init() {
logger.info("sql task params {}", taskExecutionContext.getTaskParams());
//sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class);
hdfsParameters = (HdfsParameters) taskExecutionContext.getParameters();
if (!hdfsParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid");
}
}
@Override
public void handle() throws Exception {
try {
// construct process
CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
} catch (Exception e) {
logger.error("sql task error", e);
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
throw e;
}
}
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
waterdropCommandExecutor.cancelApplication();
}
/**
* create command
*
* @return file name
* @throws Exception exception
*/
private String buildCommand() throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.%s", taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
logger.info("raw script : {}", hdfsParameters.getScript());
logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
if (OSUtils.isWindows()) {
// Files.createFile(path);
File file = path.toFile();
File parentFile = file.getParentFile();
if (!parentFile.exists()) {
parentFile.mkdirs();
}
file.createNewFile();
} else {
Files.createFile(path, attr);
}
Files.write(path, hdfsParameters.getScript().getBytes(), StandardOpenOption.APPEND);
return fileName;
}
@Override
public AbstractParameters getParameters() {
return hdfsParameters;
}
}
......@@ -28,7 +28,6 @@ import org.slf4j.Logger;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlParameters;
import com.jz.dmp.cmdexectool.scheduler.common.utils.JSONUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.server.worker.task.unzipfile;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;
import org.slf4j.Logger;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.unzipfile.UnzipfileParameters;
import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor;
/**
* sql task
*/
public class UnzipfileTask extends AbstractTask {
/**
* sql parameters
*/
private UnzipfileParameters unzipfileParameters;
/**
* waterdrop command executor
*/
private WaterdropCommandExecutor waterdropCommandExecutor;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*/
public UnzipfileTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
this.waterdropCommandExecutor = new WaterdropCommandExecutor(this::logHandle, taskExecutionContext, logger);
}
@Override
public void init() {
logger.info("sql task params {}", taskExecutionContext.getTaskParams());
//sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class);
unzipfileParameters = (UnzipfileParameters) taskExecutionContext.getParameters();
if (!unzipfileParameters.checkParameters()) {
throw new RuntimeException("unzipfile task params is not valid");
}
}
@Override
public void handle() throws Exception {
try {
// construct process
CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
} catch (Exception e) {
logger.error("sql task error", e);
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
throw e;
}
}
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
waterdropCommandExecutor.cancelApplication();
}
/**
* create command
*
* @return file name
* @throws Exception exception
*/
private String buildCommand() throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.%s", taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
logger.info("raw script : {}", unzipfileParameters.getScript());
logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
if (OSUtils.isWindows()) {
// Files.createFile(path);
File file = path.toFile();
File parentFile = file.getParentFile();
if (!parentFile.exists()) {
parentFile.mkdirs();
}
file.createNewFile();
} else {
Files.createFile(path, attr);
}
Files.write(path, unzipfileParameters.getScript().getBytes(), StandardOpenOption.APPEND);
return fileName;
}
@Override
public AbstractParameters getParameters() {
return unzipfileParameters;
}
}
......@@ -34,7 +34,6 @@ import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.controller.bean.DmpDevelopTaskDto;
import com.jz.dmp.cmdexectool.controller.bean.DmpDevelopTaskHistoryDto;
import com.jz.dmp.cmdexectool.controller.bean.DmpProjectConfigInfoDto;
import com.jz.dmp.cmdexectool.controller.bean.MyDmpDevelopTaskConverter;
import com.jz.dmp.cmdexectool.controller.bean.MyDmpDevelopTaskHistoryConverter;
import com.jz.dmp.cmdexectool.mapper.DmpDevelopTaskHistoryMapper;
import com.jz.dmp.cmdexectool.mapper.DmpDevelopTaskMapper;
......@@ -43,8 +42,12 @@ import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.ExecutionStatus;
import com.jz.dmp.cmdexectool.scheduler.common.enums.JobType;
import com.jz.dmp.cmdexectool.scheduler.common.task.docTrans.DoctransParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.ftp.FtpParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.hdfs.HdfsParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.shell.ShellParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.unzipfile.UnzipfileParameters;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.TaskManager;
......@@ -152,9 +155,6 @@ public class ProcessService {
DmpProjectConfigInfoDto projectConfigInfoDto = dtos.get(0);
//测试用替换配置代码
//projectConfigInfoDto.getDmpPublicConfigInfoDto().setAzkabanExectorShellPath("E:/test");
JobType jobType = null;
String script = "";
String taskAppId = "";
......@@ -209,16 +209,28 @@ public class ProcessService {
break;
case FTP:
FtpParameters ftpParameters = new FtpParameters();
ftpParameters.setScript(script);
ftpParameters.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(ftpParameters, projectConfigInfoDto);
break;
case UNZIPFILE:
UnzipfileParameters unzipfileParameters = new UnzipfileParameters();
unzipfileParameters.setScript(script);
unzipfileParameters.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(unzipfileParameters, projectConfigInfoDto);
break;
case DOCTRANS:
DoctransParameters doctransParameters = new DoctransParameters();
doctransParameters.setScript(script);
doctransParameters.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(doctransParameters, projectConfigInfoDto);
break;
case HDFS:
HdfsParameters hdfsParameters = new HdfsParameters();
hdfsParameters.setScript(script);
hdfsParameters.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(hdfsParameters, projectConfigInfoDto);
break;
default:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment