commit

parent 2a9c340d
@@ -51,6 +51,8 @@ public class CommConstant {
public static final String WATERDROP_FTL = "waterdrop.ftl";
public static final String WATERDROP_FTL_ENV = "env.ftl";
public static final String WATERDROP_FTL_SOURCE_JDBC = "source_jdbc.ftl";
public static final String WATERDROP_FTL_SOURCE_ELASTICSEARCH = "source_elasticsearch.ftl";
public static final String WATERDROP_FTL_SOURCE_SFTP = "source_sftp.ftl";
public static final String WATERDROP_FTL_TRANSFORM_SQL = "transform_sql.ftl";
public static final String WATERDROP_FTL_TRANSFORM_JSON2 = "transform_json2.ftl";
public static final String WATERDROP_FTL_SINK_CONSOLE = "sink_console.ftl";
......
@@ -10,7 +10,7 @@ package com.jz.dmp.cmdexectool.common.constant;
public class DatasouceTypeConstant {
public static final int MySQL = 1;
public static final int SqlServer = 2;
public static final int PostgreSQL = 3;
public static final int Oracle = 4;
public static final int DM = 5;
@@ -19,8 +19,9 @@ public class DatasouceTypeConstant {
public static final int Impala = 8;
public static final int Kudu = 9;
public static final int HDFS = 10;
public static final int SFTP = 11;
public static final int Elasticsearch = 12;
public static final int Informix = 21;
......
@@ -9,8 +9,10 @@ import com.jz.dmp.cmdexectool.entity.DmpSyncingDatasource;
import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import org.apache.commons.lang3.StringUtils;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -72,26 +74,104 @@ public class SyncParameters extends AbstractParameters {
JSONObject scriptObj = JSONObject.parseObject(script);
Map<String, Object> scriptJson = (Map<String, Object>) scriptObj.get("params");
Map<String, Object> scripts = (Map<String, Object>) scriptJson.get("scripts");
Map<String, Object> sourceObj = (Map<String, Object>) scripts.get("reader"); //source data
Map<String, Object> targetObj = (Map<String, Object>) scripts.get("writer"); //target data
Map<String, Object> mappingObj = (Map<String, Object>) scripts.get("mappingRelation"); //field mapping relations
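//Illustrative only: the reads above and below assume the script JSON carries
//roughly this shape (field names taken from this class; values are made up):
//{
//  "params": { "scripts": {
//    "reader": { "sourceDbId": "1", "registerTableName": "t_user,t_order", "fileType": "csv", "sourceCsvDelimiter": ",", "sourceFtpDir": "/data/in" },
//    "writer": { "targetDbId": "2", "targetTable": "t_user_copy", "targetInsertMergeOverwrite": "overwrite" },
//    "mappingRelation": { }
//  } }
//}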
//env
Map<String, String> envModel = new HashMap<String, String>();
envModel.put("sparkappname", "Waterdrop");
env = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfig);
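//Illustrative only -- env.ftl itself is not part of this diff. Given the model
//above, the rendered Waterdrop env block is presumably something like:
//env {
//  spark.app.name = "Waterdrop"
//}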
//source info
Integer sourceId = Integer.valueOf((String) sourceObj.get("sourceDbId"));
DmpSyncingDatasource dmpSyncingDatasource = dmpSyncingDatasourceDao.queryById(sourceId);
Integer sourceTypeId = dmpSyncingDatasource.getDatasourceType();
String[] registerTableName = ((String) sourceObj.get("registerTableName")).split(",");
//target info
Integer targetDbId = Integer.valueOf((String) targetObj.get("targetDbId"));
DmpSyncingDatasource targetDatasource = dmpSyncingDatasourceDao.queryById(targetDbId);
Integer targetTypeId = targetDatasource.getDatasourceType();
//JDBC (MySQL, Oracle, SqlServer, PostgreSQL, Informix, DB2)
//source
if (sourceTypeId == DatasouceTypeConstant.MySQL || sourceTypeId == DatasouceTypeConstant.Oracle || sourceTypeId == DatasouceTypeConstant.SqlServer || sourceTypeId == DatasouceTypeConstant.PostgreSQL
|| sourceTypeId == DatasouceTypeConstant.Informix || sourceTypeId == DatasouceTypeConstant.DB2) {
for (String tableName : registerTableName) {
Map<String, Object> jdbcModel = new HashMap<>();
jdbcModel.put("driver", dmpSyncingDatasource.getDriverClassName());
jdbcModel.put("url", dmpSyncingDatasource.getJdbcUrl());
jdbcModel.put("table", tableName);
jdbcModel.put("result_table_name", tableName);
jdbcModel.put("user", dmpSyncingDatasource.getUserName());
jdbcModel.put("password", dmpSyncingDatasource.getPassword());
//jdbcModel.put("partitionColumn", "");
//jdbcModel.put("numPartitions", "");
//jdbcModel.put("lowerBound", dmpSyncingDatasource.getPassword());
//jdbcModel.put("upperBound", dmpSyncingDatasource.getPassword());
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig);
}
}
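//Illustrative only -- source_jdbc.ftl is not shown in this diff. For a MySQL
//source and table "t_user", the rendered source fragment would look roughly like:
//jdbc {
//    driver = "com.mysql.jdbc.Driver"
//    url = "jdbc:mysql://127.0.0.1:3306/demo"
//    table = "t_user"
//    result_table_name = "t_user"
//    user = "root"
//    password = "***"
//}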
//sink
if (targetTypeId == DatasouceTypeConstant.MySQL || targetTypeId == DatasouceTypeConstant.Oracle || targetTypeId == DatasouceTypeConstant.SqlServer || targetTypeId == DatasouceTypeConstant.PostgreSQL
|| targetTypeId == DatasouceTypeConstant.Informix || targetTypeId == DatasouceTypeConstant.DB2) {
Map<String, Object> jdbcSinkModel = new HashMap<>();
jdbcSinkModel.put("driver", targetDatasource.getDriverClassName());
jdbcSinkModel.put("url", targetDatasource.getJdbcUrl());
jdbcSinkModel.put("save_mode", targetObj.get("targetInsertMergeOverwrite")); //# 存储模式,支持overwrite、append、update、ignore、error
if (null != targetObj.get("targetInsertMergeOverwrite")) { //当存储模式是 overwrite时,仅清除表中数据
if ("overwrite".equals(targetObj.get("targetInsertMergeOverwrite"))) {
jdbcSinkModel.put("truncate", true);
} else {
jdbcSinkModel.put("truncate", false);
}
}
jdbcSinkModel.put("dbtable", targetObj.get("targetTable")); //目标表
jdbcSinkModel.put("user", targetDatasource.getUserName());
jdbcSinkModel.put("password", targetDatasource.getPassword());
sink = FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SINK_JDBC, jdbcSinkModel, freeMarkerConfig);
}
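//Illustrative only -- sink_jdbc.ftl is not shown in this diff. A rendered sink
//fragment for an overwrite load would look roughly like:
//jdbc {
//    save_mode = "overwrite"
//    truncate = true
//    dbtable = "t_user_copy"
//    url = "jdbc:mysql://127.0.0.1:3306/demo"
//    driver = "com.mysql.jdbc.Driver"
//    user = "root"
//    password = "***"
//}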
if (sourceTypeId == DatasouceTypeConstant.Elasticsearch) {
//source
for (String tableName : registerTableName) {
List<String> hosts = new ArrayList<>();
hosts.add(dmpSyncingDatasource.getHost() + ":" + dmpSyncingDatasource.getPort());
Map<String, Object> esModel = new HashMap<>();
esModel.put("hosts", hosts); //ES cluster addresses, host:port
esModel.put("result_table_name", tableName);
//esModel.put("index", "");
//esModel.put("name_age", "");
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_ELASTICSEARCH, esModel, freeMarkerConfig);
}
}
if (sourceTypeId == DatasouceTypeConstant.Hive) {
}
if (sourceTypeId == DatasouceTypeConstant.SFTP) {
//source
for (String tableName : registerTableName) {
Map<String, Object> sftpModel = new HashMap<>();
sftpModel.put("host", dmpSyncingDatasource.getHost()); //host name
sftpModel.put("user", dmpSyncingDatasource.getUserName()); //user
sftpModel.put("password", dmpSyncingDatasource.getPassword());
sftpModel.put("port", dmpSyncingDatasource.getPort()); //port
if (null != sourceObj.get("fileType")) {
sftpModel.put("fileType", sourceObj.get("fileType").toString()); //file type
}
sftpModel.put("delimiter", sourceObj.get("sourceCsvDelimiter")); //delimiter
sftpModel.put("head", true); //whether to read the first line as the header
sftpModel.put("multiLine", true); //multi-line parsing
if (null != sourceObj.get("sourceFtpDir")) {
sftpModel.put("path", sourceObj.get("sourceFtpDir").toString()); //file path
}
sftpModel.put("result_table_name", tableName); //temp table name registered in Spark
source = source + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, sftpModel, freeMarkerConfig);
}
}
}
@Override
......
elasticsearch {
<#if hosts??>
# ElasticSearch cluster addresses in host:port format; multiple hosts are allowed, e.g. ["host1:9200", "host2:9200"].
hosts = ["${hosts?join("\", \"")}"]
</#if>
<#if index??>
# ElasticSearch index name; the * wildcard is supported
index = "${index!}"
</#if>
<#if name_age??>
# read only the name and age fields
es.read.field.include = "${name_age!}"
</#if>
<#if result_table_name??>
result_table_name = "${result_table_name!}"
</#if>
}
\ No newline at end of file
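# Example render of source_elasticsearch.ftl above (illustrative values): with
# hosts = ["es1:9200", "es2:9200"], index = "logs-*" and result_table_name = "logs":
#
# elasticsearch {
#   hosts = ["es1:9200", "es2:9200"]
#   index = "logs-*"
#   result_table_name = "logs"
# }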
sftp {
<#if host??>
# SFTP host name
host = "${host!}"
</#if>
<#if user??>
# user
user = "${user!}"
</#if>
<#if password??>
# password
password = "${password!}"
</#if>
<#if port??>
# port
port = ${port!}
</#if>
<#if fileType??>
# file type
fileType = "${fileType!}"
</#if>
<#if delimiter??>
# delimiter
delimiter = "${delimiter!}"
</#if>
<#if head??>
# whether to read the first line as the header
head = ${head!}
</#if>
<#if multiLine??>
# multi-line parsing
multiLine = ${multiLine!}
</#if>
<#if path??>
# file path
path = "${path!}"
</#if>
<#if result_table_name??>
# Spark temp table name
result_table_name = "${result_table_name!}"
</#if>
}
\ No newline at end of file
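# Example render of source_sftp.ftl above (illustrative values): for a CSV file
# under /data/in on host sftp01:
#
# sftp {
#   host = "sftp01"
#   user = "root"
#   password = "***"
#   port = 22
#   fileType = "csv"
#   delimiter = ","
#   head = true
#   multiLine = true
#   path = "/data/in"
#   result_table_name = "t_user"
# }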