Commit 971994dc authored by mcb's avatar mcb

no message

parent 3eb74037
......@@ -5,6 +5,7 @@ import com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceNameListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq;
import com.jz.dmp.modules.model.DmpRealtimeSyncInfo;
import com.jz.dmp.modules.model.DmpRealtimeSyncSelectTable;
import com.jz.dmp.modules.model.RealTimeSyncDataSourceModel;
import org.apache.ibatis.annotations.Param;
......@@ -131,4 +132,22 @@ public interface DmpRealtimeSyncInfoDao {
Map queryBlackTableByDataSourceId(@Param("srcDatasourceId") String srcDatasourceId);
Map selecltRealtimeSyncInfoByParams(Map<String, Object> queryParam);
/**
* 添加实时任务黑名单表
*
* @return
* @author Bellamy
* @since 2021-01-11
*/
int insertRealtimeBlackList(Map blacklist);
/**
* 新增实时任务已选择的表信息
*
* @return
* @author Bellamy
* @since 2021-01-11
*/
int insertRealtimeSelectTable(DmpRealtimeSyncSelectTable selectTable);
}
\ No newline at end of file
package com.jz.dmp.modules.model;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.Date;
/**
* 实时同步任务已选择的表信息(DmpRealtimeSyncSelectTable)实体类
*
* @author Bellamy
* @since 2021-01-11 13:06:53
*/
@ApiModel(value = "实时同步任务已选择的表信息", description = "实时同步任务已选择的表信息")
public class DmpRealtimeSyncSelectTable implements Serializable {
private static final long serialVersionUID = -87562703941786967L;
/**
* selectTableId
*/
@ApiModelProperty(value = "selectTableId")
private Long selectTableId;
/**
* 实时任务id
*/
@ApiModelProperty(value = "实时任务id")
private Long realtimeId;
/**
* 脱敏字段
*/
@ApiModelProperty(value = "脱敏字段")
private String desensitizationField;
/**
* 加密算法
*/
@ApiModelProperty(value = "加密算法")
private String arithmetic;
/**
* 主键名称
*/
@ApiModelProperty(value = "主键名称")
private String pkName;
/**
* 源数据表名称
*/
@ApiModelProperty(value = "源数据表名称")
private String srcTableName;
/**
* 目标数据表名称
*/
@ApiModelProperty(value = "目标数据表名称")
private String targetTableName;
/**
* 创建人
*/
@ApiModelProperty(value = "创建人")
private String crePerson;
/**
* 创建时间
*/
@ApiModelProperty(value = "创建时间")
private Date creTime;
/**
* 更新时间
*/
@ApiModelProperty(value = "更新时间")
private Date uptTime;
/**
* 更新人
*/
@ApiModelProperty(value = "更新人")
private String uptPerson;
public Long getSelectTableId() {
return selectTableId;
}
public void setSelectTableId(Long selectTableId) {
this.selectTableId = selectTableId;
}
public Long getRealtimeId() {
return realtimeId;
}
public void setRealtimeId(Long realtimeId) {
this.realtimeId = realtimeId;
}
public String getDesensitizationField() {
return desensitizationField;
}
public void setDesensitizationField(String desensitizationField) {
this.desensitizationField = desensitizationField;
}
public String getArithmetic() {
return arithmetic;
}
public void setArithmetic(String arithmetic) {
this.arithmetic = arithmetic;
}
public String getPkName() {
return pkName;
}
public void setPkName(String pkName) {
this.pkName = pkName;
}
public String getSrcTableName() {
return srcTableName;
}
public void setSrcTableName(String srcTableName) {
this.srcTableName = srcTableName;
}
public String getTargetTableName() {
return targetTableName;
}
public void setTargetTableName(String targetTableName) {
this.targetTableName = targetTableName;
}
public String getCrePerson() {
return crePerson;
}
public void setCrePerson(String crePerson) {
this.crePerson = crePerson;
}
public Date getCreTime() {
return creTime;
}
public void setCreTime(Date creTime) {
this.creTime = creTime;
}
public Date getUptTime() {
return uptTime;
}
public void setUptTime(Date uptTime) {
this.uptTime = uptTime;
}
public String getUptPerson() {
return uptPerson;
}
public void setUptPerson(String uptPerson) {
this.uptPerson = uptPerson;
}
}
\ No newline at end of file
......@@ -72,6 +72,13 @@ public class RealTimeSyncDataSourceModel {
*/
private String dataSourceTypeName;
private String accessId;
private String accessKey;
/*
* 终端信息
* */
private String endpoint;
public Integer getDatasourceType() {
return datasourceType;
......@@ -176,4 +183,28 @@ public class RealTimeSyncDataSourceModel {
public void setDatasourceDesc(String datasourceDesc) {
this.datasourceDesc = datasourceDesc;
}
public String getAccessId() {
return accessId;
}
public void setAccessId(String accessId) {
this.accessId = accessId;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getEndpoint() {
return endpoint;
}
public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}
}
......@@ -305,7 +305,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
DmpProjectSystemInfo dmpProjectSystemInfo = dmpProjectDao.queryProjectSystemInfo(projectId);
params.put("connectorSecurityFlag", dmpProjectSystemInfo.getKerberosIsenable()); //安全验证开关,是否启用KERBEROS
//保存实时同步任务处理数
/* //保存实时同步任务处理数
DmpRealtimeSyncHandleCount countModel = new DmpRealtimeSyncHandleCount();
countModel.setUuid(UUID.randomUUID().toString());
countModel.setToSubmit(1);
......@@ -316,7 +316,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
countModel.setToSubmit(countModel.getToSubmit() + tables.size());
}
dmpRealtimeSyncHandleCountDao.insert(countModel);
logger.info("###################添加实时同步任务处理数结束{}" + countModel.toString() + "############");
logger.info("###################添加实时同步任务处理数结束{}" + countModel.toString() + "############");*/
//connect1@http://172.18.104.130:9993/connectors
if (StringUtils.isNotEmpty(connectorUrl)) {
......@@ -325,16 +325,17 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
}
//提交源到源的connector
Integer datasource2DatasourceSyncId = submitDatasource2DatasourceToConnector(countModel.getUuid(), projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params);
Long realtimeId = submitDatasource2DatasourceToConnector(projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params);
//处理已选择的表信息
submitNoSelectTable(realtimeId, projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params);
return false;
}
private Integer submitDatasource2DatasourceToConnector(String uuid, Long projectId, RealTimeSyncDataSourceModel sourceDbInfo, RealTimeSyncDataSourceModel targetDbInfo, DmpProjectSystemInfo dmpProjectSystemInfo, String connectorUrl, Map<String, Object> params) {
private Long submitDatasource2DatasourceToConnector(Long projectId, RealTimeSyncDataSourceModel sourceDbInfo, RealTimeSyncDataSourceModel targetDbInfo, DmpProjectSystemInfo dmpProjectSystemInfo, String connectorUrl, Map<String, Object> params) {
Integer srcDataSourceId = sourceDbInfo.getId();
Integer targetDataSourceId = targetDbInfo.getId();
//保存实时同步任务提交结果
/* //保存实时同步任务提交结果
DmpRealtimeSyncSubmitResult dmpRealtimeSyncSubmitResult = new DmpRealtimeSyncSubmitResult();
dmpRealtimeSyncSubmitResult.setUuid(uuid);
dmpRealtimeSyncSubmitResult.setStatus(2);
......@@ -350,12 +351,11 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
handleCount.setSubmiting(handleCount.getSubmiting() + 1);
dmpRealtimeSyncHandleCountDao.update(handleCount);
logger.info("###################修改实时同步任务处理数--结束{}" + handleCount.toString() + "############");
}
}*/
//同步任务id
Integer id = null;
logger.info("同步数据源到数据源任务开始---");
//源数据源到数据源同步connector信息 ----开始------
logger.info("################### 开始--同步数据源到数据源任务################### ");
Long realtiemId = null; //同步任务id
//源数据源到数据源同步connector信息
//黑名单表
String blacklistTables = "";
String connectorBlacklistTables = "";
......@@ -381,7 +381,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
logger.info("################### 解析黑名单表--结束" + connectorBlacklistTables + " ################");
//白名单
//解析已选择
String connectorWhitelistTables = "";
int whitelistTablesConut = 0;
if (params.containsKey("tables")) {
......@@ -400,7 +400,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
}
}
logger.info("################### 解析白名单表--结束 ################");
logger.info("################### 解析已选择表--结束 ################");
Map<String, String> dataModelMap = new HashMap<>();
......@@ -409,8 +409,8 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
dataModelMap.put("blacklistTables", connectorBlacklistTables); //设置的黑名单表
dataModelMap.put("blacklistTableCount", String.valueOf(blacklistTableCount)); //黑名单表数量
//设置的白名单表 在模板里进行判比较黑名单表和白名单表的数量,谁小就用谁
dataModelMap.put("whitelistTablesConut", String.valueOf(whitelistTablesConut)); //白名单表数量
dataModelMap.put("connectorWhitelistTables", connectorWhitelistTables); //设置的白名单
dataModelMap.put("whitelistTablesConut", String.valueOf(whitelistTablesConut)); //已选择表数量
dataModelMap.put("connectorWhitelistTables", connectorWhitelistTables); //设置的已选择
//选择的来源数据信息
dataModelMap.put("dbHost", sourceDbInfo.getHost());
dataModelMap.put("dbPort", sourceDbInfo.getPort());
......@@ -429,34 +429,31 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
dataModelMap.put("topic", topic);
String sourceConnectorName = "debezium-connector-" + sourceDbInfo.getDatasourceName() + "_" + sourceName + "-" + sourceDbInfo.getDbName();
// 自定义的sourceConnector参数 替换模板里的参数
Map sourceCustomArgMap = (Map) params.get("sourceCustomArg");
//查询同步任务信息表是否存在类型是数据源到数据源,srcDataSourceId,targetDataSourceId一样的信息 如果 存在就不发起请求
Map<String, Object> queryParam = new HashMap<>();
queryParam.put("srcDataSourceId", srcDataSourceId);
queryParam.put("sourceConnectorName", sourceConnectorName);
Map dataMap = dmpRealtimeSyncInfoDao.selecltRealtimeSyncInfoByParams(queryParam);
if (dataMap.size() == 0 && dataMap.isEmpty()) { //不存在同一个数据源的同步任务 没查到信息
logger.info("######不存在同一个数据源的同步任务srcDataSourceId="+srcDataSourceId+",type="+1+",sourceConnectorName="+sourceConnectorName+",connectorUrl="+connectorUrl+"的sourceConnector,添加新的");
dataModelMap.put("name", sourceDbInfo.getDatasourceName()+"_"+ sourceName + "-" + sourceDbInfo.getDbName()); //source connector name
//不存在同一个数据源的同步任务 没查到信息
if (dataMap.size() == 0 && dataMap.isEmpty()) {
logger.info("######不存在同一个数据源的同步任务srcDataSourceId=" + srcDataSourceId + ",type=" + 1 + ",sourceConnectorName=" + sourceConnectorName + ",connectorUrl=" + connectorUrl + "的sourceConnector,添加新的");
dataModelMap.put("name", sourceDbInfo.getDatasourceName() + "_" + sourceName + "-" + sourceDbInfo.getDbName()); //source connector name
String dataSource2DataSourceJsonStr = freemakerJson("source", dataModelMap); //使用freemaker模板生成 kafka connector 请求参数
Map<String, Object> dataSource2DataSourceResult = RestClient.post(connectorUrl, dataSource2DataSourceJsonStr);
String connectorJobId = getConnectorJobId(dataSource2DataSourceResult);
//请求接口正常
//请求接口正常则保存数据,否则失败
if (StringUtils.isNotEmpty(connectorJobId)) {
DmpRealtimeSyncInfo saveBody = new DmpRealtimeSyncInfo();
saveBody.setSrcDatasourceId(srcDataSourceId);
saveBody.setSrcDatasourceName(sourceDbInfo.getDatasourceName());
saveBody.setSrcDatabaseType(sourceDbInfo.getDataSourceTypeName());
saveBody.setSrcDatabaseName(sourceDbInfo.getDbName());
saveBody.setTargetDatasourceId(targetDataSourceId);
saveBody.setTargetDatasourceName(targetDbInfo.getDatasourceName());
saveBody.setTargetDatabaseType(targetDbInfo.getDataSourceTypeName());
saveBody.setTargetDatabaseName(targetDbInfo.getDbName());
saveBody.setType(1);
saveBody.setStatus("SUBMIT_SUCCESS");
saveBody.setConnectorJsonData(dataSource2DataSourceJsonStr);
......@@ -467,31 +464,25 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
saveBody.setTargetTypeName("Kafka");
saveBody.setConnectorJobId(connectorJobId);
saveBody.setConnectorJsonData(dataSource2DataSourceJsonStr);
/* insertDmpRealtimeSyncInfoModel(saveBody);
//操作数据源黑名单信息
Map<String, Object> backlistTablesByDatasourceMap = dmpRealtimeSyncInfoDao.getBacklistTablesByDatasourceId(srcDataSourceId);
if (CollectionUtils.isEmpty(backlistTablesByDatasourceMap)) {
dmpRealtimeSyncInfoDao.addBacklistTablesByDatasourceId(blacklistTables, srcDataSourceId);
saveBody.setCreateTime(new Date());
dmpRealtimeSyncInfoDao.insert(saveBody);
realtiemId = Long.valueOf(saveBody.getId());
logger.info("###################保存实时同步任务--结束 ################");
Map blacklist = new HashMap();
blacklist.put("creTime", new Date());
blacklist.put("realtimeId", saveBody.getId());
blacklist.put("datasourceId", srcDataSourceId);
blacklist.put("blacklistTable", blacklistTables);
dmpRealtimeSyncInfoDao.insertRealtimeBlackList(blacklist);
logger.info("###################保存实时同步黑名单数据--结束 ################");
} else {
dmpRealtimeSyncInfoDao.updateBacklistTablesByDatasourceId(blacklistTables, srcDataSourceId);
}
//更新对应状态的数
DmpRealtimeSyncHandleCountModel dmpRealtimeSyncHandleCountModelSuccess = dmpRealtimeSyncHandleCountDao.getDmpRealtimeSyncHandleCountModel(uuid);
if (dmpRealtimeSyncHandleCountModelSuccess != null) {
dmpRealtimeSyncHandleCountModelSuccess.setSubmitSuccess(dmpRealtimeSyncHandleCountModelSuccess.getSubmitSuccess() + 1);
dmpRealtimeSyncHandleCountModelSuccess.setSubmiting(dmpRealtimeSyncHandleCountModelSuccess.getSubmiting() - 1);
dmpRealtimeSyncHandleCountDao.updateDmpRealtimeSyncHandleCountModel(dmpRealtimeSyncHandleCountModelSuccess);
}
//保存提交结果
dmpRealtimeSyncSubmitResutModel.setSubmitResult(db2Db + " 提交成功");
dmpRealtimeSyncSubmitResutModel.setStatus(3);
dmpRealtimeSyncSubmitResutDao.addDmpRealtimeSyncSubmitResutModel(dmpRealtimeSyncSubmitResutModel);
id = dmpRealtimeSyncInfoModel.getId();*/
throw new RuntimeException("提交失败!");
}
} else {
throw new RuntimeException("存在相同的实时任务数据!");
}
return 1;
return realtiemId;
}
/**
......@@ -540,4 +531,68 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
return (String) resultMap.get("name");
}
/**
* 处理已选择的表信息
*
* @return
* @author Bellamy
* @since 2021-01-08
*/
private JsonResult submitNoSelectTable(Long realtimeId, Long projectId, RealTimeSyncDataSourceModel sourceDbInfo, RealTimeSyncDataSourceModel targetDbInfo, DmpProjectSystemInfo dmpProjectSystemInfo, String connectorUrl, Map<String, Object> params) {
logger.info("###################处理已选择的表信息################");
/*//源数据源到数据源同步connector信息 ----开始------
Map<String, String> tableToTableConfigArg = new HashMap<>();
//默认配置参数 --{kudu master 的地址} 10.0.108.61,10.0.108.62
String endpoint = targetDbInfo.getEndpoint();
//--{share key} CRNET_SECRET_KEY
String shareKey = (String) getDictionaryParamValueByKey("CRNET_SECRET_KEY");
tableToTableConfigArg.put("shareKey", shareKey);
//--{impala 地址} 10.0.108.62:21050
//目标数据源类型
String storeType = targetDbInfo.getDataSourceTypeName().toLowerCase();
String jdbcUrl = targetDbInfo.getJdbcUrl();
if ("kudu".equals(storeType)) {
tableToTableConfigArg.put("kuduMaster", endpoint);
String impalaUrl = jdbcUrl;
if (!org.springframework.util.StringUtils.isEmpty(impalaUrl)) {
impalaUrl = impalaUrl.replace("jdbc:impala://", "");
if (impalaUrl.contains("/")) {
impalaUrl = impalaUrl.substring(0, impalaUrl.indexOf("/"));
}
tableToTableConfigArg.put("impalaUrl", impalaUrl);
}
tableToTableConfigArg.put("impalaFqdn", targetDbInfo.getAccessId());
//--{schem registry url} http://10.0.108.61:9881
tableToTableConfigArg.put("registryUrl", dmpProjectSystemInfo.getKafkaSchemaRegisterUrl());
} else if ("hdfs".equals(storeType)) {
tableToTableConfigArg.put("jdbcUrl", jdbcUrl);
tableToTableConfigArg.put("databaseName", targetDbInfo.getDbName());
tableToTableConfigArg.put("hiveMetastoreUris", endpoint);
tableToTableConfigArg.put("hdfsNamenodePrincipal", targetDbInfo.getAccessId());
tableToTableConfigArg.put("keytabLocation", targetDbInfo.getAccessKey());
}
tableToTableConfigArg.put("connectorSecurityFlag", (String) params.get("connectorSecurityFlag"));
//已选择的表信息
List<Map<String, String>> tables = (List<Map<String, String>>) params.get("tables");
for (Map<String, String> str : tables) {
str.put("targetDbNameAndTableName", targetDbInfo.getDbName() + "." + str.get("targetTableName")); //目标库和表名称
str.putAll(tableToTableConfigArg);
DmpRealtimeSyncSelectTable selectTable = new DmpRealtimeSyncSelectTable();
//selectTable.setCrePerson();
selectTable.setCreTime(new Date());
selectTable.setSrcTableName(str.get("sourceTableName"));
selectTable.setTargetTableName(str.get("targetTableName"));
selectTable.setRealtimeId(realtimeId);
selectTable.setDesensitizationField(str.get("desensitizationField"));
selectTable.setPkName(str.get("pkName"));
selectTable.setArithmetic(str.get("arithmetic"));
dmpRealtimeSyncInfoDao.insertRealtimeSelectTable(selectTable);
}
logger.info("###################处理已选择的表信息--结束################");*/
return new JsonResult();
}
}
\ No newline at end of file
......@@ -374,6 +374,9 @@
ds.db_name as dbName,
ds.host,
ds.port,
ds.endpoint,
ds,access_key as accessKey,
ds.access_id as accessId,
dsdt.datasource_type as datasourceTypeName,
dsdt.driver_class_name as driverName
from dmp_syncing_datasource ds
......@@ -410,7 +413,6 @@
t1.tree_id as treeId,
t2.name as treeName,
t1.status,
date_format(t1.update_time,'%Y-%m-%d %H:%i:%s') as updateTime,
t1.src_datasource_id as srcDatasourceId,
t1.src_datasource_name as srcDatasourceName,
t1.src_database_type as srcDatabaseType,
......@@ -419,19 +421,26 @@
t1.target_database_type as targetDatabaseType
FROM dmp_realtime_sync_info t1
left join dmp_navigation_tree t2 on t1.tree_id=t2.id
left join dmp_syncing_datasource t3 ON t1.src_datasource_id = t3.ID
left join dmp_syncing_datasource t4 ON t1.target_datasource_id = t4.ID
where 1=1
<if test="projectId !=null">and t1.project_id=#{projectId}</if>
<if test="taskStatus != null and taskStatus != '' "> AND t1.status = #{taskStatus} </if>
<if test="treeId != null and treeId != '' "> AND t1.id = #{treeId} </if>
<if test="targetDatabaseTypeId != null and targetDatabaseTypeId != '' "> AND t4.DATASOURCE_TYPE = #{targetDatabaseTypeId} </if>
<if test="sourceDatabaseTypeId != null and sourceDatabaseTypeId != ''"> AND t3.DATASOURCE_TYPE = #{sourceDatabaseTypeId} </if>
<if test="sourceDatabaseName != null and sourceDatabaseName != '' "> AND t1.src_datasource_name like CONCAT('%', #{sourceDatabaseName}, '%') </if>
<if test="targetDatabaseName != null and targetDatabaseName != '' "> AND t1.target_datasource_name like CONCAT('%', #{targetDatabaseName}, '%') </if>
<if test="treeName != null and treeName != '' "> AND t2.name like CONCAT('%', #{treeName}, '%') </if>
<if test="sourceConnectorName != null "> AND t1.connector_job_id = #{sourceConnectorName} </if>
<if test="srcDataSourceId != null"> AND t1.src_datasource_id = #{srcDataSourceId}</if>
order by t1.create_time desc
</select>
<!--添加实时任务黑名单表-->
<insert id="insertRealtimeBlackList" parameterType="java.util.Map">
insert into dmp_realtime_sync_blacklist_table_info (realtime_id,datasource_id,blacklist_table,cre_time,cre_person)
values(#{realtimeId},#{datasourceId},#{blacklistTable},#{creTime},#{crePerson})
</insert>
<!--新增实时任务已选择的表信息-->
<insert id="insertRealtimeSelectTable" parameterType="com.jz.dmp.modules.model.DmpRealtimeSyncSelectTable">
insert into dmp_realtime_sync_select_table (realtime_id,desensitization_field,arithmetic,pk_name,src_table_name,target_table_name,cre_time,cre_person)
values(#{realtimeId},#{desensitizationField},#{arithmetic},#{pkName},#{srcTableName},#{targetTableName},#{creTime},#{crePerson})
</insert>
</mapper>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment