Commit c5ad9296 authored by sml

Merge branch 'dmp_dev' of http://gitlab.ioubuy.cn/yaobenzhang/jz-dmp-service.git into dmp_dev

parents 6c99dd9d 527af8d5
package com.jz.common.utils.realTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.*;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.client.RestTemplate;
import java.nio.charset.Charset;
import java.util.Map;
/**
* Helper for sending HTTP requests with RestTemplate.
*
* @author Bellamy
* @since 2021-01-08
*/
public class RestClient {
public static final Logger LOGGER = LoggerFactory.getLogger(RestClient.class);
/**
* Build a RestTemplate with fixed timeouts (120 s connect, 480 s read).
* A parameterized variant is sketched below.
*
* @return configured RestTemplate
*/
public static RestTemplate getRestTemplate() {
HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory();
requestFactory.setConnectTimeout(120 * 1000);
requestFactory.setReadTimeout(480 * 1000);
RestTemplate restTemplate = new RestTemplate(requestFactory);
return restTemplate;
}
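// A parameterized overload, in case callers need per-request timeouts (a sketch only; the
// timeout arguments are hypothetical and not used elsewhere in this class):
public static RestTemplate getRestTemplate(int connectTimeoutMillis, int readTimeoutMillis) {
HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory();
requestFactory.setConnectTimeout(connectTimeoutMillis); // e.g. 120 * 1000
requestFactory.setReadTimeout(readTimeoutMillis); // e.g. 480 * 1000
return new RestTemplate(requestFactory);
}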
/**
* Send an HTTP POST request with RestTemplate.
* @param url request URL
* @param jsonStr JSON request body
* @return response body as a Map
*/
@SuppressWarnings({ "unchecked" })
public static Map<String, Object> post(String url, String jsonStr) {
HttpHeaders headers = new HttpHeaders();
headers.set("Content-Type", "application/json;charset=UTF-8");//avoid garbled characters in the request body
Map<String, Object> resultMap = null;
try {
RestTemplate restTemplate = getRestTemplate();
resultMap = restTemplate.postForObject(url, new HttpEntity<String>(jsonStr, headers), Map.class);
} catch (Exception e) {
LOGGER.error("rest post failed: {}", e.getMessage(), e);
}
return resultMap;
}
/**
* Send an HTTP GET request with RestTemplate.
* @param url request URL
* @return response body as a Map
*/
@SuppressWarnings("unchecked")
public static Map<String, Object> get(String url){
RestTemplate restTemplate = getRestTemplate();
Map<String,Object> map = null;
try {
map = restTemplate.getForObject(url, Map.class);
} catch (Exception e) {
LOGGER.error("rest get 异常",e.getMessage(),e);
}
return map;
}
/**
* Send an HTTP PUT request with RestTemplate.
* @param url request URL
* @param jsonStr JSON request body
* @return response body as a Map
*/
public static Map<String, Object> put(String url,String jsonStr){
Map<String, Object> map = null ;
try {
map = exchange(url, HttpMethod.PUT, jsonStr);
} catch (Exception e) {
LOGGER.error("rest put 异常",e.getMessage(),e);
}
return map ;
}
/**
* Send an HTTP DELETE request with RestTemplate.
* @param url request URL
*/
public static void delete(String url) {
try {
RestTemplate restTemplate = getRestTemplate();
restTemplate.delete(url);
} catch (Exception e) {
LOGGER.error("rest delete 异常",e.getMessage(),e);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public static Map<String, Object> exchange(String url, HttpMethod method, String jsonStr) {
// request headers and content type
HttpHeaders headers = new HttpHeaders();
MimeType mimeType = MimeTypeUtils.parseMimeType("application/json");
MediaType mediaType = new MediaType(mimeType.getType(), mimeType.getSubtype(), Charset.forName("UTF-8"));
headers.setContentType(mediaType);
// request entity (body + headers)
HttpEntity<String> entity = new HttpEntity<>(jsonStr, headers);
ResponseEntity<Map> resultEntity = getRestTemplate().exchange(url, method, entity, Map.class);
return resultEntity.getBody();
}
public static void main(String[] args) {
String url = "http://10.0.53.177:9883/connectors";
post(url, jsonStr);
}
static String jsonStr = "{\r\n" +
" \"name\": \"debezium-connector-avtiviti-process-center-1\",\r\n" +
" \"config\": {\r\n" +
" \"connector.class\": \"io.debezium.connector.mysql.MySqlConnector\",\r\n" +
" \"database.history.producer.sasl.kerberos.service.name\": \"kafka\",\r\n" +
" \"tasks.max\": \"3\",\r\n" +
" \"database.history.kafka.topic\": \"avtiviti.process-center.databasehistory\",\r\n" +
" \"column.blacklist\": \"\",\r\n" +
" \"transforms\": \"unwrap\",\r\n" +
" \"database.history.consumer.security.protocol\": \"SASL_PLAINTEXT\",\r\n" +
" \"database.history.consumer.sasl.kerberos.service.name\": \"kafka\",\r\n" +
" \"table.blacklist\": \"\",\r\n" +
" \"transforms.unwrap.drop.tombstones\": \"true\",\r\n" +
" \"transforms.unwrap.type\": \"io.debezium.transforms.UnwrapFromEnvelope\",\r\n" +
" \"value.converter\": \"io.confluent.connect.avro.AvroConverter\",\r\n" +
" \"database.whitelist\": \"process-center\",\r\n" +
" \"key.converter\": \"io.confluent.connect.avro.AvroConverter\",\r\n" +
" \"database.history.producer.sasl.mechanism\": \"GSSAPI\",\r\n" +
" \"database.user\": \"root\",\r\n" +
" \"database.server.id\": \"20\",\r\n" +
" \"database.history.producer.security.protocol\": \"SASL_PLAINTEXT\",\r\n" +
" \"database.history.kafka.bootstrap.servers\": \"SASL_PLAINTEXT://10.0.108.61:9092,SASL_PLAINTEXT://10.0.108.62:9092,SASL_PLAINTEXT://10.0.108.63:9092\",\r\n" +
" \"database.server.name\": \"avtiviti\",\r\n" +
" \"database.port\": \"3306\",\r\n" +
" \"value.converter.schema.registry.url\": \"http://10.0.108.61:9881\",\r\n" +
" \"database.hostname\": \"10.0.53.179\",\r\n" +
" \"database.password\": \"jz@2018\",\r\n" +
" \"database.history.consumer.sasl.mechanism\": \"GSSAPI\",\r\n" +
" \"key.converter.schema.registry.url\": \"http://10.0.108.61:9881\"\r\n" +
" }\r\n" +
"}";
}
......@@ -15,6 +15,7 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -24,6 +25,7 @@ import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
import java.util.Map;
/**
* Real-time sync tasks
......@@ -92,7 +94,8 @@ public class RealTimeSyncController {
System.out.println(srcTopicName);
logger.info("############正常执行表数据id........" + ids[i]);
String shellPath = "/app/bigdata-app/scripts/trigger_straming.sh";
CmdUtils.callShell(shellPath, srcTopicName);
boolean flag = CmdUtils.callShell(shellPath, srcTopicName);
logger.info("############" + flag);
}
}
return new JsonResult();
......@@ -205,4 +208,42 @@ public class RealTimeSyncController {
return jsonResult;
}
/**
* Save a real-time sync task
*
* @return JsonResult
* @author Bellamy
* @since 2021-01-08
*/
@ApiOperation(value = "保存实时同步任务", notes = "保存实时同步任务")
@PostMapping(value = "/addTask")
public JsonResult addTask(@RequestBody Map<String, Object> params, HttpServletRequest httpRequest) throws Exception {
logger.info("###################请求参数{}" + params.toString() + "############");
if (StringUtils.isEmpty(params.get("projectId").toString())) {
return new JsonResult(ResultCode.PARAMS_ERROR, "项目id不能为空!");
}
if (StringUtils.isEmpty(params.get("srcDataSourceId").toString())) {
return new JsonResult(ResultCode.PARAMS_ERROR, "来源数据源id不能为空!");
}
if (StringUtils.isEmpty(params.get("targetDataSourceId").toString())) {
return new JsonResult(ResultCode.PARAMS_ERROR, "目标数据源id不能为空!");
}
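// Note: submission is fire-and-forget. The HTTP response below does not reflect whether
// addRealTimeTask eventually succeeds; failures on the background thread are only logged.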
// submit asynchronously
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
dmpRealtimeSyncInfoService.addRealTimeTask(params);
} catch (Exception e) {
logger.error("addRealTimeTask failed", e);
}
}
});
thread.start();
return new JsonResult();
}
}
\ No newline at end of file
package com.jz.dmp.modules.dao;
import com.jz.dmp.modules.model.DmpRealtimeSyncHandleCount;
import com.jz.dmp.modules.model.DmpRealtimeSyncSubmitResult;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* Data access layer for real-time sync task handle counts (DmpRealtimeSyncHandleCount)
*
* @author Bellamy
* @since 2021-01-08 14:56:03
*/
public interface DmpRealtimeSyncHandleCountDao {
/**
* Query a single row by UUID
*
* @param uuid UUID of the row
* @return entity
*/
DmpRealtimeSyncHandleCount queryById(@Param("uuid") String uuid);
/**
* Query a page of rows
*
* @param offset start offset
* @param limit  number of rows
* @return list of entities
*/
List<DmpRealtimeSyncHandleCount> queryAllByLimit(@Param("offset") int offset, @Param("limit") int limit);
/**
* Query by example (the entity is used as the filter)
*
* @param dmpRealtimeSyncHandleCount filter entity
* @return list of entities
*/
List<DmpRealtimeSyncHandleCount> queryAll(DmpRealtimeSyncHandleCount dmpRealtimeSyncHandleCount);
/**
* Insert a row
*
* @param dmpRealtimeSyncHandleCount entity
* @return number of affected rows
*/
int insert(DmpRealtimeSyncHandleCount dmpRealtimeSyncHandleCount);
/**
* Batch insert (MyBatis foreach)
*
* @param entities List<DmpRealtimeSyncHandleCount> list of entities
* @return number of affected rows
*/
int insertBatch(@Param("entities") List<DmpRealtimeSyncHandleCount> entities);
/**
* Batch insert or update by primary key (MyBatis foreach)
*
* @param entities List<DmpRealtimeSyncHandleCount> list of entities
* @return number of affected rows
*/
int insertOrUpdateBatch(@Param("entities") List<DmpRealtimeSyncHandleCount> entities);
/**
* Update a row
*
* @param dmpRealtimeSyncHandleCount entity
* @return number of affected rows
*/
int update(DmpRealtimeSyncHandleCount dmpRealtimeSyncHandleCount);
/**
* Delete a row by primary key
*
* @param handleCountId primary key
* @return number of affected rows
*/
int deleteById(@Param("handleCountId") Integer handleCountId);
/**
* Save the submit result of a real-time sync task
*
* @param dmpRealtimeSyncSubmitResult entity
* @return number of affected rows
* @author Bellamy
* @since 2021-01-07
*/
int insertRealtimeSyncSubmitResult(DmpRealtimeSyncSubmitResult dmpRealtimeSyncSubmitResult);
}
\ No newline at end of file
......@@ -129,4 +129,6 @@ public interface DmpRealtimeSyncInfoDao {
* @since 2021-01-06
*/
Map queryBlackTableByDataSourceId(@Param("srcDatasourceId") String srcDatasourceId);
Map selectRealtimeSyncInfoByParams(Map<String, Object> queryParam);
}
\ No newline at end of file
package com.jz.dmp.modules.model;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.Date;
/**
* Real-time sync task handle counts (DmpRealtimeSyncHandleCount) entity
*
* @author Bellamy
* @since 2021-01-08 14:56:02
*/
@ApiModel(value = "real-time sync task handle counts", description = "real-time sync task handle counts")
public class DmpRealtimeSyncHandleCount implements Serializable {
private static final long serialVersionUID = -29396966088167386L;
@ApiModelProperty(value = "handleCountId")
private Integer handleCountId;
@ApiModelProperty(value = "uuid")
private String uuid;
/**
* Number of tasks waiting to be submitted
*/
@ApiModelProperty(value = "number of tasks waiting to be submitted")
private Integer toSubmit;
/**
* Number of tasks currently being submitted
*/
@ApiModelProperty(value = "number of tasks currently being submitted")
private Integer submiting;
/**
* Number of successfully submitted tasks
*/
@ApiModelProperty(value = "number of successfully submitted tasks")
private Integer submitSuccess;
/**
* Number of failed submissions
*/
@ApiModelProperty(value = "number of failed submissions")
private Integer submitFail;
/**
* Creation time
*/
@ApiModelProperty(value = "creation time")
private Date creTime;
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
public Integer getToSubmit() {
return toSubmit;
}
public void setToSubmit(Integer toSubmit) {
this.toSubmit = toSubmit;
}
public Integer getSubmiting() {
return submiting;
}
public void setSubmiting(Integer submiting) {
this.submiting = submiting;
}
public Integer getSubmitSuccess() {
return submitSuccess;
}
public void setSubmitSuccess(Integer submitSuccess) {
this.submitSuccess = submitSuccess;
}
public Integer getSubmitFail() {
return submitFail;
}
public void setSubmitFail(Integer submitFail) {
this.submitFail = submitFail;
}
public Date getCreTime() {
return creTime;
}
public void setCreTime(Date creTime) {
this.creTime = creTime;
}
public Integer getHandleCountId() {
return handleCountId;
}
public void setHandleCountId(Integer handleCountId) {
this.handleCountId = handleCountId;
}
}
\ No newline at end of file
package com.jz.dmp.modules.model;
import java.io.Serializable;
import java.util.Date;
/**
* Submit result of a real-time sync task
*
* @author Bellamy
* @since 2021-01-05 14:18:00
*/
public class DmpRealtimeSyncSubmitResult implements Serializable {
private static final long serialVersionUID = 2075639970822903705L;
private Long submitResultId;
private String uuid;
/**
* Submit result message
*/
private String submitResult;
/**
* source: source connector, target: target connector
*/
private Integer type;
/**
* Creation time
*/
private Date createdTime;
private String typeStr;
/**
* Handle status: 1 waiting to submit, 2 submitting, 3 submit succeeded, 4 submit failed
*/
private Integer status;
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
public String getSubmitResult() {
return submitResult;
}
public void setSubmitResult(String submitResult) {
this.submitResult = submitResult;
}
public Integer getType() {
return type;
}
public void setType(Integer type) {
this.type = type;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
public String getTypeStr() {
return typeStr;
}
public void setTypeStr(String typeStr) {
this.typeStr = typeStr;
}
public Long getSubmitResultId() {
return submitResultId;
}
public void setSubmitResultId(Long submitResultId) {
this.submitResultId = submitResultId;
}
public Date getCreatedTime() {
return createdTime;
}
public void setCreatedTime(Date createdTime) {
this.createdTime = createdTime;
}
}
......@@ -10,6 +10,7 @@ import com.jz.dmp.modules.model.DmpRealtimeSyncInfo;
import com.jz.dmp.modules.model.RealTimeSyncModel;
import java.util.List;
import java.util.Map;
/**
* Service interface for real-time sync tasks (DmpRealtimeSyncInfo)
......@@ -102,4 +103,13 @@ public interface DmpRealtimeSyncInfoService {
JsonResult selectTargetDatasourceInfo(String projectId) throws Exception;
JsonResult queryRealTimeInfoByDataSourceId(String srcDatasourceId, String sourceTableName) throws Exception;
/**
* Save a real-time sync task
*
* @return whether the task was submitted successfully
* @author Bellamy
* @since 2021-01-08
*/
boolean addRealTimeTask(Map<String, Object> params) throws Exception;
}
\ No newline at end of file
......@@ -8,24 +8,35 @@ import com.jz.common.constant.ResultCode;
import com.jz.common.page.PageInfoResponse;
import com.jz.common.persistence.BaseService;
import com.jz.common.utils.realTime.DBUtil;
import com.jz.common.utils.realTime.RestClient;
import com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceNameListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeTableInfoReq;
import com.jz.dmp.modules.dao.DmpProjectDao;
import com.jz.dmp.modules.dao.DmpRealtimeSyncHandleCountDao;
import com.jz.dmp.modules.dao.DmpRealtimeSyncInfoDao;
import com.jz.dmp.modules.model.*;
import com.jz.dmp.modules.service.DmpRealtimeSyncInfoService;
import freemarker.core.ParseException;
import freemarker.template.MalformedTemplateNameException;
import freemarker.template.Template;
import freemarker.template.TemplateException;
import freemarker.template.TemplateNotFoundException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfig;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.io.IOException;
import java.io.StringWriter;
import java.util.*;
import java.util.regex.Pattern;
/**
......@@ -38,15 +49,23 @@ import java.util.regex.Pattern;
@Transactional
public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoService {
private static Logger logger = LoggerFactory.getLogger(DmpRealtimeSyncInfoServiceImpl.class);
@Value("${spring.public-key}")
private String publicKey;
@Autowired
private FreeMarkerConfig freeMarkerConfig;
@Autowired
private DmpRealtimeSyncInfoDao dmpRealtimeSyncInfoDao;
@Autowired
private DmpProjectDao dmpProjectDao;
@Autowired
private DmpRealtimeSyncHandleCountDao dmpRealtimeSyncHandleCountDao;
/**
* Query a single row by ID
*
......@@ -253,4 +272,272 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
return new JsonResult(flag);
}
/**
* Save a real-time sync task
*
* @return whether the task was submitted successfully
* @author Bellamy
* @since 2021-01-08
*/
@Override
public boolean addRealTimeTask(Map<String, Object> params) throws Exception {
String srcDataSourceId = params.get("srcDataSourceId").toString(); // source data source id
String targetDataSourceId = (String) params.get("targetDataSourceId"); // target data source id
Long projectId = Long.valueOf(params.get("projectId").toString()); // project id
String connectorUrl = (String) params.get("connectorUrl"); // connector endpoint info
// load the source data source by id
RealTimeSyncDataSourceModel sourceDbInfo = dmpRealtimeSyncInfoDao.querygSourceDbInfoById(srcDataSourceId);
if (sourceDbInfo == null) {
throw new RuntimeException("source data source settings do not exist!");
}
sourceDbInfo.setPassword(new BaseService().decode(sourceDbInfo.getPassword(), publicKey));
// load the target data source by id
RealTimeSyncDataSourceModel targetDbInfo = dmpRealtimeSyncInfoDao.querygSourceDbInfoById(targetDataSourceId);
if (targetDbInfo == null) {
throw new RuntimeException("target data source settings do not exist!");
}
targetDbInfo.setPassword(new BaseService().decode(targetDbInfo.getPassword(), publicKey));
// load the project system configuration by projectId
DmpProjectSystemInfo dmpProjectSystemInfo = dmpProjectDao.queryProjectSystemInfo(projectId);
params.put("connectorSecurityFlag", dmpProjectSystemInfo.getKerberosIsenable()); // security flag: whether Kerberos is enabled
// save the real-time sync task handle counts
DmpRealtimeSyncHandleCount countModel = new DmpRealtimeSyncHandleCount();
countModel.setUuid(UUID.randomUUID().toString());
countModel.setToSubmit(1);
countModel.setSubmiting(0); // initialise counters to avoid nulls downstream
countModel.setSubmitSuccess(0);
countModel.setSubmitFail(0);
countModel.setCreTime(new Date());
// initialise the pending-task count for this request
List<Map<String, String>> tables = (List<Map<String, String>>) params.get("tables");
if (!CollectionUtils.isEmpty(tables)) {
countModel.setToSubmit(countModel.getToSubmit() + tables.size());
}
dmpRealtimeSyncHandleCountDao.insert(countModel);
logger.info("################### saved real-time sync handle counts: {} ############", countModel);
// connectorUrl may look like connect1@http://172.18.104.130:9993/connectors; keep the part after '@'
if (StringUtils.isNotEmpty(connectorUrl)) {
if (connectorUrl.contains("@")) {
connectorUrl = connectorUrl.split("@")[1];
}
}
// submit the source-to-source connector
Integer datasource2DatasourceSyncId = submitDatasource2DatasourceToConnector(countModel.getUuid(), projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params);
return datasource2DatasourceSyncId != null;
}
private Integer submitDatasource2DatasourceToConnector(String uuid, Long projectId, RealTimeSyncDataSourceModel sourceDbInfo, RealTimeSyncDataSourceModel targetDbInfo, DmpProjectSystemInfo dmpProjectSystemInfo, String connectorUrl, Map<String, Object> params) {
Integer srcDataSourceId = sourceDbInfo.getId();
Integer targetDataSourceId = targetDbInfo.getId();
// save the submit result of the real-time sync task
DmpRealtimeSyncSubmitResult dmpRealtimeSyncSubmitResult = new DmpRealtimeSyncSubmitResult();
dmpRealtimeSyncSubmitResult.setUuid(uuid);
dmpRealtimeSyncSubmitResult.setStatus(2);
dmpRealtimeSyncSubmitResult.setType(1);
dmpRealtimeSyncSubmitResult.setCreatedTime(new Date());
dmpRealtimeSyncSubmitResult.setSubmitResult(sourceDbInfo.getDbName() + "----->" + targetDbInfo.getDbName() + " submitting");
dmpRealtimeSyncHandleCountDao.insertRealtimeSyncSubmitResult(dmpRealtimeSyncSubmitResult);
logger.info("################### saved real-time sync submit result: {} ############", dmpRealtimeSyncSubmitResult);
DmpRealtimeSyncHandleCount handleCount = dmpRealtimeSyncHandleCountDao.queryById(uuid);
if (handleCount != null) {
handleCount.setToSubmit(handleCount.getToSubmit() - 1);
handleCount.setSubmiting(handleCount.getSubmiting() + 1);
dmpRealtimeSyncHandleCountDao.update(handleCount);
logger.info("################### updated real-time sync handle counts: {} ############", handleCount);
}
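// At this point one task has been moved from the "to submit" to the "submitting" counter; the
// commented-out block further down is where it would be moved on to "submit success" after the
// connector call returns.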
// sync task id
Integer id = null;
logger.info("starting datasource-to-datasource sync task ---");
// source-to-target connector info ---- begin ------
// blacklist tables
String blacklistTables = "";
String connectorBlacklistTables = "";
int blacklistTableCount = 0;
if (params.containsKey("blacklistTables")) {
blacklistTables = (String) params.get("blacklistTables");
if (StringUtils.isNotEmpty(blacklistTables)) {
StringBuffer sb = new StringBuffer();
String[] blacklistTableArr = blacklistTables.split(",");
blacklistTableCount = blacklistTableArr.length;
if (blacklistTableArr != null && blacklistTableCount >= 1) {
for (String string : blacklistTableArr) {
sb.append(sourceDbInfo.getDbName());
sb.append(".");
sb.append(string);
sb.append(",");
}
}
if (sb.toString().contains(",")) {
connectorBlacklistTables = sb.toString().substring(0, sb.toString().length() - 1);
}
}
}
logger.info("################### 解析黑名单表--结束" + connectorBlacklistTables + " ################");
// whitelist tables
String connectorWhitelistTables = "";
int whitelistTablesConut = 0;
if (params.containsKey("tables")) {
List<Map<String, String>> tables = (List<Map<String, String>>) params.get("tables");
if (!CollectionUtils.isEmpty(tables)) {
whitelistTablesConut = tables.size();
StringBuffer sb = new StringBuffer();
for (Map<String, String> temp : tables) {
sb.append(sourceDbInfo.getDbName());
sb.append(".");
sb.append(temp.get("sourceTableName"));
sb.append(",");
}
if (sb.toString().contains(",")) {
connectorWhitelistTables = sb.toString().substring(0, sb.toString().length() - 1);
}
}
}
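// Example (hypothetical values): with dbName "demo" and tables [{sourceTableName: "t1"}, {sourceTableName: "t2"}],
// connectorWhitelistTables becomes "demo.t1,demo.t2" and whitelistTablesConut is 2.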
logger.info("################### 解析白名单表--结束 ################");
Map<String, String> dataModelMap = new HashMap<>();
dataModelMap.put("kafkaBootstrapServers", dmpProjectSystemInfo.getKafkaBootstrapServers());//kafka 地址
dataModelMap.put("registerUrl", dmpProjectSystemInfo.getKafkaSchemaRegisterUrl()); //kafka connector schema 注册地址
dataModelMap.put("blacklistTables", connectorBlacklistTables); //设置的黑名单表
dataModelMap.put("blacklistTableCount", String.valueOf(blacklistTableCount)); //黑名单表数量
//设置的白名单表 在模板里进行判比较黑名单表和白名单表的数量,谁小就用谁
dataModelMap.put("whitelistTablesConut", String.valueOf(whitelistTablesConut)); //白名单表数量
dataModelMap.put("connectorWhitelistTables", connectorWhitelistTables); //设置的白名单表
//选择的来源数据信息
dataModelMap.put("dbHost", sourceDbInfo.getHost());
dataModelMap.put("dbPort", sourceDbInfo.getPort());
dataModelMap.put("dbUserName", sourceDbInfo.getUserName());
dataModelMap.put("dbPassWord", sourceDbInfo.getPassword());
dataModelMap.put("dataSourceId", sourceDbInfo.getId().toString());
dataModelMap.put("datasourceName", sourceDbInfo.getDatasourceName());
dataModelMap.put("dbName", sourceDbInfo.getDbName());
dataModelMap.put("connectorSecurityFlag", (String) params.get("connectorSecurityFlag")); //安全验证开关,是否启用KERBEROS
//前端定义的sourceConnectorName前缀
String sourceName = (String) params.get("sourceName");
dataModelMap.put("sourceName", sourceName);
//source kafak topic
String topic = sourceDbInfo.getDatasourceName() + "_" + sourceName + "." + sourceDbInfo.getDbName() + ".databasehistory";
dataModelMap.put("topic", topic);
String sourceConnectorName = "debezium-connector-" + sourceDbInfo.getDatasourceName() + "_" + sourceName + "-" + sourceDbInfo.getDbName();
// 自定义的sourceConnector参数 替换模板里的参数
Map sourceCustomArgMap = (Map) params.get("sourceCustomArg");
//查询同步任务信息表是否存在类型是数据源到数据源,srcDataSourceId,targetDataSourceId一样的信息 如果 存在就不发起请求
Map<String, Object> queryParam = new HashMap<>();
queryParam.put("srcDataSourceId", srcDataSourceId);
queryParam.put("sourceConnectorName", sourceConnectorName);
Map dataMap = dmpRealtimeSyncInfoDao.selecltRealtimeSyncInfoByParams(queryParam);
if (dataMap.size() == 0 && dataMap.isEmpty()) { //不存在同一个数据源的同步任务 没查到信息
logger.info("######不存在同一个数据源的同步任务srcDataSourceId="+srcDataSourceId+",type="+1+",sourceConnectorName="+sourceConnectorName+",connectorUrl="+connectorUrl+"的sourceConnector,添加新的");
dataModelMap.put("name", sourceDbInfo.getDatasourceName()+"_"+ sourceName + "-" + sourceDbInfo.getDbName()); //source connector name
String dataSource2DataSourceJsonStr = freemakerJson("source", dataModelMap); //使用freemaker模板生成 kafka connector 请求参数
Map<String, Object> dataSource2DataSourceResult = RestClient.post(connectorUrl, dataSource2DataSourceJsonStr);
String connectorJobId = getConnectorJobId(dataSource2DataSourceResult);
//请求接口正常
if (StringUtils.isNotEmpty(connectorJobId)) {
DmpRealtimeSyncInfo saveBody = new DmpRealtimeSyncInfo();
saveBody.setSrcDatasourceId(srcDataSourceId);
saveBody.setSrcDatasourceName(sourceDbInfo.getDatasourceName());
saveBody.setSrcDatabaseType(sourceDbInfo.getDataSourceTypeName());
saveBody.setSrcDatabaseName(sourceDbInfo.getDbName());
saveBody.setTargetDatasourceId(targetDataSourceId);
saveBody.setTargetDatasourceName(targetDbInfo.getDatasourceName());
saveBody.setTargetDatabaseType(targetDbInfo.getDataSourceTypeName());
saveBody.setTargetDatabaseName(targetDbInfo.getDbName());
saveBody.setType(1);
saveBody.setStatus("SUBMIT_SUCCESS");
saveBody.setConnectorJsonData(dataSource2DataSourceJsonStr);
saveBody.setProjectId(projectId);
saveBody.setConnectorUrl(connectorUrl);
saveBody.setSrcTopicName(topic);
saveBody.setSourceTypeName(sourceDbInfo.getDataSourceTypeName());
saveBody.setTargetTypeName("Kafka");
saveBody.setConnectorJobId(connectorJobId);
saveBody.setConnectorJsonData(dataSource2DataSourceJsonStr);
/* insertDmpRealtimeSyncInfoModel(saveBody);
//操作数据源黑名单信息
Map<String, Object> backlistTablesByDatasourceMap = dmpRealtimeSyncInfoDao.getBacklistTablesByDatasourceId(srcDataSourceId);
if (CollectionUtils.isEmpty(backlistTablesByDatasourceMap)) {
dmpRealtimeSyncInfoDao.addBacklistTablesByDatasourceId(blacklistTables, srcDataSourceId);
} else {
dmpRealtimeSyncInfoDao.updateBacklistTablesByDatasourceId(blacklistTables, srcDataSourceId);
}
//更新对应状态的数
DmpRealtimeSyncHandleCountModel dmpRealtimeSyncHandleCountModelSuccess = dmpRealtimeSyncHandleCountDao.getDmpRealtimeSyncHandleCountModel(uuid);
if (dmpRealtimeSyncHandleCountModelSuccess != null) {
dmpRealtimeSyncHandleCountModelSuccess.setSubmitSuccess(dmpRealtimeSyncHandleCountModelSuccess.getSubmitSuccess() + 1);
dmpRealtimeSyncHandleCountModelSuccess.setSubmiting(dmpRealtimeSyncHandleCountModelSuccess.getSubmiting() - 1);
dmpRealtimeSyncHandleCountDao.updateDmpRealtimeSyncHandleCountModel(dmpRealtimeSyncHandleCountModelSuccess);
}
//保存提交结果
dmpRealtimeSyncSubmitResutModel.setSubmitResult(db2Db + " 提交成功");
dmpRealtimeSyncSubmitResutModel.setStatus(3);
dmpRealtimeSyncSubmitResutDao.addDmpRealtimeSyncSubmitResutModel(dmpRealtimeSyncSubmitResutModel);
id = dmpRealtimeSyncInfoModel.getId();*/
}
}
return 1;
}
/**
* Render the Kafka connector request body from a FreeMarker template
*
* @param type      template type, e.g. "source"
* @param dataModel variables referenced inside the template
* @return rendered JSON string
*/
public String freemarkerJson(String type, Map<String, String> dataModel) {
StringWriter stringWriter = new StringWriter();
try {
Template template = freeMarkerConfig.getConfiguration().getTemplate("realtime-sync-connector/" + type + "_connector.ftl");
if (template != null) {
template.process(dataModel, stringWriter);
}
} catch (IOException | TemplateException e) {
// TemplateNotFoundException, MalformedTemplateNameException and ParseException are all IOExceptions
logger.error("failed to render connector template [{}]", type, e);
}
return stringWriter.toString();
}
/**
* Extract the connector name (used as the connector job id) from the Kafka Connect response
*
* @param resultMap response body returned by the connector REST call
* @return connector name, or null if absent
*/
public String getConnectorJobId(Map<String, Object> resultMap) {
if (CollectionUtils.isEmpty(resultMap)) {
return null;
}
if (!resultMap.containsKey("name")) {
return null;
}
return (String) resultMap.get("name");
}
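// For reference: a successful Kafka Connect "POST /connectors" call normally responds with a JSON
// object echoing the connector, e.g. {"name": "...", "config": {...}} (illustrative shape); the
// "name" field is what is stored as connectorJobId above.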
}
\ No newline at end of file
......@@ -9,6 +9,7 @@ import com.jz.common.constant.JsonResult;
import com.jz.common.constant.ResultCode;
import com.jz.common.enums.TestConnectStatusEnum;
import com.jz.common.page.PageInfoResponse;
import com.jz.common.persistence.BaseService;
import com.jz.common.utils.JsonMapper;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceListDto;
......@@ -28,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
......@@ -47,6 +49,9 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
private static Logger logger = LoggerFactory.getLogger(DmpSyncingDatasourceServiceImpl.class);
@Value("${spring.public-key}")
private String publicKey;
@Autowired
private DmpSyncingDatasourceDao dmpSyncingDatasourceDao;
......@@ -84,6 +89,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
*
* @param dmpSyncingDatasource entity
* @return entity
* @author Bellamy
*/
@Override
public DmpSyncingDatasource insert(DmpSyncingDatasource dmpSyncingDatasource) {
......@@ -96,6 +102,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
*
* @param dmpSyncingDatasource entity
* @return entity
* @author Bellamy
*/
@Override
public DmpSyncingDatasource update(DmpSyncingDatasource dmpSyncingDatasource) {
......@@ -108,6 +115,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
*
* @param id primary key
* @return whether the delete succeeded
* @author Bellamy
*/
@Override
public boolean deleteById(Integer id) {
......@@ -118,6 +126,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
* Query the data source list
*
* @param req
* @author Bellamy
*/
@Override
public PageInfoResponse<DataSourceListDto> queryDataSourceListPage(DataSourceListReq req, HttpServletRequest httpRequest) throws Exception {
......@@ -138,6 +147,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
* Batch delete data sources
*
* @return
* @author Bellamy
*/
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
......@@ -157,6 +167,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
* New data source - get data source types
*
* @return
* @author Bellamy
*/
@Override
public JsonResult queryGroupDatasourceType() throws Exception {
......@@ -186,6 +197,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
* Save a data source
*
* @return
* @author Bellamy
*/
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
@Override
......@@ -198,6 +210,9 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
if (len > 0) {
return new JsonResult(ResultCode.PARAMS_ERROR, "data source name already exists");
}
//parse the data source info to be saved
DmpSyncingDatasource dmpSyncingDatasource = getDataSourceInfo(saveBody);
DmpSyncingDatasource dsd = new DmpSyncingDatasource();
BeanUtils.copyProperties(saveBody, dsd);
......@@ -205,15 +220,65 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
dsd.setDatasourceType(Integer.valueOf(saveBody.getDatasourceType())); //data source type ID
dsd.setProjectId(Integer.valueOf(saveBody.getProjectId()));
dsd.setDataStatus("1");
if (StringUtils.isEmpty(saveBody.getTestConnectStatus())) { //default: not yet tested
dsd.setTestConnectStatus(TestConnectStatusEnum.WCS.getValue());
} else {
dsd.setTestConnectStatus(saveBody.getTestConnectStatus());
}
dsd.setDbName(dmpSyncingDatasource.getDbName());
dsd.setHost(dmpSyncingDatasource.getHost());
dsd.setPort(dmpSyncingDatasource.getPort());
dsd.setPassword(dmpSyncingDatasource.getPassword());
dmpSyncingDatasourceDao.insert(dsd);
return new JsonResult(dsd);
}
/**
* Parse the data source info (host, port, db name, encoded password) to be saved
*
* @return parsed DmpSyncingDatasource
* @author Bellamy
* @since 2021-01-08
*/
public DmpSyncingDatasource getDataSourceInfo(DmpSyncingDatasourceReq saveBody) {
DmpSyncingDatasource returnData = new DmpSyncingDatasource();
if (saveBody != null) {
String jdbcUrl = saveBody.getJdbcUrl();
if (!org.springframework.util.StringUtils.isEmpty(jdbcUrl)) {
jdbcUrl = jdbcUrl.substring(jdbcUrl.indexOf("//") + 2);
String hostPort = "";
String dbName = "";
if (jdbcUrl.contains("/")) {
hostPort = jdbcUrl.substring(0, jdbcUrl.indexOf("/"));
dbName = jdbcUrl.substring(jdbcUrl.indexOf("/") + 1);
}
if (dbName.contains("?")) {
dbName = dbName.substring(0, dbName.indexOf("?")); //数据库名称
}
returnData.setDbName(dbName);
String[] hostPortArr = hostPort.split(":");
if (hostPortArr != null && hostPortArr.length >= 2) {
String host = hostPortArr[0];
String port = hostPortArr[1];
returnData.setHost(host); //host / IP
returnData.setPort(port); //port
}
}
//encode (encrypt) the password before saving
String password = saveBody.getPassword();
password = new BaseService().encode(password, publicKey);
returnData.setPassword(password);
}
return returnData;
}
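// Example (hypothetical values): for jdbcUrl "jdbc:mysql://10.0.0.1:3306/demo?useUnicode=true" the
// parsing above yields host "10.0.0.1", port "3306" and dbName "demo"; a jdbcUrl without a
// "/<dbName>" segment leaves dbName empty.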
/**
* Test connectivity
*
* @return
* @author Bellamy
*/
@Override
public JsonResult testConnection(DmpSyncingDatasourceReq saveBody) throws Exception {
......@@ -238,6 +303,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
* Edit a data source
*
* @return
* @author Bellamy
*/
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
......@@ -274,6 +340,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
* Get the input-field attributes for a data source type
*
* @return
* @author Bellamy
*/
@Override
public JsonResult<SyncingDatasourceTypeDto> selectDatasourceTypeAttrById(String datasourceTypeId) throws Exception {
......@@ -311,6 +378,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
/*
* Query the database info for a data source
* @author Bellamy
* */
private DmpAgentDatasourceInfo dsInfoDTO(DmpSyncingDatasourceReq body) throws Exception {
//look up by data source type ID
......
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jz.dmp.modules.dao.DmpRealtimeSyncHandleCountDao">
<resultMap type="com.jz.dmp.modules.model.DmpRealtimeSyncHandleCount" id="DmpRealtimeSyncHandleCountMap">
<result property="handleCountId" column="handle_count_id" jdbcType="INTEGER"/>
<result property="uuid" column="uuid" jdbcType="VARCHAR"/>
<result property="toSubmit" column="to_submit" jdbcType="INTEGER"/>
<result property="submiting" column="submiting" jdbcType="INTEGER"/>
<result property="submitSuccess" column="submit_success" jdbcType="INTEGER"/>
<result property="submitFail" column="submit_fail" jdbcType="INTEGER"/>
<result property="creTime" column="cre_time" jdbcType="TIMESTAMP"/>
</resultMap>
<!-- query a single row -->
<select id="queryById" resultMap="DmpRealtimeSyncHandleCountMap">
select
handle_count_id, uuid, to_submit, submiting, submit_success, submit_fail, cre_time
from dmp_realtime_sync_handle_count
where uuid= #{uuid}
</select>
<!-- query a page of rows -->
<select id="queryAllByLimit" resultMap="DmpRealtimeSyncHandleCountMap">
select
uuid, to_submit, submiting, submit_success, submit_fail, cre_time
from dmp_realtime_sync_handle_count
limit #{offset}, #{limit}
</select>
<!-- query by example (entity as filter) -->
<select id="queryAll" resultMap="DmpRealtimeSyncHandleCountMap">
select
uuid, to_submit, submiting, submit_success, submit_fail, cre_time
from dmp_realtime_sync_handle_count
<where>
<if test="uuid != null and uuid != ''">
and uuid = #{uuid}
</if>
<if test="toSubmit != null">
and to_submit = #{toSubmit}
</if>
<if test="submiting != null">
and submiting = #{submiting}
</if>
<if test="submitSuccess != null">
and submit_success = #{submitSuccess}
</if>
<if test="submitFail != null">
and submit_fail = #{submitFail}
</if>
<if test="creTime != null">
and cre_time = #{creTime}
</if>
</where>
</select>
<!-- insert all columns -->
<insert id="insert" keyProperty="handleCountId" useGeneratedKeys="true">
insert into dmp_realtime_sync_handle_count(uuid, to_submit, submiting, submit_success, submit_fail, cre_time)
values (#{uuid}, #{toSubmit}, #{submiting}, #{submitSuccess}, #{submitFail}, #{creTime})
</insert>
<insert id="insertBatch" keyProperty="handleCountId" useGeneratedKeys="true">
insert into dmp_realtime_sync_handle_count(uuid, to_submit, submiting, submit_success, submit_fail,
cre_time)
values
<foreach collection="entities" item="entity" separator=",">
(#{entity.uuid}, #{entity.toSubmit}, #{entity.submiting}, #{entity.submitSuccess}, #{entity.submitFail},
#{entity.creTime})
</foreach>
</insert>
<insert id="insertOrUpdateBatch" keyProperty="handleCountId" useGeneratedKeys="true">
insert into dmp_realtime_sync_handle_count(uuid, to_submit, submiting, submit_success, submit_fail,
cre_time)
values
<foreach collection="entities" item="entity" separator=",">
(#{entity.uuid}, #{entity.toSubmit}, #{entity.submiting}, #{entity.submitSuccess}, #{entity.submitFail},
#{entity.creTime})
</foreach>
on duplicate key update
uuid = values(uuid) , to_submit = values(to_submit) , submiting = values(submiting) , submit_success =
values(submit_success) , submit_fail = values(submit_fail) , cre_time = values(cre_time)
</insert>
<!-- update a row -->
<update id="update">
update dmp_realtime_sync_handle_count
<set>
<if test="toSubmit != null">
to_submit = #{toSubmit},
</if>
<if test="submiting != null">
submiting = #{submiting},
</if>
<if test="submitSuccess != null">
submit_success = #{submitSuccess},
</if>
<if test="submitFail != null">
submit_fail = #{submitFail},
</if>
</set>
where uuid = #{uuid}
</update>
<!-- delete by primary key -->
<delete id="deleteById">
delete from dmp_realtime_sync_handle_count where handle_count_id = #{handleCountId}
</delete>
<!-- save the submit result of a real-time sync task -->
<insert id="insertRealtimeSyncSubmitResult" useGeneratedKeys="true" keyProperty="submitResultId" parameterType="com.jz.dmp.modules.model.DmpRealtimeSyncSubmitResult">
INSERT INTO dmp_realtime_sync_submit_result (`uuid`,`submit_result`,`type`,`created_time`,`status`)
VALUES(#{uuid},#{submitResult},#{type},#{createdTime},#{status})
</insert>
</mapper>
\ No newline at end of file
......@@ -372,6 +372,8 @@
ds.user_name as userName,
ds.password,
ds.db_name as dbName,
ds.host,
ds.port,
dsdt.datasource_type as datasourceTypeName,
dsdt.driver_class_name as driverName
from dmp_syncing_datasource ds
......@@ -401,4 +403,35 @@
FROM dmp_realtime_sync_blacklist_table_info
WHERE datasource_id = #{srcDatasourceId}
</select>
<select id="selecltRealtimeSyncInfoByParams" resultType="java.util.Map" parameterType="java.util.Map">
SELECT
t1.id,
t1.tree_id as treeId,
t2.name as treeName,
t1.status,
date_format(t1.update_time,'%Y-%m-%d %H:%i:%s') as updateTime,
t1.src_datasource_id as srcDatasourceId,
t1.src_datasource_name as srcDatasourceName,
t1.src_database_type as srcDatabaseType,
t1.target_datasource_id as targetDatasourceId,
t1.target_datasource_name as targetDatasourceName,
t1.target_database_type as targetDatabaseType
FROM dmp_realtime_sync_info t1
left join dmp_navigation_tree t2 on t1.tree_id=t2.id
left join dmp_syncing_datasource t3 ON t1.src_datasource_id = t3.ID
left join dmp_syncing_datasource t4 ON t1.target_datasource_id = t4.ID
where 1=1
<if test="projectId !=null">and t1.project_id=#{projectId}</if>
<if test="taskStatus != null and taskStatus != '' "> AND t1.status = #{taskStatus} </if>
<if test="treeId != null and treeId != '' "> AND t1.id = #{treeId} </if>
<if test="targetDatabaseTypeId != null and targetDatabaseTypeId != '' "> AND t4.DATASOURCE_TYPE = #{targetDatabaseTypeId} </if>
<if test="sourceDatabaseTypeId != null and sourceDatabaseTypeId != ''"> AND t3.DATASOURCE_TYPE = #{sourceDatabaseTypeId} </if>
<if test="sourceDatabaseName != null and sourceDatabaseName != '' "> AND t1.src_datasource_name like CONCAT('%', #{sourceDatabaseName}, '%') </if>
<if test="targetDatabaseName != null and targetDatabaseName != '' "> AND t1.target_datasource_name like CONCAT('%', #{targetDatabaseName}, '%') </if>
<if test="treeName != null and treeName != '' "> AND t2.name like CONCAT('%', #{treeName}, '%') </if>
<if test="sourceConnectorName != null "> AND t1.connector_job_id = #{sourceConnectorName} </if>
<if test="srcDataSourceId != null"> AND t1.src_datasource_id = #{srcDataSourceId}</if>
order by t1.create_time desc
</select>
</mapper>
\ No newline at end of file