Commit 880656a1 authored by mcb's avatar mcb

no message

parent 72d8be80
...@@ -140,7 +140,7 @@ public interface DmpRealtimeSyncInfoDao { ...@@ -140,7 +140,7 @@ public interface DmpRealtimeSyncInfoDao {
* @author Bellamy * @author Bellamy
* @since 2021-01-11 * @since 2021-01-11
*/ */
int insertRealtimeBlackList(Map blacklist); int insertRealtimeBlackList(Map blacklist) throws Exception;
/** /**
* 新增实时任务已选择的表信息 * 新增实时任务已选择的表信息
...@@ -149,5 +149,23 @@ public interface DmpRealtimeSyncInfoDao { ...@@ -149,5 +149,23 @@ public interface DmpRealtimeSyncInfoDao {
* @author Bellamy * @author Bellamy
* @since 2021-01-11 * @since 2021-01-11
*/ */
int insertRealtimeSelectTable(DmpRealtimeSyncSelectTable selectTable); int insertRealtimeSelectTable(DmpRealtimeSyncSelectTable selectTable) throws Exception;
/**
* 修改实时任务黑名单表
*
* @return
* @author Bellamy
* @since 2021-01-11
*/
int updateRealtimeBlackList(Map blacklist) throws Exception;
/**
* 修改实时任务已选择的表信息
*
* @return
* @author Bellamy
* @since 2021-01-11
*/
int updateRealtimeSelectTable(DmpRealtimeSyncSelectTable selectTable) throws Exception;
} }
\ No newline at end of file
...@@ -435,6 +435,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic ...@@ -435,6 +435,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
Map blacklist = new HashMap(); Map blacklist = new HashMap();
blacklist.put("creTime", new Date()); blacklist.put("creTime", new Date());
blacklist.put("realtimeId", saveBody.getId()); blacklist.put("realtimeId", saveBody.getId());
blacklist.put("crePerson", SessionUtils.getCurrentUserId());
blacklist.put("datasourceId", srcDataSourceId); blacklist.put("datasourceId", srcDataSourceId);
blacklist.put("blacklistTable", blacklistTablesInfo.get("blacklistTables").toString()); blacklist.put("blacklistTable", blacklistTablesInfo.get("blacklistTables").toString());
dmpRealtimeSyncInfoDao.insertRealtimeBlackList(blacklist); dmpRealtimeSyncInfoDao.insertRealtimeBlackList(blacklist);
...@@ -640,6 +641,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic ...@@ -640,6 +641,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
* @since 2021-01-08 * @since 2021-01-08
*/ */
@Override @Override
@Transactional(rollbackFor = Exception.class,propagation = Propagation.REQUIRES_NEW)
public JsonResult updateRealTimeTask(Map<String, Object> params) throws Exception { public JsonResult updateRealTimeTask(Map<String, Object> params) throws Exception {
String srcDataSourceId = params.get("srcDataSourceId").toString(); //来源id String srcDataSourceId = params.get("srcDataSourceId").toString(); //来源id
String targetDataSourceId = params.get("targetDataSourceId").toString(); //目标源id String targetDataSourceId = params.get("targetDataSourceId").toString(); //目标源id
...@@ -671,13 +673,119 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic ...@@ -671,13 +673,119 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
} }
} }
//提交源到源的connector //提交源到源的connector
Long realtimeId = updateDatasource2DatasourceToConnector(projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params); JsonResult realtimeId = updateDatasource2DatasourceToConnector(projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params);
//编辑 已选择表信息
updateNoSelectTable(params);
return new JsonResult(); return new JsonResult();
} }
private Long updateDatasource2DatasourceToConnector(Long projectId, RealTimeSyncDataSourceModel sourceDbInfo, RealTimeSyncDataSourceModel targetDbInfo, DmpProjectSystemInfo dmpProjectSystemInfo, String connectorUrl, Map<String, Object> params) throws Exception { private JsonResult updateDatasource2DatasourceToConnector(Long projectId, RealTimeSyncDataSourceModel sourceDbInfo, RealTimeSyncDataSourceModel targetDbInfo, DmpProjectSystemInfo dmpProjectSystemInfo, String connectorUrl, Map<String, Object> params) throws Exception {
Integer srcDataSourceId = sourceDbInfo.getId();
Integer targetDataSourceId = targetDbInfo.getId();
return null; String realtiemId = params.get("taskId").toString(); //同步任务id
//解析黑名单表
Map blacklistTablesInfo = getBlackListTableInfo(sourceDbInfo, params);
//解析已选择表
Map selectlistTablesInfo = getSelectListTableInfo(sourceDbInfo, params);
Map<String, String> dataModelMap = new HashMap<>();
dataModelMap.put("kafkaBootstrapServers", dmpProjectSystemInfo.getKafkaBootstrapServers());//kafka 地址
dataModelMap.put("registerUrl", dmpProjectSystemInfo.getKafkaSchemaRegisterUrl()); //kafka connector schema 注册地址
dataModelMap.put("blacklistTables", blacklistTablesInfo.get("connectorBlacklistTables").toString()); //设置的黑名单表
dataModelMap.put("blacklistTableCount", blacklistTablesInfo.get("blacklistTableCount").toString()); //黑名单表数量
//设置的白名单表 在模板里进行判比较黑名单表和白名单表的数量,谁小就用谁
dataModelMap.put("whitelistTablesConut", selectlistTablesInfo.get("whitelistTablesConut").toString()); //已选择表数量
dataModelMap.put("connectorWhitelistTables", selectlistTablesInfo.get("connectorWhitelistTables").toString()); //设置的已选择表
//选择的来源数据信息
dataModelMap.put("dbHost", sourceDbInfo.getHost());
dataModelMap.put("dbPort", sourceDbInfo.getPort());
dataModelMap.put("dbUserName", sourceDbInfo.getUserName());
dataModelMap.put("dbPassWord", sourceDbInfo.getPassword());
dataModelMap.put("dataSourceId", sourceDbInfo.getId().toString());
dataModelMap.put("datasourceName", sourceDbInfo.getDatasourceName());
dataModelMap.put("dbName", sourceDbInfo.getDbName());
dataModelMap.put("connectorSecurityFlag", (String) params.get("connectorSecurityFlag")); //安全验证开关,是否启用KERBEROS
//前端定义的sourceConnectorName前缀
String sourceName = (String) params.get("sourceName");
dataModelMap.put("sourceName", sourceName);
//source kafak topic
String topic = sourceDbInfo.getDatasourceName() + "_" + sourceName + "." + sourceDbInfo.getDbName() + ".databasehistory";
dataModelMap.put("topic", topic);
dataModelMap.put("name", sourceDbInfo.getDatasourceName() + "_" + sourceName + "-" + sourceDbInfo.getDbName()); //source connector name
String dataSource2DataSourceJsonStr = freemakerJson("source", dataModelMap); //使用freemaker模板生成 kafka connector 请求参数
Map<String, Object> dataSource2DataSourceResult = RestClient.post(connectorUrl, dataSource2DataSourceJsonStr);
String connectorJobId = getConnectorJobId(dataSource2DataSourceResult);
//请求接口正常则保存数据,否则失败
//if (StringUtils.isNotEmpty(connectorJobId)) {
DmpRealtimeSyncInfo saveBody = new DmpRealtimeSyncInfo();
saveBody.setSrcDatasourceId(srcDataSourceId);
saveBody.setSrcDatasourceName(sourceDbInfo.getDatasourceName());
saveBody.setSrcDatabaseType(sourceDbInfo.getDataSourceTypeName());
saveBody.setSrcDatabaseName(sourceDbInfo.getDbName());
saveBody.setTargetDatasourceId(targetDataSourceId);
saveBody.setTargetDatasourceName(targetDbInfo.getDatasourceName());
saveBody.setTargetDatabaseType(targetDbInfo.getDataSourceTypeName());
saveBody.setTargetDatabaseName(targetDbInfo.getDbName());
saveBody.setStatus("SUBMIT_SUCCESS");
saveBody.setConnectorJsonData(dataSource2DataSourceJsonStr);
saveBody.setProjectId(projectId);
saveBody.setConnectorUrl(connectorUrl);
saveBody.setSrcTopicName(topic);
saveBody.setSourceTypeName(sourceDbInfo.getDataSourceTypeName());
saveBody.setTargetTypeName("Kafka");
saveBody.setConnectorJobId(connectorJobId);
saveBody.setConnectorJsonData(dataSource2DataSourceJsonStr);
saveBody.setUpdateTime(new Date());
saveBody.setId(Integer.valueOf(realtiemId));
//saveBody.setUptPerson(SessionUtils.getCurrentUserId());
dmpRealtimeSyncInfoDao.update(saveBody);
logger.info("###################修改实时同步任务--结束 ################");
Map blacklist = new HashMap();
blacklist.put("uptTime", new Date());
//blacklist.put("uptPerson", SessionUtils.getCurrentUserId());
blacklist.put("realtimeId", realtiemId);
blacklist.put("datasourceId", srcDataSourceId);
blacklist.put("blacklistTable", blacklistTablesInfo.get("blacklistTables").toString());
dmpRealtimeSyncInfoDao.updateRealtimeBlackList(blacklist);
logger.info("###################修改实时同步黑名单数据--结束 ################");
/* } else {
throw new RuntimeException("提交失败!");
}*/
return new JsonResult();
} }
/**
* 修改处理已选择的表信息
*
* @return
* @author Bellamy
* @since 2021-01-11
*/
private JsonResult updateNoSelectTable(Map<String, Object> params) throws Exception {
logger.info("###################处理已选择的表信息################");
String realtimeId = params.get("taskId").toString(); //同步任务id
//已选择的表信息
List<Map<String, String>> tables = (List<Map<String, String>>) params.get("tables");
if (tables.size() > 0 && tables != null) {
for (Map<String, String> str : tables) {
DmpRealtimeSyncSelectTable selectTable = new DmpRealtimeSyncSelectTable();
//selectTable.setUptPerson(SessionUtils.getCurrentUserId());
selectTable.setUptTime(new Date());
selectTable.setSrcTableName(str.get("sourceTableName"));
selectTable.setTargetTableName(str.get("targetTableName"));
selectTable.setRealtimeId(Long.valueOf(realtimeId));
selectTable.setDesensitizationField(str.get("desensitizationField"));
selectTable.setPkName(str.get("pkName"));
selectTable.setArithmetic(str.get("arithmetic"));
dmpRealtimeSyncInfoDao.updateRealtimeSelectTable(selectTable);
}
}
logger.info("###################处理已选择的表信息--结束################");
return new JsonResult();
}
} }
\ No newline at end of file
...@@ -443,4 +443,53 @@ ...@@ -443,4 +443,53 @@
insert into dmp_realtime_sync_select_table (realtime_id,desensitization_field,arithmetic,pk_name,src_table_name,target_table_name,cre_time,cre_person) insert into dmp_realtime_sync_select_table (realtime_id,desensitization_field,arithmetic,pk_name,src_table_name,target_table_name,cre_time,cre_person)
values(#{realtimeId},#{desensitizationField},#{arithmetic},#{pkName},#{srcTableName},#{targetTableName},#{creTime},#{crePerson}) values(#{realtimeId},#{desensitizationField},#{arithmetic},#{pkName},#{srcTableName},#{targetTableName},#{creTime},#{crePerson})
</insert> </insert>
<!--修改实时任务黑名单表-->
<update id="updateRealtimeBlackList" parameterType="java.util.Map">
update dmp_realtime_sync_blacklist_table_info
<set>
<if test="uptTime != null">
upt_time = #{uptTime},
</if>
<if test="uptPerson != null">
upt_person = #{uptPerson},
</if>
<if test="datasourceId != null">
datasource_id = #{datasourceId},
</if>
<if test="blacklistTable != null">
blacklist_table = #{blacklistTable},
</if>
</set>
where realtime_id =#{realtimeId}
</update>
<!--修改实时任务已选择的表信息-->
<update id="updateRealtimeSelectTable" parameterType="com.jz.dmp.modules.model.DmpRealtimeSyncSelectTable">
update dmp_realtime_sync_select_table
<set>
<if test="uptTime != null ">
upt_time = #{uptTime},
</if>
<if test="uptPerson != null and uptPerson != ''">
upt_person = #{uptPerson},
</if>
<if test="pkName != null and pkName != ''">
pk_name = #{pkName},
</if>
<if test="arithmetic != null and arithmetic != ''">
arithmetic = #{arithmetic},
</if>
<if test="desensitizationField != null and desensitizationField != ''">
desensitization_field = #{desensitizationField},
</if>
<if test="targetTableName != null and targetTableName != ''">
target_table_name = #{targetTableName},
</if>
<if test="srcTableName != null and srcTableName != ''">
src_table_name = #{srcTableName},
</if>
</set>
where realtime_id=#{realtimeId}
</update>
</mapper> </mapper>
\ No newline at end of file
{ {
"projectId":31, "projectId":31,
"treeId": 1, "treeId": 1,
"taskId":181,
"srcDataSourceId":205, "srcDataSourceId":205,
"targetDataSourceId":202, "targetDataSourceId":202,
"sourceName":"test34", "sourceName":"test34",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment