Commit 0adfe38d authored by sml's avatar sml

代码提交(文件生成根据execId)

parent fbb66a81
...@@ -31,11 +31,12 @@ public class ApiApplication implements HealthIndicator { ...@@ -31,11 +31,12 @@ public class ApiApplication implements HealthIndicator {
} }
String jobId = args[1]; String jobId = args[1];
boolean isSingle = Boolean.parseBoolean(args[2]); boolean isSingle = Boolean.parseBoolean(args[2]);
String execId = args[3];
logger.info("启动参数,taskId【{}】,jobId【{}】,isSingle【{}】",taskId,jobId,isSingle); logger.info("启动参数,taskId【{}】,jobId【{}】,isSingle【{}】, execId【{}】", taskId, jobId, isSingle, execId);
ProcessService processService = ApplicationContextUtil.getBean(ProcessService.class); ProcessService processService = ApplicationContextUtil.getBean(ProcessService.class);
processService.taskStart(taskId, jobId, isSingle); processService.taskStart(taskId, jobId, isSingle, execId);
long cost = System.currentTimeMillis() - start; long cost = System.currentTimeMillis() - start;
logger.info(" started status: {}, cost: {}", "SUCCESS!", cost); logger.info(" started status: {}, cost: {}", "SUCCESS!", cost);
......
...@@ -356,9 +356,12 @@ public class SqlParameters extends AbstractParameters { ...@@ -356,9 +356,12 @@ public class SqlParameters extends AbstractParameters {
JSONObject hdfsObj = scriptObj.getJSONObject("hdfs"); JSONObject hdfsObj = scriptObj.getJSONObject("hdfs");
String hdfsDir = hdfsObj.getString("hdfsDir"); String hdfsDir = hdfsObj.getString("hdfsDir");
hdfsDir = hdfsDir+"/"+System.currentTimeMillis();
Map<String, String> hdfsModel = new HashMap<String, String>(); Map<String, String> hdfsModel = new HashMap<String, String>();
hdfsModel.put("source_table_name", "t"); hdfsModel.put("source_table_name", "t");
hdfsModel.put("path", hdfsDir); hdfsModel.put("path", hdfsDir);
hdfsModel.put("save_mode", "overwrite");
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_HDFS, hdfsModel, freeMarkerConfig); sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_HDFS, hdfsModel, freeMarkerConfig);
} }
......
...@@ -230,6 +230,11 @@ public class TaskExecutionContext implements Serializable { ...@@ -230,6 +230,11 @@ public class TaskExecutionContext implements Serializable {
private AbstractParameters parameters; private AbstractParameters parameters;
/**
* azkaban 执行id
*/
private String execId;
public TaskExecutionContext() {} public TaskExecutionContext() {}
/** /**
...@@ -594,6 +599,14 @@ public class TaskExecutionContext implements Serializable { ...@@ -594,6 +599,14 @@ public class TaskExecutionContext implements Serializable {
this.parameters = parameters; this.parameters = parameters;
} }
public String getExecId() {
return execId;
}
public void setExecId(String execId) {
this.execId = execId;
}
@Override @Override
public String toString() { public String toString() {
return "TaskExecutionContext{" return "TaskExecutionContext{"
......
...@@ -60,9 +60,10 @@ public class WaterdropCommandExecutor extends AbstractCommandExecutor { ...@@ -60,9 +60,10 @@ public class WaterdropCommandExecutor extends AbstractCommandExecutor {
@Override @Override
protected String buildCommandFilePath() { protected String buildCommandFilePath() {
// command file // command file
return String.format("%s/%s.%s" return String.format("%s/%s_%s.%s"
, taskExecutionContext.getExecutePath() , taskExecutionContext.getExecutePath()
, taskExecutionContext.getTaskAppId() , taskExecutionContext.getTaskAppId()
, taskExecutionContext.getExecId()
, OSUtils.isWindows() ? "bat" : "command"); , OSUtils.isWindows() ? "bat" : "command");
} }
......
...@@ -187,8 +187,8 @@ public class SqlTask extends AbstractTask { ...@@ -187,8 +187,8 @@ public class SqlTask extends AbstractTask {
//String fileName = String.format("%s/%s_node.%s", taskExecutionContext.getExecutePath(), //String fileName = String.format("%s/%s_node.%s", taskExecutionContext.getExecutePath(),
// taskExecutionContext.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh"); // taskExecutionContext.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
String fileName = String.format("%s/%s.%s", taskExecutionContext.getExecutePath(), String fileName = String.format("%s/%s_application_%s.%s", taskExecutionContext.getExecutePath(),
"application", "conf"); taskExecutionContext.getTaskAppId(), taskExecutionContext.getExecId(), "conf");
Path path = new File(fileName).toPath(); Path path = new File(fileName).toPath();
......
...@@ -80,9 +80,9 @@ public class ProcessService { ...@@ -80,9 +80,9 @@ public class ProcessService {
* @Title: taskStart @Description: TODO(启动task) @param 参数 @return void * @Title: taskStart @Description: TODO(启动task) @param 参数 @return void
* 返回类型 @throws * 返回类型 @throws
*/ */
public void taskStart(Integer taskId, String jobId, boolean isSingle) { public void taskStart(Integer taskId, String jobId, boolean isSingle, String execId) {
try { try {
TaskExecutionContext taskExecutionContext = findTaskExecutionContextById(taskId, jobId, isSingle); TaskExecutionContext taskExecutionContext = findTaskExecutionContextById(taskId, jobId, isSingle, execId);
AbstractTask task = TaskManager.newTask(taskExecutionContext); AbstractTask task = TaskManager.newTask(taskExecutionContext);
// AbstractTask task = null; // AbstractTask task = null;
...@@ -118,7 +118,7 @@ public class ProcessService { ...@@ -118,7 +118,7 @@ public class ProcessService {
* isSingle @param @return 参数 @return TaskExecutionContext * isSingle @param @return 参数 @return TaskExecutionContext
* 返回类型 @throws * 返回类型 @throws
*/ */
public TaskExecutionContext findTaskExecutionContextById(Integer taskId, String jobId, boolean isSingle) public TaskExecutionContext findTaskExecutionContextById(Integer taskId, String jobId, boolean isSingle, String execId)
throws Exception { throws Exception {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
// 获取任务数据,并获取job json,并组装TaskInstance // 获取任务数据,并获取job json,并组装TaskInstance
...@@ -135,6 +135,8 @@ public class ProcessService { ...@@ -135,6 +135,8 @@ public class ProcessService {
} }
taskExecutionContext = dto2execcontext(dmpDevelopTaskDto, jobId); taskExecutionContext = dto2execcontext(dmpDevelopTaskDto, jobId);
//设置azkaban执行ID
taskExecutionContext.setExecId(execId);
return taskExecutionContext; return taskExecutionContext;
} }
......
...@@ -92,7 +92,7 @@ public class SQLCommandExecutorTest { ...@@ -92,7 +92,7 @@ public class SQLCommandExecutorTest {
//@Test //@Test
public void test() { public void test() {
try { try {
TaskExecutionContext taskExecutionContext = processService.findTaskExecutionContextById(460,null,true); TaskExecutionContext taskExecutionContext = processService.findTaskExecutionContextById(460,null,true, "2");
AbstractTask task = TaskManager.newTask(taskExecutionContext); AbstractTask task = TaskManager.newTask(taskExecutionContext);
logger.info("task info : {}", task); logger.info("task info : {}", task);
......
...@@ -74,7 +74,7 @@ public class ShellCommandExecutorTest { ...@@ -74,7 +74,7 @@ public class ShellCommandExecutorTest {
//@Test //@Test
public void test() { public void test() {
try { try {
TaskExecutionContext taskExecutionContext = processService.findTaskExecutionContextById(473,null,true); TaskExecutionContext taskExecutionContext = processService.findTaskExecutionContextById(473,null,true, "1");
AbstractTask task = TaskManager.newTask(taskExecutionContext); AbstractTask task = TaskManager.newTask(taskExecutionContext);
//AbstractTask task = null; //AbstractTask task = null;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment