Commit 32b23a0d authored by mcb

COMMIT

parent 8e1fa3fc
@@ -61,6 +61,7 @@ public class CommConstant {
public static final String WATERDROP_FTL_SINK_JDBC = "sink_jdbc.ftl";
public static final String WATERDROP_FTL_SINK_KAFKA = "sink_kafka.ftl";
public static final String WATERDROP_FTL_SINK_API = "sink_api.ftl";
public static final String WATERDROP_FTL_SINK_KUDU = "source_sink_kudu.ftl";
// other script templates
public static final String FTL_SFTP_DOWNLOAD = "sftp_download.ftl"; // sftp download
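For context, a minimal sketch (not part of this commit) of how the new WATERDROP_FTL_SINK_KUDU constant gets consumed. FreeMarkerUtils.freemakerNoneWebJson, the freeMarkerConfig bean, and the model keys come from the SyncParameters changes further down; the imports and the literal master address and table name are illustrative only.

import java.util.HashMap;
import java.util.Map;

// Sketch: render sink_kudu.ftl with the same keys the diff below populates.
Map<String, Object> kuduModel = new HashMap<>();
kuduModel.put("kuduMaster", "kudu-master-1:7051");   // hypothetical Kudu master host:port
kuduModel.put("result_table_name", "ods_user_tmp");  // hypothetical Spark temp table name
String kuduSink = FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_KUDU, kuduModel, freeMarkerConfig);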
@@ -11,6 +11,7 @@ import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao;
import com.jz.dmp.cmdexectool.scheduler.common.enums.MyDbType;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.common.utils.StringUtils;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MyBaseDataSource;
import org.springframework.ui.freemarker.FreeMarkerConfigurationFactoryBean;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer;
@@ -132,10 +133,6 @@ public class SyncParameters extends AbstractParameters {
//source
getJdbcSource(dmpSyncingDatasource, registerTableName, freeMarkerConfig, publicKey, i, sourceObj);
}
if (sourceTypeId == DatasouceTypeConstant.Hive) {
//source
getSourceHive(envModel, freeMarkerConfig, registerTableName, i);
}
if (sourceTypeId == DatasouceTypeConstant.Kudu) {
//source
getSourceKudu(dmpSyncingDatasource, registerTableName, freeMarkerConfig, i);
@@ -148,6 +145,10 @@ public class SyncParameters extends AbstractParameters {
//source
getsourceElasticsearch(registerTableName, dmpSyncingDatasource, freeMarkerConfig, i);
}
if (sourceTypeId == DatasouceTypeConstant.Hive) {
//source
getSourceHive(freeMarkerConfig, source_table_name, sourceObj);
}
}
//***** sink ******
// generate the Jdbc (MySQL, Oracle, SqlServer, PostgreSQL, Informix, DB2) sink
@@ -183,7 +184,7 @@ public class SyncParameters extends AbstractParameters {
Map kuduModel = new HashMap();
kuduModel.put("kuduMaster", targetDatasource.getHost() + ":" + targetDatasource.getPort()); //主机名
kuduModel.put("result_table_name", targetTable); //spark生成的临时表名
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, kuduModel, freeMarkerConfig);
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_KUDU, kuduModel, freeMarkerConfig);
}
private void getSourceKudu(DmpSyncingDatasource dmpSyncingDatasource, String[] registerTableName, FreeMarkerConfigurationFactoryBean freeMarkerConfig, int i) {
@@ -191,7 +192,7 @@ public class SyncParameters extends AbstractParameters {
Map kuduModel = new HashMap();
kuduModel.put("kuduMaster", dmpSyncingDatasource.getHost() + ":" + dmpSyncingDatasource.getPort()); //主机名
kuduModel.put("result_table_name", tableName); //spark生成的临时表名
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, kuduModel, freeMarkerConfig);
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SINK_KUDU, kuduModel, freeMarkerConfig);
}
private void getSinkSftp(String targetTable, DmpSyncingDatasource targetDatasource, String publicKey, Map<String, Object> targetObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig) {
@@ -252,18 +253,27 @@ public class SyncParameters extends AbstractParameters {
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_ELASTICSEARCH, jdbcModel, freeMarkerConfig);
}
public void getSourceHive(Map<String, String> envModel, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String[] registerTableName, int i) {
public void getSourceHive(FreeMarkerConfigurationFactoryBean freeMarkerConfig, String registerTableName, Map<String, Object> sourceObj) {
//source
String tableName = registerTableName[i];
String tableName = registerTableName;
Map hiveModel = new HashMap();
hiveModel.put("pre_sql", " select * from " + tableName);
hiveModel.put("result_table_name", tableName);
String extractExpression = String.valueOf(sourceObj.get("extractExpression")); // data filter
if (StringUtils.isNotEmpty(extractExpression)) {
StringBuilder sql = new StringBuilder()
.append(" (select * ")
.append(" from ")
.append(tableName)
.append(" where ")
.append(extractExpression + " ) t");
hiveModel.put("pre_sql", sql);
}
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, hiveModel, freeMarkerConfig);
}
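To make the filter concrete: with a hypothetical table and extractExpression, the StringBuilder above replaces the default pre_sql with a derived-table query (the table name and filter below are made up).

String tableName = "ods_user";                   // hypothetical Hive table
String extractExpression = "dt = '2021-01-01'";  // hypothetical filter expression
String preSql = new StringBuilder()
        .append(" (select * ")
        .append(" from ")
        .append(tableName)
        .append(" where ")
        .append(extractExpression)
        .append(" ) t")
        .toString();
// preSql -> " (select *  from ods_user where dt = '2021-01-01' ) t"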
public void getJdbcSource(DmpSyncingDatasource dmpSyncingDatasource, String[] registerTableName, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String publicKey, int i, Map<String, Object> sourceObj) {
String tableName = registerTableName[i];
//for (String tableName : registerTableName) {
Map jdbcModel = new HashMap();
jdbcModel.put("driver", dmpSyncingDatasource.getDriverClassName());
jdbcModel.put("url", dmpSyncingDatasource.getJdbcUrl());
@@ -271,13 +281,21 @@ public class SyncParameters extends AbstractParameters {
jdbcModel.put("result_table_name", tableName);
jdbcModel.put("user", dmpSyncingDatasource.getUserName());
jdbcModel.put("password", EncryptionUtils.decode(dmpSyncingDatasource.getPassword(), publicKey));
//sourceObj.get("extractExpression"); // data filter
String extractExpression = String.valueOf(sourceObj.get("extractExpression")); // data filter
if (StringUtils.isNotEmpty(extractExpression)) {
StringBuilder sql = new StringBuilder()
.append(" (select * ")
.append(" from ")
.append(tableName)
.append(" where ")
.append(extractExpression + " ) t");
jdbcModel.put("table", sql);
}
//jdbcModel.put("partitionColumn", "");
//jdbcModel.put("numPartitions", "");
//jdbcModel.put("lowerBound", "");
//jdbcModel.put("upperBound", "");
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_JDBC, jdbcModel, freeMarkerConfig);
//}
}
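Since getSourceHive and getJdbcSource now build the same "(select * from <table> where <filter>) t" wrapper, a shared helper could remove the duplication. The method below is a hypothetical sketch, not part of this commit, and assumes the StringUtils imported above:

private static String wrapWithFilter(String tableName, String extractExpression) {
    // Wrap the table in a filtered sub-select aliased as "t"; otherwise use the bare table name.
    if (StringUtils.isNotEmpty(extractExpression)) {
        return " (select *  from " + tableName + " where " + extractExpression + " ) t";
    }
    return tableName;
}

Note that String.valueOf(sourceObj.get("extractExpression")) returns the literal string "null" when the key is absent, so an isNotEmpty check alone (in the sketch above as in the methods in this diff) does not guard against a missing filter.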
public void getJdbcSink(DmpSyncingDatasource targetDatasource, Map<String, Object> targetObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig, String publicKey, String source_table_name) {
@@ -319,7 +337,6 @@ public class SyncParameters extends AbstractParameters {
public void getSourceSftp(String[] registerTableName, DmpSyncingDatasource dmpSyncingDatasource, String publicKey, Map<String, Object> sourceObj, FreeMarkerConfigurationFactoryBean freeMarkerConfig, int i) {
String tableName = registerTableName[i];
//for (String tableName : registerTableName) {
Map sftpModel = new HashMap();
sftpModel.put("host", dmpSyncingDatasource.getHost()); //主机名
sftpModel.put("user", dmpSyncingDatasource.getUserName()); //用户
@@ -339,7 +356,6 @@ public class SyncParameters extends AbstractParameters {
}
sftpModel.put("result_table_name", tableName); //spark生成的临时表名
source = source + FreeMarkerUtils.freemakerNoneWebJson(CommConstant.WATERDROP_FTL_SOURCE_SFTP, sftpModel, freeMarkerConfig);
//}
}
@Override