Commit 8e1c0ef7 authored by sml's avatar sml

sql ftl 提交

parent 25ec3bd8
package com.jz.dmp.cmdexectool.common.constant; package com.jz.dmp.cmdexectool.common.constant;
import org.springframework.boot.autoconfigure.couchbase.CouchbaseProperties.Env;
/** /**
* @ClassName: CommConstant * @ClassName: CommConstant
* @Description: TODO(通用常亮) * @Description: TODO(通用常亮)
...@@ -43,5 +45,17 @@ public class CommConstant { ...@@ -43,5 +45,17 @@ public class CommConstant {
public static final String OUTPUT_TYPE_TABLE = "table";//table public static final String OUTPUT_TYPE_TABLE = "table";//table
public static final String OUTPUT_TYPE_TOPIC = "topic";//topic public static final String OUTPUT_TYPE_TOPIC = "topic";//topic
public static final String OUTPUT_TYPE_API = "api";//api public static final String OUTPUT_TYPE_API = "api";//api
/***************************************************/
//waterdrop 模板名称
public static final String WATERDROP_FTL = "waterdrop.ftl";
public static final String WATERDROP_FTL_ENV = "env.ftl";
public static final String WATERDROP_FTL_SOURCE_JDBC = "source_jdbc.ftl";
public static final String WATERDROP_FTL_TRANSFORM_SQL = "transform_sql.ftl";
public static final String WATERDROP_FTL_TRANSFORM_JSON2 = "transform_json2.ftl";
public static final String WATERDROP_FTL_SINK_CONSOLE = "sink_console.ftl";
public static final String WATERDROP_FTL_SINK_HDFS = "sink_hdfs.ftl";
public static final String WATERDROP_FTL_SINK_JDBC = "sink_jdbc.ftl";
public static final String WATERDROP_FTL_SINK_KAFKA = "sink_kafka.ftl";
} }
package com.jz.dmp.cmdexectool.common.utils;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import freemarker.core.ParseException;
import freemarker.template.MalformedTemplateNameException;
import freemarker.template.Template;
import freemarker.template.TemplateException;
import freemarker.template.TemplateNotFoundException;
public class FreeMarkerUtils {
/**
* 使用freemaker模板生成 kafka connector 请求参数
*
* @param type 模板类型
* @param dataModel 模板里定义的变量数据对象
* @return
* @author Bellamy
*/
public static String freemakerJson(String ftlName, Map<String, String> dataModel, FreeMarkerConfigurer freeMarkerConfig) {
StringWriter stringWriter = new StringWriter();
try {
Template template = freeMarkerConfig.getConfiguration().getTemplate(ftlName);
if (template != null) {
try {
template.process(dataModel, stringWriter);
} catch (TemplateException e) {
e.printStackTrace();
}
}
} catch (TemplateNotFoundException e) {
e.printStackTrace();
} catch (MalformedTemplateNameException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return stringWriter.toString();
}
}
...@@ -16,8 +16,10 @@ ...@@ -16,8 +16,10 @@
*/ */
package com.jz.dmp.cmdexectool.scheduler.common.task.sql; package com.jz.dmp.cmdexectool.scheduler.common.task.sql;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.jz.dmp.cmdexectool.common.constant.CommConstant; import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.common.utils.FreeMarkerUtils;
import com.jz.dmp.cmdexectool.controller.bean.DmpProjectConfigInfoDto; import com.jz.dmp.cmdexectool.controller.bean.DmpProjectConfigInfoDto;
import com.jz.dmp.cmdexectool.entity.DmpSyncingDatasource; import com.jz.dmp.cmdexectool.entity.DmpSyncingDatasource;
import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao; import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao;
...@@ -29,6 +31,8 @@ import java.util.HashMap; ...@@ -29,6 +31,8 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
/** /**
* Sql/Hql parameter * Sql/Hql parameter
*/ */
...@@ -43,29 +47,34 @@ public class SqlParameters extends AbstractParameters { ...@@ -43,29 +47,34 @@ public class SqlParameters extends AbstractParameters {
/** /**
* 数据源相关配置 * 数据源相关配置
*/ */
private Map<String, Object> source; private String source;
/** /**
* 目标相关配置 * 目标相关配置
*/ */
private Map<String, Object> sink; private String sink;
/** /**
* 环境相关配置 * 环境相关配置
*/ */
private Map<String, Object> env; private String env;
/** /**
* ETL相关配置 * ETL相关配置
*/ */
private Map<String, Object> transform; private String transform;
/**
* waterdropScript脚本
*/
private String waterdropScript;
/** /**
* resource list * resource list
*/ */
private List<ResourceInfo> resourceList; private List<ResourceInfo> resourceList;
public SqlParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao) { public SqlParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurer freeMarkerConfig) {
this.script = script; this.script = script;
JSONObject scriptObj = JSONObject.parseObject(script); JSONObject scriptObj = JSONObject.parseObject(script);
...@@ -73,48 +82,102 @@ public class SqlParameters extends AbstractParameters { ...@@ -73,48 +82,102 @@ public class SqlParameters extends AbstractParameters {
String sqlScript = scriptObj.getString("sqlScript"); String sqlScript = scriptObj.getString("sqlScript");
//evn //evn
env.put("spark.app.name", "Waterdrop"); Map<String, String> envModel = new HashMap<String, String>();
envModel.put("sparkappname", "Waterdrop");
env = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfig);
//source //source
Integer sourceId = scriptObj.getInteger("sourceId"); Integer sourceId = scriptObj.getInteger("sourceId");
DmpSyncingDatasource dmpSyncingDatasource = dmpSyncingDatasourceDao.queryById(sourceId); DmpSyncingDatasource dmpSyncingDatasource = dmpSyncingDatasourceDao.queryById(sourceId);
Map<String, Object> jdbcMap = new HashMap<String, Object>(); String sourceTableNames = scriptObj.getString("sourceTableNames");
jdbcMap.put("driver", "com.mysql.jdbc.Driver"); String[] tableNameArr = sourceTableNames.split(",");
jdbcMap.put("url", dmpSyncingDatasource.getJdbcUrl()); for (String tableName : tableNameArr) {
//jdbcMap.put("table", value); Map<String, String> jdbcModel = new HashMap<String, String>();
//jdbcMap.put("result_table_name", value); jdbcModel.put("driver", "com.mysql.jdbc.Driver");
//jdbcMap.put("user", dmpSyncingDatasource.getUserName()); jdbcModel.put("url", dmpSyncingDatasource.getJdbcUrl());
//jdbcMap.put("password", dmpSyncingDatasource.getPassword()); jdbcModel.put("table", tableName);
//jdbcMap.put("", value); jdbcModel.put("result_table_name", tableName);
//jdbcMap.put("", value); jdbcModel.put("user", dmpSyncingDatasource.getUserName());
//jdbcMap.put("", value); jdbcModel.put("password", dmpSyncingDatasource.getPassword());
//jdbcMap.put("", value); source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig);
source.put("jdbc", jdbcMap); }
if (CommConstant.OUTPUT_TYPE_CONSOLE.equals(outputType)) { if (CommConstant.OUTPUT_TYPE_CONSOLE.equals(outputType)) {
//transform //transform
Map<String, Object> sqlMap = new HashMap<String, Object>(); Map<String, String> transformSqlModel = new HashMap<String, String>();
sqlMap.put("sql", sqlScript); transformSqlModel.put("sql", sqlScript);
transform.put("sql", sqlMap); transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
//sink //sink
Map<String, Object> stdoutMap = new HashMap<String, Object>(); Map<String, String> stdoutModel = new HashMap<String, String>();
stdoutMap.put("limit", 10); sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_CONSOLE, stdoutModel, freeMarkerConfig);
stdoutMap.put("serializer", "json");
sink.put("stdout", stdoutMap);
}else if (CommConstant.OUTPUT_TYPE_HDFS.equals(outputType)) { }else if (CommConstant.OUTPUT_TYPE_HDFS.equals(outputType)) {
//transform //transform
Map<String, String> transformSqlModel = new HashMap<String, String>();
transformSqlModel.put("sql", sqlScript);
transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
//sink //sink
JSONObject hdfsObj = scriptObj.getJSONObject("hdfs");
String hdfsDir = hdfsObj.getString("hdfsDir");
Map<String, String> hdfsModel = new HashMap<String, String>();
hdfsModel.put("path", hdfsDir);
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_HDFS, hdfsModel, freeMarkerConfig);
}else if (CommConstant.OUTPUT_TYPE_TABLE.equals(outputType)) { }else if (CommConstant.OUTPUT_TYPE_TABLE.equals(outputType)) {
//transform
Map<String, String> transformSqlModel = new HashMap<String, String>();
transformSqlModel.put("sql", sqlScript);
transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
JSONObject tableObj = scriptObj.getJSONObject("table");
JSONArray fieldsArr = tableObj.getJSONArray("tableFields");
for (int index = 0; index < fieldsArr.size(); index++) {
JSONObject fieldObj = fieldsArr.getJSONObject(index);
Map<String, String> transformJson2Model = new HashMap<String, String>();
transformJson2Model.put("source_field", fieldObj.getString("sourceFieldName"));
transformJson2Model.put("target_field", fieldObj.getString("targetFieldName"));
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_JSON2, transformJson2Model, freeMarkerConfig);
}
//sink
//targetSource
Integer targetSourceId = tableObj.getInteger("targetSourceId");
DmpSyncingDatasource targetSource = dmpSyncingDatasourceDao.queryById(targetSourceId);
Map<String, String> sinkJdbcModel = new HashMap<String, String>();
sinkJdbcModel.put("save_mode", "overwrite");
sinkJdbcModel.put("truncate", "true");
sinkJdbcModel.put("url", targetSource.getJdbcUrl());
sinkJdbcModel.put("driver", "com.mysql.jdbc.Driver");
sinkJdbcModel.put("user", targetSource.getUserName());
sinkJdbcModel.put("password", targetSource.getPassword());
sinkJdbcModel.put("dbtable", targetSource.getDbName());
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_JDBC, sinkJdbcModel, freeMarkerConfig);
}else if (CommConstant.OUTPUT_TYPE_TOPIC.equals(outputType)) { }else if (CommConstant.OUTPUT_TYPE_TOPIC.equals(outputType)) {
//transform
Map<String, String> transformSqlModel = new HashMap<String, String>();
transformSqlModel.put("sql", sqlScript);
transform = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformSqlModel, freeMarkerConfig);
//sink
JSONObject topicObj = scriptObj.getJSONObject("topic");
Map<String, String> kafkaModel = new HashMap<String, String>();
kafkaModel.put("topic", topicObj.getString("topic"));
kafkaModel.put("broker", topicObj.getString("server"));
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_KAFKA, kafkaModel, freeMarkerConfig);
}else if (CommConstant.OUTPUT_TYPE_API.equals(outputType)) { }else if (CommConstant.OUTPUT_TYPE_API.equals(outputType)) {
} }
//waterdrop script
Map<String, String> waterdropModel = new HashMap<String, String>();
waterdropModel.put("env", env);
waterdropModel.put("source", source);
waterdropModel.put("transform", transform);
waterdropModel.put("sink", sink);
waterdropScript = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig);
} }
...@@ -134,38 +197,46 @@ public class SqlParameters extends AbstractParameters { ...@@ -134,38 +197,46 @@ public class SqlParameters extends AbstractParameters {
this.taskAppId = taskAppId; this.taskAppId = taskAppId;
} }
public Map<String, Object> getSource() { public String getSource() {
return source; return source;
} }
public void setSource(Map<String, Object> source) { public void setSource(String source) {
this.source = source; this.source = source;
} }
public Map<String, Object> getSink() { public String getSink() {
return sink; return sink;
} }
public void setSink(Map<String, Object> sink) { public void setSink(String sink) {
this.sink = sink; this.sink = sink;
} }
public Map<String, Object> getEnv() { public String getEnv() {
return env; return env;
} }
public void setEnv(Map<String, Object> env) { public void setEnv(String env) {
this.env = env; this.env = env;
} }
public Map<String, Object> getTransform() { public String getTransform() {
return transform; return transform;
} }
public void setTransform(Map<String, Object> transform) { public void setTransform(String transform) {
this.transform = transform; this.transform = transform;
} }
public String getWaterdropScript() {
return waterdropScript;
}
public void setWaterdropScript(String waterdropScript) {
this.waterdropScript = waterdropScript;
}
public List<ResourceInfo> getResourceList() { public List<ResourceInfo> getResourceList() {
return resourceList; return resourceList;
} }
...@@ -176,7 +247,7 @@ public class SqlParameters extends AbstractParameters { ...@@ -176,7 +247,7 @@ public class SqlParameters extends AbstractParameters {
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
return script != null && !script.isEmpty(); return waterdropScript != null && !waterdropScript.isEmpty();
} }
@Override @Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jz.dmp.cmdexectool.scheduler.server.worker.task;
import org.apache.commons.io.FileUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.function.Consumer;
/**
* shell command executor
*/
public class WaterdropCommandExecutor extends AbstractCommandExecutor {
/**
* For Unix-like, using sh
*/
public static final String SH = "sh";
/**
* For Windows, using cmd.exe
*/
public static final String CMD = "cmd.exe";
/**
* constructor
* @param logHandler logHandler
* @param taskExecutionContext taskExecutionContext
* @param logger logger
*/
public WaterdropCommandExecutor(Consumer<List<String>> logHandler,
TaskExecutionContext taskExecutionContext,
Logger logger) {
super(logHandler,taskExecutionContext,logger);
}
@Override
protected String buildCommandFilePath() {
// command file
return String.format("%s/%s.%s"
, taskExecutionContext.getExecutePath()
, taskExecutionContext.getTaskAppId()
, OSUtils.isWindows() ? "bat" : "command");
}
/**
* get command type
* @return command type
*/
@Override
protected String commandInterpreter() {
return OSUtils.isWindows() ? CMD : SH;
}
/**
* create command file if not exists
* @param execCommand exec command
* @param commandFile command file
* @throws IOException io exception
*/
@Override
protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
logger.info("tenantCode user:{}, task dir:{}", taskExecutionContext.getTenantCode(),
taskExecutionContext.getTaskAppId());
// create if non existence
if (!Files.exists(Paths.get(commandFile))) {
logger.info("create command file:{}", commandFile);
StringBuilder sb = new StringBuilder();
if (OSUtils.isWindows()) {
//sb.append("@echo off\n");
//sb.append("cd /d %~dp0\n");
sb.append("./bin/start-waterdrop-spark.sh --master local[4] --deploy-mode client --config ./config/application.conf");
if (taskExecutionContext.getEnvFile() != null) {
sb.append("call ").append(taskExecutionContext.getEnvFile()).append("\n");
}
} else {
//sb.append("#!/bin/sh\n");
//sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
//sb.append("cd $BASEDIR\n");
sb.append("./bin/start-waterdrop-spark.sh --master local[4] --deploy-mode client --config ./config/application.conf");
if (taskExecutionContext.getEnvFile() != null) {
sb.append("source ").append(taskExecutionContext.getEnvFile()).append("\n");
}
}
sb.append(execCommand);
logger.info("command : {}", sb.toString());
// write data to file
FileUtils.writeStringToFile(new File(commandFile), sb.toString(), StandardCharsets.UTF_8);
}
}
}
...@@ -23,25 +23,17 @@ import java.nio.file.StandardOpenOption; ...@@ -23,25 +23,17 @@ import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions; import java.nio.file.attribute.PosixFilePermissions;
import java.util.Map;
import java.util.Set; import java.util.Set;
import org.slf4j.Logger; import org.slf4j.Logger;
import com.jz.dmp.cmdexectool.scheduler.common.Constants; import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.CommandType;
import com.jz.dmp.cmdexectool.scheduler.common.process.Property;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlParameters; import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlParameters;
import com.jz.dmp.cmdexectool.scheduler.common.utils.DateUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.JSONUtils; import com.jz.dmp.cmdexectool.scheduler.common.utils.JSONUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils; import com.jz.dmp.cmdexectool.scheduler.common.utils.OSUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext; import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.utils.ParamUtils;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.CommandExecuteResult;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.ShellCommandExecutor; import com.jz.dmp.cmdexectool.scheduler.server.worker.task.WaterdropCommandExecutor;
/** /**
* sql task * sql task
...@@ -54,9 +46,9 @@ public class SqlTask extends AbstractTask { ...@@ -54,9 +46,9 @@ public class SqlTask extends AbstractTask {
private SqlParameters sqlParameters; private SqlParameters sqlParameters;
/** /**
* shell command executor * waterdrop command executor
*/ */
private ShellCommandExecutor shellCommandExecutor; private WaterdropCommandExecutor waterdropCommandExecutor;
/** /**
* taskExecutionContext * taskExecutionContext
...@@ -73,7 +65,7 @@ public class SqlTask extends AbstractTask { ...@@ -73,7 +65,7 @@ public class SqlTask extends AbstractTask {
super(taskExecutionContext, logger); super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext; this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, logger); this.waterdropCommandExecutor = new WaterdropCommandExecutor(this::logHandle, taskExecutionContext, logger);
} }
@Override @Override
...@@ -83,7 +75,7 @@ public class SqlTask extends AbstractTask { ...@@ -83,7 +75,7 @@ public class SqlTask extends AbstractTask {
sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class); sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class);
if (!sqlParameters.checkParameters()) { if (!sqlParameters.checkParameters()) {
throw new RuntimeException("shell task params is not valid"); throw new RuntimeException("sql task params is not valid");
} }
} }
...@@ -91,7 +83,7 @@ public class SqlTask extends AbstractTask { ...@@ -91,7 +83,7 @@ public class SqlTask extends AbstractTask {
public void handle() throws Exception { public void handle() throws Exception {
try { try {
// construct process // construct process
CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(buildCommand()); CommandExecuteResult commandExecuteResult = waterdropCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds()); setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId()); setProcessId(commandExecuteResult.getProcessId());
...@@ -105,7 +97,7 @@ public class SqlTask extends AbstractTask { ...@@ -105,7 +97,7 @@ public class SqlTask extends AbstractTask {
@Override @Override
public void cancelApplication(boolean cancelApplication) throws Exception { public void cancelApplication(boolean cancelApplication) throws Exception {
// cancel process // cancel process
shellCommandExecutor.cancelApplication(); waterdropCommandExecutor.cancelApplication();
} }
/** /**
...@@ -125,34 +117,6 @@ public class SqlTask extends AbstractTask { ...@@ -125,34 +117,6 @@ public class SqlTask extends AbstractTask {
return fileName; return fileName;
} }
String script = sqlParameters.getScript().replaceAll("\\r\\n", "\n");
/**
* combining local and global parameters
*/
Map<String, Property> paramsMap = ParamUtils.convert(
ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(), sqlParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), taskExecutionContext.getScheduleTime());
if (paramsMap != null) {
script = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
}
// new
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job
// and batch complement job
if (paramsMap != null) {
if (taskExecutionContext.getScheduleTime() != null) {
String dateTime = DateUtils.format(taskExecutionContext.getScheduleTime(),
Constants.PARAMETER_FORMAT_TIME);
Property p = new Property();
p.setValue(dateTime);
p.setProp(Constants.PARAMETER_SHECDULE_TIME);
paramsMap.put(Constants.PARAMETER_SHECDULE_TIME, p);
}
script = ParameterUtils.convertParameterPlaceholders2(script, ParamUtils.convert(paramsMap));
}
sqlParameters.setScript(script);
logger.info("raw script : {}", sqlParameters.getScript()); logger.info("raw script : {}", sqlParameters.getScript());
logger.info("task execute path : {}", taskExecutionContext.getExecutePath()); logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
...@@ -171,7 +135,7 @@ public class SqlTask extends AbstractTask { ...@@ -171,7 +135,7 @@ public class SqlTask extends AbstractTask {
Files.createFile(path, attr); Files.createFile(path, attr);
} }
Files.write(path, sqlParameters.getScript().getBytes(), StandardOpenOption.APPEND); Files.write(path, sqlParameters.getWaterdropScript().getBytes(), StandardOpenOption.APPEND);
return fileName; return fileName;
} }
......
...@@ -27,6 +27,7 @@ import org.springframework.aop.ThrowsAdvice; ...@@ -27,6 +27,7 @@ import org.springframework.aop.ThrowsAdvice;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
...@@ -66,6 +67,8 @@ public class ProcessService { ...@@ -66,6 +67,8 @@ public class ProcessService {
private DmpProjectConfigInfoMapper dmpProjectConfigInfoMapper; private DmpProjectConfigInfoMapper dmpProjectConfigInfoMapper;
@Autowired @Autowired
private DmpSyncingDatasourceDao dmpSyncingDatasourceDao; private DmpSyncingDatasourceDao dmpSyncingDatasourceDao;
@Autowired
private FreeMarkerConfigurer freeMarkerConfigurer;
/** /**
* @Title: taskStart @Description: TODO(启动task) @param 参数 @return void * @Title: taskStart @Description: TODO(启动task) @param 参数 @return void
...@@ -218,7 +221,7 @@ public class ProcessService { ...@@ -218,7 +221,7 @@ public class ProcessService {
break; break;
case SQL: case SQL:
SqlParameters sqlParameters = new SqlParameters(script, projectConfigInfoDto, dmpSyncingDatasourceDao); SqlParameters sqlParameters = new SqlParameters(script, projectConfigInfoDto, dmpSyncingDatasourceDao, freeMarkerConfigurer);
sqlParameters.setTaskAppId(taskAppId); sqlParameters.setTaskAppId(taskAppId);
taskExecutionContext = sqlParameters.getTaskExecutionContext(); taskExecutionContext = sqlParameters.getTaskExecutionContext();
......
<#if sparkappname??>
spark.app.name = "${sparkappname!}"
</#if>
<#if sparkexecutorinstances??>
spark.executor.instances = ${sparkexecutorinstances!}
</#if>
<#if sparkexecutorcores??>
spark.executor.cores = ${sparkexecutorcores!}
</#if>
<#if sparkexecutormemory??>
spark.executor.memory = "${sparkexecutormemory!}"
</#if>
\ No newline at end of file
stdout {
<#if limit??>
# 限制输出条数,范围[-1, 2147483647]
limit = ${limit!}
</#if>
<#if serializer??>
# 输出时序列化的格式,支持:json、plain
serializer = "${serializer!}"
</#if>
}
hdfs {
<#if path??>
# 输出文件路径,以 hdfs:// 开头
path = "${path!}"
</#if>
<#if serializer??>
# 序列化方法,当前支持csv、json、parquet、orc和text
serializer = "${serializer!}"
</#if>
<#if path_time_format??>
# 当index 有日期时,指定格式,默认是yyyy.MM.dd
# y:Year; M:Month; d:Day; H:Hour(0-23); m:Minute; s: Second
path_time_format = "${path_time_format!}"
</#if>
<#if partition_by??>
# 根据选择的字段进行分区
partition_by = "${partition_by!}"
</#if>
<#if save_mode??>
# 存储模式,支持overwrite、append、ignore、error,详细含义
save_mode = "${save_mode!}"
</#if>
}
Jdbc {
<#if save_mode??>
save_mode = "${save_mode!}"
</#if>
<#if truncate??>
truncate = ${truncate!}
</#if>
<#if url??>
url = "${url!}"
</#if>
<#if driver??>
driver = "${driver!}"
</#if>
<#if user??>
user= "${user!}"
</#if>
<#if password??>
password = "${password!}"
</#if>
<#if dbtable??>
dbtable = "${dbtable!}"
</#if>
}
Kafka {
<#if topic??>
# Kafka 主题
topic = "${topic!}"
</#if>
<#if broker??>
# Kafka 集群地址,可以配置多个,格式: "port1:9092,port2:9092,port3:9092"
broker = "${broker!}"
</#if>
}
\ No newline at end of file
jdbc {
<#if driver??>
# 驱动名
driver = "com.mysql.jdbc.Driver"
</#if>
<#if url??>
# JDBC的url
url = "jdbc:mysql://localhost:3306/info"
</#if>
<#if table??>
# 要加载的表名
table = "access"
</#if>
<#if result_table_name??>
# spark 临时表名称
result_table_name = "access_log"
</#if>
<#if user??>
# 连接数据库的名称
user = "username"
</#if>
<#if password??>
# 连接数据库的密码
password = "password"
</#if>
<#if partitionColumn??>
# 分区字段
jdbc.partitionColumn = "item_id"
</#if>
<#if numPartitions??>
# 分区个数
jdbc.numPartitions = "10"
</#if>
<#if lowerBound??>
# 分区上下限
jdbc.lowerBound = 0
</#if>
<#if upperBound??>
jdbc.upperBound = 100
</#if>
}
\ No newline at end of file
json {
<#if source_field??>
source_field = "${source_field!}"
</#if>
<#if target_field??>
target_field = "${target_field!}"
</#if>
}
\ No newline at end of file
sql {
<#if sql??>
# sql语句,sql中的表名可以是Source或者Transform插件配置的result_table_name
sql = "${sql!}",
</#if>
<#if table_name??>
# 表名,可以配置
table_name = "${table_name!}"
</#if>
}
\ No newline at end of file
env {
<#if env??>
${env!}
</#if>
}
source {
<#if source??>
${source!}
</#if>
}
transform {
<#if transform??>
${transform!}
</#if>
}
sink {
<#if sink??>
${sink!}
</#if>
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment