Commit 55c875df authored by mcb

COMMIT

parent 86a790bf
@@ -147,7 +147,7 @@ public class SyncParameters extends AbstractParameters {
 }
 if (sourceTypeId == DatasouceTypeConstant.Hive) {
 //source
-getSourceHive(freeMarkerConfig, source_table_name,sourceObj);
+getSourceHive(freeMarkerConfig, source_table_name, sourceObj);
 }
 }
 //***** sink ******
@@ -161,10 +161,14 @@ public class SyncParameters extends AbstractParameters {
 getJdbcSink(targetDatasource, targetObj, freeMarkerConfig, publicKey, source_table_name);
 }
 if (targetTypeId == DatasouceTypeConstant.SFTP) {
-getSinkSftp(targetTable, targetDatasource, publicKey, targetObj, freeMarkerConfig);
+getSinkSftp(targetDatasource, publicKey, targetObj, freeMarkerConfig, source_table_name);
 }
 if (targetTypeId == DatasouceTypeConstant.Kudu) {
-getSinkKudu(targetDatasource, targetTable, freeMarkerConfig);
+getSinkKudu(targetDatasource, freeMarkerConfig, source_table_name);
 }
+if (targetTypeId == DatasouceTypeConstant.Hive) {
+//sink
+getSinkHive(freeMarkerConfig, sourceObj, source_table_name, envModel);
+}
 //transform
 if (mappingObj.size() > 0 && null != mappingObj) {
@@ -180,10 +184,31 @@ public class SyncParameters extends AbstractParameters {
 waterdropScript = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL, waterdropModel, freeMarkerConfig);
 }
-private void getSinkKudu(DmpSyncingDatasource targetDatasource, String targetTable, FreeMarkerConfigurationFactoryBean freeMarkerConfig) {
+private void getSinkHive(FreeMarkerConfigurationFactoryBean freeMarkerConfig, Map<String, Object> sourceObj, String source_table_name, Map<String, String> envModel) {
+Map hiveModel = new HashMap();
+hiveModel.put("pre_sql", " select * from " + source_table_name);
+hiveModel.put("result_table_name", source_table_name);
+String extractExpression = String.valueOf(sourceObj.get("extractExpression")); //data filter
+if (StringUtils.isNotEmpty(extractExpression)) {
+StringBuilder sql = new StringBuilder()
+.append(" (select * ")
+.append(" from ")
+.append(source_table_name)
+.append(" where ")
+.append(extractExpression + " ) t");
+hiveModel.put("pre_sql", sql);
+}
+source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, hiveModel, freeMarkerConfig);
+//env
+envModel.put("sparkSqlCatalogImplementation", "hive");
+env = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_ENV, envModel, freeMarkerConfig);
+}
+private void getSinkKudu(DmpSyncingDatasource targetDatasource, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String source_table_name) {
 Map kuduModel = new HashMap();
 kuduModel.put("kuduMaster", targetDatasource.getHost() + ":" + targetDatasource.getPort()); //host name
-kuduModel.put("result_table_name", targetTable); //temporary table name generated by Spark
+kuduModel.put("result_table_name", source_table_name); //temporary table name generated by Spark
 source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_KUDU, kuduModel, freeMarkerConfig);
 }
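The new getSinkHive registers the Hive source under the same source_table_name the sink models reference: pre_sql defaults to a plain SELECT and is swapped for a filtered sub-select when extractExpression is set, and the env model gains sparkSqlCatalogImplementation so the rendered Spark env enables the Hive catalog. A standalone sketch of just the pre_sql wrapping (class, method and sample values are illustrative, and the null/empty check stands in for StringUtils.isNotEmpty):

// Illustrative restatement of the pre_sql logic above, not the committed method.
import java.util.HashMap;
import java.util.Map;

public class HivePreSqlSketch {

    // Plain SELECT when no filter is given, otherwise a filtered sub-select aliased
    // as "t", mirroring the StringBuilder chain in getSinkHive.
    static String buildPreSql(String sourceTableName, String extractExpression) {
        if (extractExpression == null || extractExpression.trim().isEmpty()) {
            return " select * from " + sourceTableName;
        }
        return new StringBuilder()
                .append(" (select * ")
                .append(" from ")
                .append(sourceTableName)
                .append(" where ")
                .append(extractExpression).append(" ) t")
                .toString();
    }

    public static void main(String[] args) {
        Map<String, Object> hiveModel = new HashMap<>();
        hiveModel.put("result_table_name", "ods_user");                         // hypothetical table
        hiveModel.put("pre_sql", buildPreSql("ods_user", "dt = '2021-01-01'")); // hypothetical filter
        System.out.println(hiveModel.get("pre_sql"));
        // -> " (select *  from ods_user where dt = '2021-01-01' ) t"
    }
}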
@@ -195,7 +220,7 @@ public class SyncParameters extends AbstractParameters {
 source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_KUDU, kuduModel, freeMarkerConfig);
 }
-private void getSinkSftp(String targetTable, DmpSyncingDatasource targetDatasource, String publicKey, Map<String, Object> targetObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig) {
+private void getSinkSftp(DmpSyncingDatasource targetDatasource, String publicKey, Map<String, Object> targetObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String source_table_name) {
 Map sftpModel = new HashMap();
 sftpModel.put("host", targetDatasource.getHost()); //host name
 sftpModel.put("user", targetDatasource.getUserName()); //user
@@ -207,7 +232,7 @@ public class SyncParameters extends AbstractParameters {
 if (null != targetObj.get("sourceFtpDir")) {
 sftpModel.put("path", targetObj.get("sourceFtpDir")); //file path
 }
-sftpModel.put("result_table_name", targetTable); //temporary table name generated by Spark
+sftpModel.put("result_table_name", source_table_name); //temporary table name generated by Spark
 source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, sftpModel, freeMarkerConfig);
 }
@@ -297,6 +322,7 @@ public class SyncParameters extends AbstractParameters {
 //jdbcModel.put("upperBound", "");
 source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig);
 }
+//sink
 public void getJdbcSink(DmpSyncingDatasource targetDatasource, Map<String, Object> targetObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String publicKey, String source_table_name) {
 String postImportStatement = String.valueOf(targetObj.get("postImportStatement")); //post-import statement
......
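Every helper in this class funnels its model map through FreeMarkerUtils.freemakerNoneWebJson and appends the rendered text to source, env or waterdropScript. That utility is not part of this diff; assuming the templates named by the CommConstant values are resolved through the injected FreeMarkerConfigurationFactoryBean, a minimal stand-in could look like:

// Hypothetical stand-in for FreeMarkerUtils.freemakerNoneWebJson; the real utility
// is not shown in this commit and may differ.
import freemarker.template.Configuration;
import freemarker.template.Template;
import freemarker.template.TemplateException;
import java.io.IOException;
import java.util.Map;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean;
import org.springframework.ui.freemarker.FreeMarkerTemplateUtils;

public final class FreeMarkerRenderSketch {

    // Renders the named .ftl (e.g. the template referenced by CommConstant.WATERDROP_FTL_SINK_KUDU)
    // with the given model and returns the block that gets appended to source/env.
    public static String render(String templateName,
                                Map<String, ?> model,
                                FreeMarkerConfigurationFactoryBean freeMarkerConfig)
            throws IOException, TemplateException {
        Configuration cfg = freeMarkerConfig.getObject();   // underlying FreeMarker configuration
        Template template = cfg.getTemplate(templateName);  // resolves the .ftl referenced by the constant
        return FreeMarkerTemplateUtils.processTemplateIntoString(template, model);
    }

    private FreeMarkerRenderSketch() {
    }
}

The point is only that each model map becomes one rendered block of the final Waterdrop config; the committed utility may handle template names and errors differently.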
@@ -10,4 +10,7 @@
 <#if sparkexecutormemory??>
 spark.executor.memory = "${sparkexecutormemory!}"
 </#if>
+<#if sparkSqlCatalogImplementation??>
+spark.sql.catalogImplementation = "${sparkSqlCatalogImplementation!}"
+</#if>
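The template change mirrors the existing spark.executor.memory guard: the catalog line is rendered only when the env model carries sparkSqlCatalogImplementation, which getSinkHive now sets to "hive". A small self-contained check of that behaviour, assuming a recent FreeMarker on the classpath (the inline template string and class name are illustrative):

// Renders the new env guard against a model built the way getSinkHive builds envModel.
import freemarker.template.Configuration;
import freemarker.template.Template;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;

public class EnvTemplateSketch {
    public static void main(String[] args) throws Exception {
        String ftl = ""
                + "<#if sparkexecutormemory??>\n"
                + "spark.executor.memory = \"${sparkexecutormemory!}\"\n"
                + "</#if>\n"
                + "<#if sparkSqlCatalogImplementation??>\n"
                + "spark.sql.catalogImplementation = \"${sparkSqlCatalogImplementation!}\"\n"
                + "</#if>\n";

        Configuration cfg = new Configuration(Configuration.VERSION_2_3_30);
        Template template = new Template("env", new StringReader(ftl), cfg);

        Map<String, String> envModel = new HashMap<>();
        envModel.put("sparkSqlCatalogImplementation", "hive"); // what getSinkHive puts into envModel
        StringWriter out = new StringWriter();
        template.process(envModel, out);
        // Prints only: spark.sql.catalogImplementation = "hive"
        System.out.println(out);
    }
}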