Commit ad3a98af authored by mcb's avatar mcb

no message

parent 8e3f9c35
package com.jz.common.utils.web; package com.jz.common.utils.web;
import org.apache.commons.lang3.StringUtils;
import org.springframework.security.core.Authentication; import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.RequestContextHolder;
...@@ -106,6 +107,20 @@ public class SessionUtils { ...@@ -106,6 +107,20 @@ public class SessionUtils {
return (DmpMember)principal; return (DmpMember)principal;
} }
/**
* @Title: getSessionUserId
* @Description: 获取登录用户ID
* @return String 返回类型
*/
public static String getCurrentUserId(){
String userId = "";
DmpMember user = getSecurityUser();
if(user != null){
userId = String.valueOf(user.getUserId());
}
return userId;
}
/** /**
* @Title: getAuthentication * @Title: getAuthentication
* @Description: TODO(获取authentication) * @Description: TODO(获取authentication)
......
...@@ -228,8 +228,6 @@ public class RealTimeSyncController { ...@@ -228,8 +228,6 @@ public class RealTimeSyncController {
if (StringUtils.isEmpty(params.get("targetDataSourceId").toString())) { if (StringUtils.isEmpty(params.get("targetDataSourceId").toString())) {
return new JsonResult(ResultCode.PARAMS_ERROR, "目标数据源id不能为空!"); return new JsonResult(ResultCode.PARAMS_ERROR, "目标数据源id不能为空!");
} }
boolean flag = false;
//异步提交 //异步提交
Thread thread = new Thread(new Runnable() { Thread thread = new Thread(new Runnable() {
@Override @Override
...@@ -242,7 +240,6 @@ public class RealTimeSyncController { ...@@ -242,7 +240,6 @@ public class RealTimeSyncController {
} }
}); });
thread.start(); thread.start();
return new JsonResult(); return new JsonResult();
} }
......
...@@ -9,6 +9,7 @@ import com.jz.common.page.PageInfoResponse; ...@@ -9,6 +9,7 @@ import com.jz.common.page.PageInfoResponse;
import com.jz.common.persistence.BaseService; import com.jz.common.persistence.BaseService;
import com.jz.common.utils.realTime.DBUtil; import com.jz.common.utils.realTime.DBUtil;
import com.jz.common.utils.realTime.RestClient; import com.jz.common.utils.realTime.RestClient;
import com.jz.common.utils.web.SessionUtils;
import com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceNameListDto; import com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceNameListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto; import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq; import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq;
...@@ -283,20 +284,20 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic ...@@ -283,20 +284,20 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
@Override @Override
public boolean addRealTimeTask(Map<String, Object> params) throws Exception { public boolean addRealTimeTask(Map<String, Object> params) throws Exception {
String srcDataSourceId = params.get("srcDataSourceId").toString(); // 来源id String srcDataSourceId = params.get("srcDataSourceId").toString(); // 来源id
String targetDataSourceId = (String) params.get("targetDataSourceId"); // 目标源id String targetDataSourceId = params.get("targetDataSourceId").toString(); // 目标源id
Long projectId = Long.valueOf(params.get("projectId").toString()); //项目id Long projectId = Long.valueOf(params.get("projectId").toString()); //项目id
String connectorUrl = (String) params.get("connectorUrl"); //connectorUrl连接信息 String connectorUrl = (String) params.get("connectorUrl"); //connectorUrl连接信息
//根据来源数据源id获取数据信息 //根据来源数据源id获取数据信息
RealTimeSyncDataSourceModel sourceDbInfo = dmpRealtimeSyncInfoDao.querygSourceDbInfoById(srcDataSourceId); RealTimeSyncDataSourceModel sourceDbInfo = dmpRealtimeSyncInfoDao.querygSourceDbInfoById(srcDataSourceId);
if (sourceDbInfo != null) { if (sourceDbInfo == null) {
throw new RuntimeException("来源数据设置信息不存在!"); throw new RuntimeException("来源数据设置信息不存在!");
} }
sourceDbInfo.setPassword(new BaseService().decode(sourceDbInfo.getPassword(), publicKey)); sourceDbInfo.setPassword(new BaseService().decode(sourceDbInfo.getPassword(), publicKey));
//根据目标数据源id获取数据信息 //根据目标数据源id获取数据信息
RealTimeSyncDataSourceModel targetDbInfo = dmpRealtimeSyncInfoDao.querygSourceDbInfoById(targetDataSourceId); RealTimeSyncDataSourceModel targetDbInfo = dmpRealtimeSyncInfoDao.querygSourceDbInfoById(targetDataSourceId);
if (targetDbInfo != null) { if (targetDbInfo == null) {
throw new RuntimeException("目标数据源设置信息不存在!"); throw new RuntimeException("目标数据源设置信息不存在!");
} }
targetDbInfo.setPassword(new BaseService().decode(targetDbInfo.getPassword(), publicKey)); targetDbInfo.setPassword(new BaseService().decode(targetDbInfo.getPassword(), publicKey));
...@@ -326,12 +327,14 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic ...@@ -326,12 +327,14 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
} }
//提交源到源的connector //提交源到源的connector
Long realtimeId = submitDatasource2DatasourceToConnector(projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params); Long realtimeId = submitDatasource2DatasourceToConnector(projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params);
if (realtimeId != null) {
//处理已选择的表信息 //处理已选择的表信息
submitNoSelectTable(realtimeId, projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params); submitNoSelectTable(realtimeId, projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params);
}
return false; return false;
} }
private Long submitDatasource2DatasourceToConnector(Long projectId, RealTimeSyncDataSourceModel sourceDbInfo, RealTimeSyncDataSourceModel targetDbInfo, DmpProjectSystemInfo dmpProjectSystemInfo, String connectorUrl, Map<String, Object> params) { private Long submitDatasource2DatasourceToConnector(Long projectId, RealTimeSyncDataSourceModel sourceDbInfo, RealTimeSyncDataSourceModel targetDbInfo, DmpProjectSystemInfo dmpProjectSystemInfo, String connectorUrl, Map<String, Object> params) throws Exception {
Integer srcDataSourceId = sourceDbInfo.getId(); Integer srcDataSourceId = sourceDbInfo.getId();
Integer targetDataSourceId = targetDbInfo.getId(); Integer targetDataSourceId = targetDbInfo.getId();
...@@ -379,7 +382,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic ...@@ -379,7 +382,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
} }
} }
} }
logger.info("################### 解析黑名单表--结束" + connectorBlacklistTables + " ################"); logger.info("################### 解析黑名单表--结束{}" + connectorBlacklistTables + " ################");
//解析已选择表 //解析已选择表
String connectorWhitelistTables = ""; String connectorWhitelistTables = "";
...@@ -400,7 +403,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic ...@@ -400,7 +403,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
} }
} }
} }
logger.info("################### 解析已选择表--结束 ################"); logger.info("################### 解析已选择表--结束{} " + connectorWhitelistTables + " ################");
Map<String, String> dataModelMap = new HashMap<>(); Map<String, String> dataModelMap = new HashMap<>();
...@@ -435,10 +438,8 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic ...@@ -435,10 +438,8 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
queryParam.put("sourceConnectorName", sourceConnectorName); queryParam.put("sourceConnectorName", sourceConnectorName);
Map dataMap = dmpRealtimeSyncInfoDao.selecltRealtimeSyncInfoByParams(queryParam); Map dataMap = dmpRealtimeSyncInfoDao.selecltRealtimeSyncInfoByParams(queryParam);
//不存在同一个数据源的同步任务 没查到信息 //不存在同一个数据源的同步任务 没查到信息
if (dataMap.size() == 0 && dataMap.isEmpty()) { if (dataMap == null) {
logger.info("######不存在同一个数据源的同步任务srcDataSourceId=" + srcDataSourceId + ",type=" + 1 + ",sourceConnectorName=" + sourceConnectorName + ",connectorUrl=" + connectorUrl + "的sourceConnector,添加新的");
dataModelMap.put("name", sourceDbInfo.getDatasourceName() + "_" + sourceName + "-" + sourceDbInfo.getDbName()); //source connector name dataModelMap.put("name", sourceDbInfo.getDatasourceName() + "_" + sourceName + "-" + sourceDbInfo.getDbName()); //source connector name
String dataSource2DataSourceJsonStr = freemakerJson("source", dataModelMap); //使用freemaker模板生成 kafka connector 请求参数 String dataSource2DataSourceJsonStr = freemakerJson("source", dataModelMap); //使用freemaker模板生成 kafka connector 请求参数
Map<String, Object> dataSource2DataSourceResult = RestClient.post(connectorUrl, dataSource2DataSourceJsonStr); Map<String, Object> dataSource2DataSourceResult = RestClient.post(connectorUrl, dataSource2DataSourceJsonStr);
...@@ -465,6 +466,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic ...@@ -465,6 +466,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
saveBody.setConnectorJobId(connectorJobId); saveBody.setConnectorJobId(connectorJobId);
saveBody.setConnectorJsonData(dataSource2DataSourceJsonStr); saveBody.setConnectorJsonData(dataSource2DataSourceJsonStr);
saveBody.setCreateTime(new Date()); saveBody.setCreateTime(new Date());
saveBody.setCrePerson(SessionUtils.getCurrentUserId());
dmpRealtimeSyncInfoDao.insert(saveBody); dmpRealtimeSyncInfoDao.insert(saveBody);
realtiemId = Long.valueOf(saveBody.getId()); realtiemId = Long.valueOf(saveBody.getId());
logger.info("###################保存实时同步任务--结束 ################"); logger.info("###################保存实时同步任务--结束 ################");
...@@ -496,7 +498,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic ...@@ -496,7 +498,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
StringWriter stringWriter = new StringWriter(); StringWriter stringWriter = new StringWriter();
try { try {
Template template = freeMarkerConfig.getConfiguration().getTemplate("realtime-sync-connector/" + type + "_connector.ftl"); Template template = freeMarkerConfig.getConfiguration().getTemplate(type + "_connector.ftl");
if (template != null) { if (template != null) {
try { try {
template.process(dataModel, stringWriter); template.process(dataModel, stringWriter);
...@@ -539,10 +541,10 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic ...@@ -539,10 +541,10 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
* @author Bellamy * @author Bellamy
* @since 2021-01-08 * @since 2021-01-08
*/ */
private JsonResult submitNoSelectTable(Long realtimeId, Long projectId, RealTimeSyncDataSourceModel sourceDbInfo, RealTimeSyncDataSourceModel targetDbInfo, DmpProjectSystemInfo dmpProjectSystemInfo, String connectorUrl, Map<String, Object> params) { private JsonResult submitNoSelectTable(Long realtimeId, Long projectId, RealTimeSyncDataSourceModel sourceDbInfo, RealTimeSyncDataSourceModel targetDbInfo, DmpProjectSystemInfo dmpProjectSystemInfo, String connectorUrl, Map<String, Object> params) throws Exception {
logger.info("###################处理已选择的表信息################"); logger.info("###################处理已选择的表信息################");
/*//源数据源到数据源同步connector信息 ----开始------ /* //源数据源到数据源同步connector信息 ----开始------
Map<String, String> tableToTableConfigArg = new HashMap<>(); Map<String, String> tableToTableConfigArg = new HashMap<>();
//默认配置参数 --{kudu master 的地址} 10.0.108.61,10.0.108.62 //默认配置参数 --{kudu master 的地址} 10.0.108.61,10.0.108.62
String endpoint = targetDbInfo.getEndpoint(); String endpoint = targetDbInfo.getEndpoint();
...@@ -573,16 +575,14 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic ...@@ -573,16 +575,14 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
tableToTableConfigArg.put("hdfsNamenodePrincipal", targetDbInfo.getAccessId()); tableToTableConfigArg.put("hdfsNamenodePrincipal", targetDbInfo.getAccessId());
tableToTableConfigArg.put("keytabLocation", targetDbInfo.getAccessKey()); tableToTableConfigArg.put("keytabLocation", targetDbInfo.getAccessKey());
} }
tableToTableConfigArg.put("connectorSecurityFlag", (String) params.get("connectorSecurityFlag")); tableToTableConfigArg.put("connectorSecurityFlag", (String) params.get("connectorSecurityFlag"));*/
//已选择的表信息 //已选择的表信息
List<Map<String, String>> tables = (List<Map<String, String>>) params.get("tables"); List<Map<String, String>> tables = (List<Map<String, String>>) params.get("tables");
if (tables.size() > 0 && tables != null) {
for (Map<String, String> str : tables) { for (Map<String, String> str : tables) {
str.put("targetDbNameAndTableName", targetDbInfo.getDbName() + "." + str.get("targetTableName")); //目标库和表名称
str.putAll(tableToTableConfigArg);
DmpRealtimeSyncSelectTable selectTable = new DmpRealtimeSyncSelectTable(); DmpRealtimeSyncSelectTable selectTable = new DmpRealtimeSyncSelectTable();
//selectTable.setCrePerson(); selectTable.setCrePerson(SessionUtils.getCurrentUserId());
selectTable.setCreTime(new Date()); selectTable.setCreTime(new Date());
selectTable.setSrcTableName(str.get("sourceTableName")); selectTable.setSrcTableName(str.get("sourceTableName"));
selectTable.setTargetTableName(str.get("targetTableName")); selectTable.setTargetTableName(str.get("targetTableName"));
...@@ -592,7 +592,8 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic ...@@ -592,7 +592,8 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
selectTable.setArithmetic(str.get("arithmetic")); selectTable.setArithmetic(str.get("arithmetic"));
dmpRealtimeSyncInfoDao.insertRealtimeSelectTable(selectTable); dmpRealtimeSyncInfoDao.insertRealtimeSelectTable(selectTable);
} }
logger.info("###################处理已选择的表信息--结束################");*/ }
logger.info("###################处理已选择的表信息--结束################");
return new JsonResult(); return new JsonResult();
} }
} }
\ No newline at end of file
...@@ -375,7 +375,7 @@ ...@@ -375,7 +375,7 @@
ds.host, ds.host,
ds.port, ds.port,
ds.endpoint, ds.endpoint,
ds,access_key as accessKey, ds.access_key as accessKey,
ds.access_id as accessId, ds.access_id as accessId,
dsdt.datasource_type as datasourceTypeName, dsdt.datasource_type as datasourceTypeName,
dsdt.driver_class_name as driverName dsdt.driver_class_name as driverName
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment