Commit 4859e430 authored by mcb's avatar mcb

commit

parent 4e1937ae
......@@ -3,10 +3,12 @@ package com.jz.dmp.cmdexectool.scheduler.common.task.sync;
import com.alibaba.fastjson.JSONObject;
import com.jz.dmp.cmdexectool.common.constant.CommConstant;
import com.jz.dmp.cmdexectool.common.constant.DatasouceTypeConstant;
import com.jz.dmp.cmdexectool.common.utils.EncryptionUtils;
import com.jz.dmp.cmdexectool.common.utils.FreeMarkerUtils;
import com.jz.dmp.cmdexectool.controller.bean.DmpProjectConfigInfoDto;
import com.jz.dmp.cmdexectool.entity.DmpSyncingDatasource;
import com.jz.dmp.cmdexectool.mapper.DmpSyncingDatasourceDao;
import com.jz.dmp.cmdexectool.scheduler.common.enums.MyDbType;
import com.jz.dmp.cmdexectool.scheduler.common.process.ResourceInfo;
import com.jz.dmp.cmdexectool.scheduler.common.task.AbstractParameters;
import com.jz.dmp.cmdexectool.scheduler.dao.datasource.MyBaseDataSource;
......@@ -83,7 +85,7 @@ public class SyncParameters extends AbstractParameters {
*/
private MyBaseDataSource targetBaseDataSource;
public SyncParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurer freeMarkerConfig) throws Exception {
public SyncParameters(String script, DmpProjectConfigInfoDto projectConfigInfoDto, DmpSyncingDatasourceDao dmpSyncingDatasourceDao, FreeMarkerConfigurer freeMarkerConfig, String publicKey) throws Exception {
source = "";
env = "";
sink = "";
......@@ -113,12 +115,6 @@ public class SyncParameters extends AbstractParameters {
DmpSyncingDatasource targetDatasource = dmpSyncingDatasourceDao.queryById(targetDbId);
Integer targetTypeId = dmpSyncingDatasource.getDatasourceType();
String targetTable = String.valueOf(targetObj.get("targetTable"));//目标表
String postImportStatement = String.valueOf(targetObj.get("postImportStatement")); //导入后语句
String preImportStatement = String.valueOf(targetObj.get("preImportStatement")); //导入前语句
preStatements = new ArrayList<String>();
preStatements.add(preImportStatement);
posStatements = new ArrayList<String>();
posStatements.add(postImportStatement);
//产生 Jdbc(MySQL、Oracle、SqlServer、PostgreSQL、Informix、DB2) source
if ((sourceTypeId == DatasouceTypeConstant.MySQL
......@@ -137,7 +133,7 @@ public class SyncParameters extends AbstractParameters {
|| targetTypeId == DatasouceTypeConstant.PostgreSQL
|| targetTypeId == DatasouceTypeConstant.Informix
|| targetTypeId == DatasouceTypeConstant.DB2)) {
getJdbcSink(targetDatasource,targetObj,freeMarkerConfig);
getJdbcSink(targetDatasource,targetObj,freeMarkerConfig,publicKey);
}
//transform
if (mappingObj.size() > 0 && null != mappingObj) {
......@@ -170,18 +166,6 @@ public class SyncParameters extends AbstractParameters {
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_SQL, targetModel, freeMarkerConfig);
}
/*if (mappingObj.size() > 0 && null != mappingObj) {
for (Map<String, Object> item : mappingObj) {
Map<String, String> transformJson2Model = new HashMap<String, String>();
if (null != item.get("sourceField")) {
transformJson2Model.put("source_field", (String) item.get("sourceField"));
}
if (null != item.get("targetField")) {
transformJson2Model.put("target_field", (String) item.get("targetField"));
}
transform = transform + FreeMarkerUtils.freemakerJson(CommConstant.WATERDROP_FTL_TRANSFORM_JSON2, transformJson2Model, freeMarkerConfig);
}
}*/
if (sourceTypeId == DatasouceTypeConstant.Elasticsearch) {
//source
......@@ -250,7 +234,22 @@ public class SyncParameters extends AbstractParameters {
}
}
public void getJdbcSink(DmpSyncingDatasource targetDatasource,Map<String, Object> targetObj,FreeMarkerConfigurer freeMarkerConfig){
public void getJdbcSink(DmpSyncingDatasource targetDatasource,Map<String, Object> targetObj,FreeMarkerConfigurer freeMarkerConfig,String publicKey){
String postImportStatement = String.valueOf(targetObj.get("postImportStatement")); //导入后语句
String preImportStatement = String.valueOf(targetObj.get("preImportStatement")); //导入前语句
preStatements = new ArrayList<String>();
preStatements.add(preImportStatement);
posStatements = new ArrayList<String>();
posStatements.add(postImportStatement);
targetBaseDataSource = new MyBaseDataSource();
targetBaseDataSource.setJdbcUrlDirect(targetDatasource.getJdbcUrl());
targetBaseDataSource.setUser(targetDatasource.getUserName());
String password = EncryptionUtils.decode(targetDatasource.getPassword(), publicKey);
MyDbType myDbType = MyDbType.obtainByIdStr(targetDatasource.getId().toString());
targetBaseDataSource.setPassword(password);
targetBaseDataSource.setMyDbType(myDbType);
//sink
Map jdbcSinkModel = new HashMap();
jdbcSinkModel.put("driver", targetDatasource.getDriverClassName());
......
......@@ -216,7 +216,7 @@ public class ProcessService {
break;
case sync:
SyncParameters sync = new SyncParameters(script, projectConfigInfoDto, dmpSyncingDatasourceDao, freeMarkerConfigurer);
SyncParameters sync = new SyncParameters(script, projectConfigInfoDto, dmpSyncingDatasourceDao, freeMarkerConfigurer, publicKey);
sync.setTaskAppId(taskAppId);
taskExecutionContext = new TaskExecutionContext(sync, projectConfigInfoDto);
break;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment