sync commit

parent 25fad8a5
package com.jz.dmp.cmdexectool.scheduler.common.task.sync;
import com.jz.dmp.cmdexectool.controller.bean.DmpProjectConfigInfoDto;
import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import java.util.List;
/**
* @ClassName: SyncParameters
* @Description: sync parameters
* @Author : Bellamy
* @Date 2021/2/25
* @Version 1.0
*/
public class SyncParameters extends AbstractParameters {
/**
* shell script
*/
private String script;
private String taskAppId;
/**
* 数据源相关配置
*/
private String source;
/**
* 目标相关配置
*/
private String sink;
/**
* 环境相关配置
*/
private String env;
/**
* ETL相关配置
*/
private String transform;
/**
* waterdropScript脚本
*/
private String waterdropScript;
/**
* resource list
*/
private List<ResourceInfo> resourceList;
public SyncParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurer freeMarkerConfig) {
}
@Override
public boolean checkParameters() {
return waterdropScript != null && !waterdropScript.isEmpty();
}
/**
* get project resource files list
*
* @return resource files list
*/
@Override
public List<ResourceInfo> getResourceFilesList() {
return resourceList;
}
public String getScript() {
return script;
}
public void setScript(String script) {
this.script = script;
}
public String getTaskAppId() {
return taskAppId;
}
public void setTaskAppId(String taskAppId) {
this.taskAppId = taskAppId;
}
public String getSource() {
return source;
}
public void setSource(String source) {
this.source = source;
}
public String getSink() {
return sink;
}
public void setSink(String sink) {
this.sink = sink;
}
public String getEnv() {
return env;
}
public void setEnv(String env) {
this.env = env;
}
public String getTransform() {
return transform;
}
public void setTransform(String transform) {
this.transform = transform;
}
public String getWaterdropScript() {
return waterdropScript;
}
public void setWaterdropScript(String waterdropScript) {
this.waterdropScript = waterdropScript;
}
public List<ResourceInfo> getResourceList() {
return resourceList;
}
public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}
}
...@@ -26,6 +26,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.task.ftp.FtpParameters; ...@@ -26,6 +26,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.task.ftp.FtpParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.hdfs.HdfsParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.hdfs.HdfsParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.shell.ShellParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.shell.ShellParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.sync.SyncParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.unzipfile.UnzipfileParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.unzipfile.UnzipfileParameters;
import com.jz.dmp.cmdexectool.scheduler.remote.command.Command; import com.jz.dmp.cmdexectool.scheduler.remote.command.Command;
import com.jz.dmp.cmdexectool.scheduler.remote.command.TaskExecuteRequestCommand; import com.jz.dmp.cmdexectool.scheduler.remote.command.TaskExecuteRequestCommand;
...@@ -284,6 +285,14 @@ public class TaskExecutionContext implements Serializable { ...@@ -284,6 +285,14 @@ public class TaskExecutionContext implements Serializable {
this.setParameters(hdfsParameters); this.setParameters(hdfsParameters);
} }
public TaskExecutionContext(SyncParameters syncParameters, DmpProjectConfigInfoDto configInfoDto) {
this.setTaskType(CommConstant.WORK_TYPE_SYNC);
this.setExecutePath(configInfoDto.getDmpPublicConfigInfoDto().getAzkabanExectorShellPath());
this.setTaskParams(JSONObject.toJSONString(syncParameters));
this.setTaskAppId(syncParameters.getTaskAppId());
this.setParameters(syncParameters);
}
public int getTaskInstanceId() { public int getTaskInstanceId() {
return taskInstanceId; return taskInstanceId;
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.server.worker.task.sync;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.sync.SyncParameters;
import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor;
import org.slf4j.Logger;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;
/**
* SyncTask
*/
public class SyncTask extends AbstractTask {
/**
* sql parameters
*/
private SyncParameters syncParameters;
/**
* waterdrop command executor
*/
private WaterdropCommandExecutor waterdropCommandExecutor;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*/
public SyncTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
this.waterdropCommandExecutor = new WaterdropCommandExecutor(this::logHandle, taskExecutionContext, logger);
}
@Override
public void init() {
logger.info("sync task params {}", taskExecutionContext.getTaskParams());
//sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class);
syncParameters = (SyncParameters) taskExecutionContext.getParameters();
if (!syncParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid");
}
}
@Override
public void handle() throws Exception {
try {
// construct process
CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
} catch (Exception e) {
logger.error("sync task error", e);
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
throw e;
}
}
@Override
public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process
waterdropCommandExecutor.cancelApplication();
}
/**
* create command
*
* @return file name
* @throws Exception exception
*/
private String buildCommand() throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.%s", taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
logger.info("raw script : {}", syncParameters.getScript());
logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
if (OSUtils.isWindows()) {
// Files.createFile(path);
File file = path.toFile();
File parentFile = file.getParentFile();
if (!parentFile.exists()) {
parentFile.mkdirs();
}
file.createNewFile();
} else {
Files.createFile(path, attr);
}
Files.write(path, syncParameters.getWaterdropScript().getBytes(), StandardOpenOption.APPEND);
return fileName;
}
@Override
public AbstractParameters getParameters() {
return syncParameters;
}
}
...@@ -21,6 +21,7 @@ import java.util.HashMap; ...@@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.jz.dmp.cmdexectool.scheduler.common.task.sync.SyncParameters;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -203,7 +204,9 @@ public class ProcessService { ...@@ -203,7 +204,9 @@ public class ProcessService {
break; break;
case SYNC: case SYNC:
SyncParameters sync = new SyncParameters(script, projectConfigInfoDto, dmpSyncingDatasourceDao, freeMarkerConfigurer);
sync.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(sync, projectConfigInfoDto);
break; break;
case SUBPROCESS: case SUBPROCESS:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment