Commit 8e1fa3fc authored by mcb's avatar mcb

冲突修改

parents 6237d0f6 c460e8db
......@@ -2,13 +2,18 @@ package com.jz.dmp.cmdexectool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.Banner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean;
import com.jz.dmp.cmdexectool.common.utils.ApplicationContextUtil;
import com.jz.dmp.cmdexectool.scheduler.service.process.ProcessService;
......@@ -21,8 +26,8 @@ public class ApiApplication implements HealthIndicator {
public static void main(String[] args) {
long start = System.currentTimeMillis();
SpringApplication springApplication = new SpringApplication(ApiApplication.class);
ConfigurableApplicationContext context = springApplication.run(args);
ConfigurableApplicationContext context = new SpringApplicationBuilder(ApiApplication.class).web(WebApplicationType.NONE).bannerMode(Banner.Mode.OFF).run(args);
Integer taskId = Integer.parseInt(args[0]);
if (taskId==null) {
......@@ -41,8 +46,6 @@ public class ApiApplication implements HealthIndicator {
long cost = System.currentTimeMillis() - start;
logger.info(" started status: {}, cost: {}", "SUCCESS!", cost);
springApplication.exit(context);
}
@Override
......
......@@ -4,9 +4,11 @@ import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import freemarker.core.ParseException;
import freemarker.template.Configuration;
import freemarker.template.MalformedTemplateNameException;
import freemarker.template.Template;
import freemarker.template.TemplateException;
......@@ -22,7 +24,7 @@ public class FreeMarkerUtils {
* @return
* @author Bellamy
*/
public static String freemakerJson(String ftlName, Map<String, String> dataModel, FreeMarkerConfigurer freeMarkerConfig) {
public static String freemakerJson2(String ftlName, Map<String, String> dataModel, FreeMarkerConfigurer freeMarkerConfig) {
StringWriter stringWriter = new StringWriter();
try {
......@@ -46,4 +48,38 @@ public class FreeMarkerUtils {
return stringWriter.toString();
}
/**
* 使用freemaker模板生成 kafka connector 请求参数
*
* @param type 模板类型
* @param dataModel 模板里定义的变量数据对象
* @return
* @author Bellamy
*/
public static String freemakerNoneWebJson(String ftlName, Map<String, String> dataModel, FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean) {
StringWriter stringWriter = new StringWriter();
try {
Configuration configuration = freeMarkerConfigurationFactoryBean.createConfiguration();
Template template = configuration.getTemplate(ftlName);
if (template != null) {
try {
template.process(dataModel, stringWriter);
} catch (TemplateException e) {
e.printStackTrace();
}
}
} catch (TemplateNotFoundException e) {
e.printStackTrace();
} catch (MalformedTemplateNameException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (TemplateException e) {
e.printStackTrace();
}
return stringWriter.toString();
}
}
package com.jz.dmp.cmdexectool.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean;
@Configuration
public class FreemarkerConfig {
@Bean(value = "freemarkerConfiguration")
public FreeMarkerConfigurationFactoryBean getFreeMarkerConfigurationFactoryBean(){
FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean = new FreeMarkerConfigurationFactoryBean();
freeMarkerConfigurationFactoryBean.setTemplateLoaderPath("classpath:templates");
return freeMarkerConfigurationFactoryBean;
}
}
......@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
/**
......@@ -54,7 +55,7 @@ public class DoctransParameters extends AbstractParameters {
}
public DoctransParameters(String script, FreeMarkerConfigurer freeMarkerConfig) {
public DoctransParameters(String script, FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean) {
this.script = script;
JSONObject scriptObj = JSONObject.parseObject(script);
......@@ -74,7 +75,7 @@ public class DoctransParameters extends AbstractParameters {
doctransModel.put("source_convert", sourceConvert);
doctransModel.put("sink_convert", sinkConvert);
doctransModel.put("file_suffix", fileSuffix);
this.cmdScript = FreeMarkerUtils.freemakerJson(CommConstant.FTL_DOCTRANS, doctransModel, freeMarkerConfig);
this.cmdScript = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.FTL_DOCTRANS, doctransModel, freeMarkerConfigurationFactoryBean);
}
......
......@@ -22,6 +22,7 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import com.alibaba.fastjson.JSONObject;
......@@ -58,7 +59,7 @@ public class FtpParameters extends AbstractParameters {
}
public FtpParameters(String script, FreeMarkerConfigurer freeMarkerConfig) {
public FtpParameters(String script, FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean) {
this.script = script;
JSONObject scriptObj = JSONObject.parseObject(script);
......@@ -94,7 +95,7 @@ public class FtpParameters extends AbstractParameters {
logger.info("调用ftp command模板参数【{}】", JSONObject.toJSONString(ftpModel));
this.cmdScript = FreeMarkerUtils.freemakerJson(CommConstant.FTL_SFTP_DOWNLOAD, ftpModel, freeMarkerConfig);
this.cmdScript = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.FTL_SFTP_DOWNLOAD, ftpModel, freeMarkerConfigurationFactoryBean);
}
......
......@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
/**
......@@ -54,7 +55,7 @@ public class HdfsParameters extends AbstractParameters {
}
public HdfsParameters(String script, FreeMarkerConfigurer freeMarkerConfig) {
public HdfsParameters(String script, FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean) {
this.script = script;
JSONObject scriptObj = JSONObject.parseObject(script);
......@@ -68,7 +69,7 @@ public class HdfsParameters extends AbstractParameters {
HdfsModel.put("src_dir", srcDir);
HdfsModel.put("file_suffix", fileSuffix);
HdfsModel.put("des_dir", desDir);
this.cmdScript = FreeMarkerUtils.freemakerJson(CommConstant.FTL_HDFS_UPLOAD, HdfsModel, freeMarkerConfig);
this.cmdScript = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.FTL_HDFS_UPLOAD, HdfsModel, freeMarkerConfigurationFactoryBean);
}
......
......@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import com.alibaba.fastjson.JSONObject;
import com.jz.dmp.cmdexectool.common.constant.CommConstant;
......@@ -112,7 +113,7 @@ public class SqlParameters extends AbstractParameters {
*/
private List<ResourceInfo> resourceList;
public SqlParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurer freeMarkerConfig, String publicKey) {
public SqlParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean, String publicKey) {
source = "";
env = "";
sink = "";
......@@ -131,28 +132,28 @@ public class SqlParameters extends AbstractParameters {
this.executioEngine = executioEngine;
//产生evn模板
generatorEnvStr(freeMarkerConfig);
generatorEnvStr(freeMarkerConfigurationFactoryBean);
//source
generatorSourceStr(dmpSyncingDatasourceDao, freeMarkerConfig, publicKey, scriptObj);
generatorSourceStr(dmpSyncingDatasourceDao, freeMarkerConfigurationFactoryBean, publicKey, scriptObj);
if (CommConstant.OUTPUT_TYPE_CONSOLE.equals(outputType)) {
//产生console transform and sink
geneConsoleTransfAndSink(freeMarkerConfig);
geneConsoleTransfAndSink(freeMarkerConfigurationFactoryBean);
}else if (CommConstant.OUTPUT_TYPE_HDFS.equals(outputType)) {
//产生hdfs transform and sink
geneHdfsTransfAndSink(freeMarkerConfig, scriptObj);
geneHdfsTransfAndSink(freeMarkerConfigurationFactoryBean, scriptObj);
}else if (CommConstant.OUTPUT_TYPE_TABLE.equals(outputType)) {
//产生table transform and sink
geneTableTransfAndSink(dmpSyncingDatasourceDao, freeMarkerConfig, scriptObj, publicKey);
geneTableTransfAndSink(dmpSyncingDatasourceDao, freeMarkerConfigurationFactoryBean, scriptObj, publicKey);
}else if (CommConstant.OUTPUT_TYPE_TOPIC.equals(outputType)) {
//产生topic transform and sink
geneTopicTransfAndSink(freeMarkerConfig, scriptObj);
geneTopicTransfAndSink(freeMarkerConfigurationFactoryBean, scriptObj);
}else if (CommConstant.OUTPUT_TYPE_API.equals(outputType)) {
//产生api transform and sink
geneApiTransfAndSink(freeMarkerConfig, scriptObj);
geneApiTransfAndSink(freeMarkerConfigurationFactoryBean, scriptObj);
}
//waterdrop script
geneWaterdropStr(freeMarkerConfig);
geneWaterdropStr(freeMarkerConfigurationFactoryBean);
}
......@@ -163,13 +164,13 @@ public class SqlParameters extends AbstractParameters {
* @return void 返回类型
* @throws
*/
private void geneWaterdropStr(FreeMarkerConfigurer freeMarkerConfig) {
private void geneWaterdropStr(FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean) {
Map<String, String> waterdropModel = new HashMap<String, String>();
waterdropModel.put("env", env);
waterdropModel.put("source", source);
waterdropModel.put("transform", transform);
waterdropModel.put("sink", sink);
this.waterdropScript = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig);
this.waterdropScript = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfigurationFactoryBean);
}
/**
......@@ -181,7 +182,7 @@ public class SqlParameters extends AbstractParameters {
* @return void 返回类型
* @throws
*/
private void geneApiTransfAndSink(FreeMarkerConfigurer freeMarkerConfig, JSONObject scriptObj) {
private void geneApiTransfAndSink(FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean, JSONObject scriptObj) {
// 执行引擎是jdbc,不用生成waterdrop
if (this.executioEngine.equals(CommConstant.EXECUTION_ENGINE_JDBC)) {
return;
......@@ -200,7 +201,7 @@ public class SqlParameters extends AbstractParameters {
Map<String, String> transformMappingSqlModel = new HashMap<String, String>();
transformMappingSqlModel.put("source_table_name", "t");
transformMappingSqlModel.put("sql", sqlStr);
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformMappingSqlModel, freeMarkerConfig);
transform = transform + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformMappingSqlModel, freeMarkerConfigurationFactoryBean);
//sink
Map<String, String> sinkApiModel = new HashMap<String, String>();
......@@ -211,7 +212,7 @@ public class SqlParameters extends AbstractParameters {
sinkApiModel.put("signType", apiObj.getString("signType"));
sinkApiModel.put("authCode", apiObj.getString("authCode"));
sinkApiModel.put("salt", apiObj.getString("salt"));
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_API, sinkApiModel, freeMarkerConfig);
sink = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_API, sinkApiModel, freeMarkerConfigurationFactoryBean);
}
/**
......@@ -223,7 +224,7 @@ public class SqlParameters extends AbstractParameters {
* @return void 返回类型
* @throws
*/
private void geneTopicTransfAndSink(FreeMarkerConfigurer freeMarkerConfig, JSONObject scriptObj) {
private void geneTopicTransfAndSink(FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean, JSONObject scriptObj) {
// 执行引擎是jdbc,不用生成waterdrop
if (this.executioEngine.equals(CommConstant.EXECUTION_ENGINE_JDBC)) {
return;
......@@ -242,7 +243,7 @@ public class SqlParameters extends AbstractParameters {
kafkaModel.put("source_table_name", "t");
kafkaModel.put("topic", topicObj.getString("topic"));
kafkaModel.put("broker", topicObj.getString("server"));
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_KAFKA, kafkaModel, freeMarkerConfig);
sink = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_KAFKA, kafkaModel, freeMarkerConfigurationFactoryBean);
}
/**
......@@ -256,7 +257,7 @@ public class SqlParameters extends AbstractParameters {
* @throws
*/
private void geneTableTransfAndSink(DmpSyncingDatasourceDao dmpSyncingDatasourceDao,
FreeMarkerConfigurer freeMarkerConfig, JSONObject scriptObj, String publicKey) {
FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean, JSONObject scriptObj, String publicKey) {
JSONObject tableObj = scriptObj.getJSONObject("table");
......@@ -306,7 +307,7 @@ public class SqlParameters extends AbstractParameters {
Map<String, String> transformMappingSqlModel = new HashMap<String, String>();
transformMappingSqlModel.put("source_table_name", "t");
transformMappingSqlModel.put("sql", sqlStr);
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformMappingSqlModel, freeMarkerConfig);
transform = transform + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, transformMappingSqlModel, freeMarkerConfigurationFactoryBean);
//sink
......@@ -329,7 +330,7 @@ public class SqlParameters extends AbstractParameters {
sinkJdbcModel.put("user", user);
sinkJdbcModel.put("password", password);
sinkJdbcModel.put("dbtable", targetTableName);
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_JDBC, sinkJdbcModel, freeMarkerConfig);
sink = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_JDBC, sinkJdbcModel, freeMarkerConfigurationFactoryBean);
}
}
......@@ -342,7 +343,7 @@ public class SqlParameters extends AbstractParameters {
* @return void 返回类型
* @throws
*/
private void geneHdfsTransfAndSink(FreeMarkerConfigurer freeMarkerConfig, JSONObject scriptObj) {
private void geneHdfsTransfAndSink(FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean, JSONObject scriptObj) {
// 执行引擎是jdbc,不用生成waterdrop
if (this.executioEngine.equals(CommConstant.EXECUTION_ENGINE_JDBC)) {
return ;
......@@ -363,7 +364,7 @@ public class SqlParameters extends AbstractParameters {
hdfsModel.put("source_table_name", "t");
hdfsModel.put("path", hdfsDir);
hdfsModel.put("save_mode", "overwrite");
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_HDFS, hdfsModel, freeMarkerConfig);
sink = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_HDFS, hdfsModel, freeMarkerConfigurationFactoryBean);
}
/**
......@@ -374,7 +375,7 @@ public class SqlParameters extends AbstractParameters {
* @return void 返回类型
* @throws
*/
private void geneConsoleTransfAndSink(FreeMarkerConfigurer freeMarkerConfig) {
private void geneConsoleTransfAndSink(FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean) {
//执行引擎是jdbc,不用生成waterdrop
if (this.executioEngine.equals(CommConstant.EXECUTION_ENGINE_JDBC)) {
return ;
......@@ -387,7 +388,7 @@ public class SqlParameters extends AbstractParameters {
//sink
Map<String, String> stdoutModel = new HashMap<String, String>();
stdoutModel.put("source_table_name", "t");
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_CONSOLE, stdoutModel, freeMarkerConfig);
sink = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_CONSOLE, stdoutModel, freeMarkerConfigurationFactoryBean);
}
/**
......@@ -401,7 +402,7 @@ public class SqlParameters extends AbstractParameters {
* @throws
*/
private void generatorSourceStr(DmpSyncingDatasourceDao dmpSyncingDatasourceDao,
FreeMarkerConfigurer freeMarkerConfig, String publicKey, JSONObject scriptObj) {
FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean, String publicKey, JSONObject scriptObj) {
Integer sourceId = scriptObj.getInteger("sourceId");
DmpSyncingDatasource dmpSyncingDatasource = dmpSyncingDatasourceDao.queryById(sourceId);
......@@ -437,15 +438,15 @@ public class SqlParameters extends AbstractParameters {
jdbcModel.put("result_table_name", "t");
jdbcModel.put("user", user);
jdbcModel.put("password", password);
this.source = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel,
freeMarkerConfig);
this.source = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel,
freeMarkerConfigurationFactoryBean);
}else if (this.sourceBaseDataSource.getMyDbType() == MyDbType.Hive) {
Map<String, String> hiveModel = new HashMap<String, String>();
hiveModel.put("catalogImplementation", "hive");
hiveModel.put("pre_sql", "select * from hive_db.hive_table");
hiveModel.put("result_table_name", "table_view");
this.source = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_HIVE, hiveModel,
freeMarkerConfig);
this.source = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_HIVE, hiveModel,
freeMarkerConfigurationFactoryBean);
}else {
logger.info("waterdrow,不支持的数据源类型");
throw new RuntimeException("waterdrow,不支持的数据源");
......@@ -460,7 +461,7 @@ public class SqlParameters extends AbstractParameters {
* @return void 返回类型
* @throws
*/
private void generatorEnvStr(FreeMarkerConfigurer freeMarkerConfig) {
private void generatorEnvStr(FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean) {
//如果执行引擎选择的事jdbc,不用生成env
if (this.executioEngine.equals(CommConstant.EXECUTION_ENGINE_JDBC)) {
return ;
......@@ -468,7 +469,7 @@ public class SqlParameters extends AbstractParameters {
Map<String, String> envModel = new HashMap<String, String>();
envModel.put("sparkappname", "Waterdrop");
this.env = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfig);
this.env = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfigurationFactoryBean);
}
public String getScript() {
......
......@@ -12,6 +12,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.enums.MyDbType;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MyBaseDataSource;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import java.util.ArrayList;
......@@ -85,7 +86,7 @@ public class SyncParameters extends AbstractParameters {
*/
private MyBaseDataSource targetBaseDataSource;
public SyncParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurer freeMarkerConfig, String publicKey) throws Exception {
public SyncParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String publicKey) throws Exception {
source = "";
env = "";
sink = "";
......@@ -99,7 +100,7 @@ public class SyncParameters extends AbstractParameters {
//evn
Map<String, String> envModel = new HashMap<String, String>();
envModel.put("sparkappname", "Waterdrop");
env = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfig);
env = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfig);
//target information
Integer targetDbId = Integer.valueOf((String) targetObj.get("targetDbId"));
......@@ -175,25 +176,25 @@ public class SyncParameters extends AbstractParameters {
waterdropModel.put("source", source);
waterdropModel.put("transform", transform);
waterdropModel.put("sink", sink);
waterdropScript = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig);
waterdropScript = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig);
}
private void getSinkKudu(DmpSyncingDatasource targetDatasource, String targetTable, FreeMarkerConfigurer freeMarkerConfig) {
private void getSinkKudu(DmpSyncingDatasource targetDatasource, String targetTable, FreeMarkerConfigurationFactoryBean freeMarkerConfig) {
Map kuduModel = new HashMap();
kuduModel.put("kuduMaster", targetDatasource.getHost() + ":" + targetDatasource.getPort()); //主机名
kuduModel.put("result_table_name", targetTable); //spark生成的临时表名
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, kuduModel, freeMarkerConfig);
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, kuduModel, freeMarkerConfig);
}
private void getSourceKudu(DmpSyncingDatasource dmpSyncingDatasource, String[] registerTableName, FreeMarkerConfigurer freeMarkerConfig, int i) {
private void getSourceKudu(DmpSyncingDatasource dmpSyncingDatasource, String[] registerTableName, FreeMarkerConfigurationFactoryBean freeMarkerConfig, int i) {
String tableName = registerTableName[i];
Map kuduModel = new HashMap();
kuduModel.put("kuduMaster", dmpSyncingDatasource.getHost() + ":" + dmpSyncingDatasource.getPort()); //主机名
kuduModel.put("result_table_name", tableName); //spark生成的临时表名
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, kuduModel, freeMarkerConfig);
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, kuduModel, freeMarkerConfig);
}
private void getSinkSftp(String targetTable, DmpSyncingDatasource targetDatasource, String publicKey, Map<String, Object> targetObj, FreeMarkerConfigurer freeMarkerConfig) {
private void getSinkSftp(String targetTable, DmpSyncingDatasource targetDatasource, String publicKey, Map<String, Object> targetObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig) {
Map sftpModel = new HashMap();
sftpModel.put("host", targetDatasource.getHost()); //主机名
sftpModel.put("user", targetDatasource.getUserName()); //用户
......@@ -206,10 +207,10 @@ public class SyncParameters extends AbstractParameters {
sftpModel.put("path", targetObj.get("sourceFtpDir")); //文件路径
}
sftpModel.put("result_table_name", targetTable); //spark生成的临时表名
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, sftpModel, freeMarkerConfig);
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, sftpModel, freeMarkerConfig);
}
private void getSyncTransform(List<Map<String, Object>> mappingObj, FreeMarkerConfigurer freeMarkerConfig, String source_table_name) {
private void getSyncTransform(List<Map<String, Object>> mappingObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String source_table_name) {
String sourceField = "";
String targetField = "";
for (Map<String, Object> item : mappingObj) {
......@@ -224,7 +225,7 @@ public class SyncParameters extends AbstractParameters {
.append(" $t ");
sqlModel.put("sql", sql);
sqlModel.put("source_table_name", source_table_name);
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, sqlModel, freeMarkerConfig);
transform = transform + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, sqlModel, freeMarkerConfig);
Map targetModel = new HashMap();
StringBuilder targetSql = new StringBuilder()
......@@ -234,10 +235,10 @@ public class SyncParameters extends AbstractParameters {
.append(" $t ");
targetModel.put("sql", targetSql);
targetModel.put("source_table_name", source_table_name);
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, targetModel, freeMarkerConfig);
transform = transform + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, targetModel, freeMarkerConfig);
}
private void getsourceElasticsearch(String[] registerTableName, DmpSyncingDatasource dmpSyncingDatasource, FreeMarkerConfigurer freeMarkerConfig, int i) {
private void getsourceElasticsearch(String[] registerTableName, DmpSyncingDatasource dmpSyncingDatasource, FreeMarkerConfigurationFactoryBean freeMarkerConfig, int i) {
//source
String tableName = registerTableName[i];
List<String> list = new ArrayList<>();
......@@ -248,19 +249,19 @@ public class SyncParameters extends AbstractParameters {
jdbcModel.put("result_table_name", tableName);
//jdbcModel.put("index", "");
//jdbcModel.put("name_age", "");
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_ELASTICSEARCH, jdbcModel, freeMarkerConfig);
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_ELASTICSEARCH, jdbcModel, freeMarkerConfig);
}
public void getSourceHive(Map<String, String> envModel, FreeMarkerConfigurer freeMarkerConfig, String[] registerTableName, int i) {
public void getSourceHive(Map<String, String> envModel, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String[] registerTableName, int i) {
//source
String tableName = registerTableName[i];
Map hiveModel = new HashMap();
hiveModel.put("pre_sql", " select * from " + tableName);
hiveModel.put("result_table_name", tableName);
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, hiveModel, freeMarkerConfig);
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, hiveModel, freeMarkerConfig);
}
public void getJdbcSource(DmpSyncingDatasource dmpSyncingDatasource, String[] registerTableName, FreeMarkerConfigurer freeMarkerConfig, String publicKey, int i, Map<String, Object> sourceObj) {
public void getJdbcSource(DmpSyncingDatasource dmpSyncingDatasource, String[] registerTableName, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String publicKey, int i, Map<String, Object> sourceObj) {
String tableName = registerTableName[i];
//for (String tableName : registerTableName) {
Map jdbcModel = new HashMap();
......@@ -275,11 +276,11 @@ public class SyncParameters extends AbstractParameters {
//jdbcModel.put("numPartitions", "");
//jdbcModel.put("lowerBound", "");
//jdbcModel.put("upperBound", "");
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig);
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig);
//}
}
public void getJdbcSink(DmpSyncingDatasource targetDatasource, Map<String, Object> targetObj, FreeMarkerConfigurer freeMarkerConfig, String publicKey, String source_table_name) {
public void getJdbcSink(DmpSyncingDatasource targetDatasource, Map<String, Object> targetObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String publicKey, String source_table_name) {
String postImportStatement = String.valueOf(targetObj.get("postImportStatement")); //导入后语句
String preImportStatement = String.valueOf(targetObj.get("preImportStatement")); //导入前语句
preStatements = new ArrayList<String>();
......@@ -313,10 +314,10 @@ public class SyncParameters extends AbstractParameters {
jdbcSinkModel.put("user", targetDatasource.getUserName());
jdbcSinkModel.put("password", password);
jdbcSinkModel.put("source_table_name", source_table_name);
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_JDBC, jdbcSinkModel, freeMarkerConfig);
sink = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_JDBC, jdbcSinkModel, freeMarkerConfig);
}
public void getSourceSftp(String[] registerTableName, DmpSyncingDatasource dmpSyncingDatasource, String publicKey, Map<String, Object> sourceObj, FreeMarkerConfigurer freeMarkerConfig, int i) {
public void getSourceSftp(String[] registerTableName, DmpSyncingDatasource dmpSyncingDatasource, String publicKey, Map<String, Object> sourceObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig, int i) {
String tableName = registerTableName[i];
//for (String tableName : registerTableName) {
Map sftpModel = new HashMap();
......@@ -337,7 +338,7 @@ public class SyncParameters extends AbstractParameters {
sftpModel.put("path", sourceObj.get("sourceFtpDir").toString()); //文件路径
}
sftpModel.put("result_table_name", tableName); //spark生成的临时表名
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, sftpModel, freeMarkerConfig);
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, sftpModel, freeMarkerConfig);
//}
}
......
......@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
/**
......@@ -54,7 +55,7 @@ public class UnzipfileParameters extends AbstractParameters {
}
public UnzipfileParameters(String script, FreeMarkerConfigurer freeMarkerConfig) {
public UnzipfileParameters(String script, FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean) {
this.script = script;
JSONObject scriptObj = JSONObject.parseObject(script);
......@@ -69,7 +70,7 @@ public class UnzipfileParameters extends AbstractParameters {
unzipfileModel.put("src_dir", srcDir);
unzipfileModel.put("des_dir", desDir);
unzipfileModel.put("type", type);
this.cmdScript = FreeMarkerUtils.freemakerJson(CommConstant.FTL_UNZIPFILE, unzipfileModel, freeMarkerConfig);
this.cmdScript = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.FTL_UNZIPFILE, unzipfileModel, freeMarkerConfigurationFactoryBean);
}
......
......@@ -21,13 +21,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.jz.dmp.cmdexectool.scheduler.common.task.sync.SyncParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean;
import org.springframework.util.CollectionUtils;
//import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import com.alibaba.fastjson.JSONArray;
......@@ -49,6 +50,7 @@ import com.jz.dmp.cmdexectool.scheduler.common.task.ftp.FtpParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.hdfs.HdfsParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.shell.ShellParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.sync.SyncParameters;
import com.jz.dmp.cmdexectool.scheduler.common.task.unzipfile.UnzipfileParameters;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
......@@ -73,8 +75,10 @@ public class ProcessService {
private DmpProjectConfigInfoMapper dmpProjectConfigInfoMapper;
@Autowired
private DmpSyncingDatasourceDao dmpSyncingDatasourceDao;
//@Autowired
//private FreeMarkerConfigurer freeMarkerConfigurer;
@Autowired
private FreeMarkerConfigurer freeMarkerConfigurer;
private FreeMarkerConfigurationFactoryBean freeMarkerConfigurationFactoryBean;
/**
* @Title: taskStart @Description: TODO(启动task) @param 参数 @return void
......@@ -212,37 +216,37 @@ public class ProcessService {
break;
case sql:
SqlParameters sqlParameters = new SqlParameters(script, projectConfigInfoDto, dmpSyncingDatasourceDao, freeMarkerConfigurer, publicKey);
SqlParameters sqlParameters = new SqlParameters(script, projectConfigInfoDto, dmpSyncingDatasourceDao, freeMarkerConfigurationFactoryBean, publicKey);
sqlParameters.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(sqlParameters, projectConfigInfoDto);
break;
case sync:
SyncParameters sync = new SyncParameters(script, projectConfigInfoDto, dmpSyncingDatasourceDao, freeMarkerConfigurer, publicKey);
SyncParameters sync = new SyncParameters(script, projectConfigInfoDto, dmpSyncingDatasourceDao, freeMarkerConfigurationFactoryBean, publicKey);
sync.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(sync, projectConfigInfoDto);
break;
case subprocess:
break;
case ftp:
FtpParameters ftpParameters = new FtpParameters(script, freeMarkerConfigurer);
FtpParameters ftpParameters = new FtpParameters(script, freeMarkerConfigurationFactoryBean);
ftpParameters.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(ftpParameters, projectConfigInfoDto);
break;
case unzipFile:
UnzipfileParameters unzipfileParameters = new UnzipfileParameters(script, freeMarkerConfigurer);
UnzipfileParameters unzipfileParameters = new UnzipfileParameters(script, freeMarkerConfigurationFactoryBean);
unzipfileParameters.setScript(script);
unzipfileParameters.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(unzipfileParameters, projectConfigInfoDto);
break;
case docTrans:
DoctransParameters doctransParameters = new DoctransParameters(script, freeMarkerConfigurer);
DoctransParameters doctransParameters = new DoctransParameters(script, freeMarkerConfigurationFactoryBean);
doctransParameters.setScript(script);
doctransParameters.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(doctransParameters, projectConfigInfoDto);
break;
case hdfs:
HdfsParameters hdfsParameters = new HdfsParameters(script, freeMarkerConfigurer);
HdfsParameters hdfsParameters = new HdfsParameters(script, freeMarkerConfigurationFactoryBean);
hdfsParameters.setScript(script);
hdfsParameters.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(hdfsParameters, projectConfigInfoDto);
......
......@@ -16,35 +16,12 @@
*/
package com.jz.cmdexectool.test.task.shell;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.jz.dmp.cmdexectool.ApiApplication;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
import com.jz.dmp.cmdexectool.scheduler.common.enums.ExecutionStatus;
import com.jz.dmp.cmdexectool.scheduler.common.enums.MyDbType;
import com.jz.dmp.cmdexectool.scheduler.common.process.Property;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlBinds;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlType;
import com.jz.dmp.cmdexectool.scheduler.common.utils.CollectionUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.CommonUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.JSONUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.BaseDataSource;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.DataSourceFactory;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MyBaseDataSource;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MySQLDataSource;
import com.jz.dmp.cmdexectool.scheduler.dao.utils.DatabaseUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.utils.ParamUtils;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.TaskManager;
import com.jz.dmp.cmdexectool.scheduler.service.process.ProcessService;
import static com.jz.dmp.cmdexectool.scheduler.common.Constants.HIVE_CONF;
import static com.jz.dmp.cmdexectool.scheduler.common.Constants.PASSWORD;
import static com.jz.dmp.cmdexectool.scheduler.common.Constants.SEMICOLON;
import static com.jz.dmp.cmdexectool.scheduler.common.Constants.USER;
import static com.jz.dmp.cmdexectool.scheduler.common.enums.DbType.HIVE;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
......@@ -72,8 +49,28 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static com.jz.dmp.cmdexectool.scheduler.common.Constants.*;
import static com.jz.dmp.cmdexectool.scheduler.common.enums.DbType.HIVE;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.jz.dmp.cmdexectool.ApiApplication;
import com.jz.dmp.cmdexectool.scheduler.common.Constants;
import com.jz.dmp.cmdexectool.scheduler.common.enums.DbType;
import com.jz.dmp.cmdexectool.scheduler.common.enums.ExecutionStatus;
import com.jz.dmp.cmdexectool.scheduler.common.enums.MyDbType;
import com.jz.dmp.cmdexectool.scheduler.common.process.Property;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlBinds;
import com.jz.dmp.cmdexectool.scheduler.common.task.sql.SqlType;
import com.jz.dmp.cmdexectool.scheduler.common.utils.CollectionUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.CommonUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.JSONUtils;
import com.jz.dmp.cmdexectool.scheduler.common.utils.ParameterUtils;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.DataSourceFactory;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MyBaseDataSource;
import com.jz.dmp.cmdexectool.scheduler.dao.utils.DatabaseUtils;
import com.jz.dmp.cmdexectool.scheduler.server.entity.TaskExecutionContext;
import com.jz.dmp.cmdexectool.scheduler.server.utils.ParamUtils;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.AbstractTask;
import com.jz.dmp.cmdexectool.scheduler.server.worker.task.TaskManager;
import com.jz.dmp.cmdexectool.scheduler.service.process.ProcessService;
/**
* python shell command executor test
......@@ -89,6 +86,11 @@ public class SQLCommandExecutorTest {
private static final int LIMIT = 10000;
@Test
public void test22() {
System.out.println("test");
}
//@Test
public void test() {
try {
......@@ -121,11 +123,6 @@ public class SQLCommandExecutorTest {
}
//@Test
public void test22() {
System.out.println("test");
}
@Test
public void testJdbcHandler() {
// set the name of the current thread
//String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, "test");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment