Commit 00dbac5e authored by mcb's avatar mcb

commit

parent a8f97b6b
......@@ -235,19 +235,6 @@ public class RealTimeSyncController {
result.setCode(ResultCode.INTERNAL_SERVER_ERROR);
e.printStackTrace();
}
//异步提交
/*Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
dmpRealtimeSyncInfoService.addRealTimeTask(params);
} catch (Exception e) {
e.printStackTrace();
}
}
});
thread.start();*/
return result;
}
......@@ -335,4 +322,34 @@ public class RealTimeSyncController {
return JsonResult.error("删除失败!");
}
}
/**
* 冲突检查
*
* @return
* @author Bellamy
* @since 2021-02-22
*/
@ApiOperation(value = "冲突检查", notes = "冲突检查")
@PostMapping(value = "/conflictCheck")
public JsonResult conflictCheck(@RequestBody ConflictCheckReq params, HttpServletRequest httpRequest) throws Exception {
logger.info("###################请求参数{}" + params.toString() + "############");
if (StringUtils.isEmpty(params.getTargetDataSourceId())) {
return new JsonResult(ResultCode.PARAMS_ERROR, "目标数据源id不能空!");
}
if (StringUtils.isEmpty(params.getTablesName())) {
return new JsonResult(ResultCode.PARAMS_ERROR, "表名称不能为空!");
}
JsonResult result = new JsonResult();
try {
result = dmpRealtimeSyncInfoService.conflictCheck(params);
} catch (Exception e) {
result.setMessage(e.getMessage());
result.setCode(ResultCode.INTERNAL_SERVER_ERROR);
e.printStackTrace();
}
return result;
}
}
\ No newline at end of file
package com.jz.dmp.modules.controller.DataIntegration.bean;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
/**
* @ClassName: ConflictCheckReq
* @Description: 冲突检查
* @Author : Bellamy
* @Date 2021/2/22
* @Version 1.0
*/
@Data
@ApiModel(value = "状态查看列表分页查询请求参数对象", description = "状态查看请求参数对象")
public class ConflictCheckReq implements Serializable {
private static final long serialVersionUID = 7850773528490999353L;
@NotNull(message = "目标数据源id不能为空")
@NotEmpty(message = "目标数据源id不能为空")
@ApiModelProperty(value = "目标数据源ID")
private String targetDataSourceId;
@NotNull(message = "表名称不能为空")
@NotEmpty(message = "表名称不能为空")
@ApiModelProperty(value = "表名称")
private String tablesName;
}
......@@ -163,6 +163,9 @@ public class DmpRealtimeSyncInfo implements Serializable {
@ApiModelProperty(value = "treeId")
private String treeId;
@ApiModelProperty(value = "scriptJson")
private String scriptJson;
public Integer getId() {
return id;
}
......@@ -410,4 +413,12 @@ public class DmpRealtimeSyncInfo implements Serializable {
public void setTreeId(String treeId) {
this.treeId = treeId;
}
public String getScriptJson() {
return scriptJson;
}
public void setScriptJson(String scriptJson) {
this.scriptJson = scriptJson;
}
}
\ No newline at end of file
......@@ -144,4 +144,13 @@ public interface DmpRealtimeSyncInfoService {
* @since 2021-01-05
*/
boolean batchUptOnlineStatus(String realTaskId, String onlineStatus) throws Exception;
/**
* 冲突检查
*
* @return
* @author Bellamy
* @since 2021-02-22
*/
JsonResult conflictCheck(ConflictCheckReq params) throws Exception;
}
\ No newline at end of file
package com.jz.dmp.modules.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.jz.agent.service.DmpDsAgentService;
import com.jz.common.constant.JsonResult;
import com.jz.common.constant.ResultCode;
import com.jz.common.enums.DelFlagEnum;
import com.jz.common.page.PageInfoResponse;
import com.jz.common.persistence.BaseService;
import com.jz.common.utils.JsonMapper;
import com.jz.common.utils.realTime.DBUtil;
import com.jz.common.utils.realTime.RestClient;
import com.jz.common.utils.web.SessionUtils;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.modules.controller.DataIntegration.bean.*;
import com.jz.dmp.modules.dao.DmpNavigationTreeDao;
import com.jz.dmp.modules.dao.DmpProjectDao;
import com.jz.dmp.modules.dao.DmpRealtimeSyncHandleCountDao;
import com.jz.dmp.modules.dao.DmpRealtimeSyncInfoDao;
import com.jz.dmp.modules.dao.*;
import com.jz.dmp.modules.model.*;
import com.jz.dmp.modules.service.DmpRealtimeSyncInfoService;
import freemarker.core.ParseException;
......@@ -70,6 +71,12 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
@Autowired
private DmpNavigationTreeDao dmpNavigationTreeDao;
@Autowired
private OfflineSynchDao offlineSynchDao;
@Autowired
private DmpDsAgentService dmpDsAgentServiceImp;
/**
* 通过ID查询单条数据
*
......@@ -310,6 +317,8 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
DmpProjectSystemInfo dmpProjectSystemInfo = dmpProjectDao.queryProjectSystemInfo(projectId);
params.put("connectorSecurityFlag", dmpProjectSystemInfo.getKerberosIsenable()); //安全验证开关,是否启用KERBEROS
String kafkaConnectUrl = dmpProjectSystemInfo.getKafkaConnectorUrl(); //kafka 连接信息
String[] arr = kafkaConnectUrl.split(",");
//connect1@http://172.18.104.130:9993/connectors
if (StringUtils.isEmpty(connectorUrl))
return JsonResult.error(ResultCode.PARAMS_ERROR, "connectorUrl不能为空!");
......@@ -327,6 +336,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
private Long submitDatasource2DatasourceToConnector(Long projectId, RealTimeSyncDataSourceModel sourceDbInfo, RealTimeSyncDataSourceModel targetDbInfo, DmpProjectSystemInfo dmpProjectSystemInfo, String connectorUrl, Map<String, Object> params) throws Exception {
DmpNavigationTree tree = dmpNavigationTreeDao.queryById(Integer.valueOf(params.get("treeId").toString()));
Integer srcDataSourceId = sourceDbInfo.getId();
Integer targetDataSourceId = targetDbInfo.getId();
......@@ -358,7 +368,8 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
dataModelMap.put("connectorSecurityFlag", (String) params.get("connectorSecurityFlag")); //安全验证开关,是否启用KERBEROS
//前端定义的sourceConnectorName前缀
String sourceName = (String) params.get("sourceName");
//String sourceName = (String) params.get("sourceName");
String sourceName = tree.getName();
dataModelMap.put("sourceName", sourceName);
//source kafak topic
String topic = sourceDbInfo.getDatasourceName() + "_" + sourceName + "." + sourceDbInfo.getDbName() + ".databasehistory";
......@@ -401,6 +412,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
saveBody.setCreateTime(new Date());
saveBody.setCrePerson(SessionUtils.getCurrentUserId());
saveBody.setVersion("1.0");
saveBody.setScriptJson(JSONObject.toJSONString(params));
dmpRealtimeSyncInfoDao.insert(saveBody);
realtiemId = Long.valueOf(saveBody.getId());
logger.info("###################保存实时同步任务--结束 ################");
......@@ -788,7 +800,9 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
List<Map> list = dmpRealtimeSyncInfoDao.selectRealtimeTaskById(taskId);
if (list.size() > 0 && list != null) {
Map realtimeMap = list.get(0);
returnModel.setBlacklistTable(realtimeMap.get("blacklistTable").toString());
if (StringUtils.isNotEmpty((String) realtimeMap.get("blacklistTable"))) {
returnModel.setBlacklistTable((String) realtimeMap.get("blacklistTable"));
}
returnModel.setSrcDatasourceId(realtimeMap.get("srcDatasourceId").toString());
returnModel.setSrcDatasourceName(realtimeMap.get("srcDatasourceName").toString());
returnModel.setTargetDatasourceId(realtimeMap.get("targetDatasourceId").toString());
......@@ -798,11 +812,15 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
for (int i = 0; i < list.size(); i++) {
Map str = list.get(i);
DmpRealtimeSyncSelectTable selectTable = new DmpRealtimeSyncSelectTable();
selectTable.setDesensitizationField(str.get("desensitizationField").toString());
selectTable.setArithmetic(str.get("arithmetic").toString());
selectTable.setPkName(str.get("pkName").toString());
selectTable.setSrcTableName(str.get("srcTableName").toString());
selectTable.setTargetTableName(str.get("targetTableName").toString());
if (StringUtils.isNotEmpty((String) str.get("desensitizationField"))) {
selectTable.setDesensitizationField((String) str.get("desensitizationField")); //脱敏字段
}
if (StringUtils.isNotEmpty((String) str.get("arithmetic"))) {
selectTable.setArithmetic((String) str.get("arithmetic")); //算法
}
selectTable.setSrcTableName((String) str.get("srcTableName")); //来源表
selectTable.setTargetTableName((String) str.get("targetTableName")); //目标表
//selectTable.setPkName(str.get("pkName").toString());
selectList.add(selectTable);
returnModel.setSelectTable(selectList);
}
......@@ -853,4 +871,42 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
map.put("onlineStatus", onlineStatus);
return dmpRealtimeSyncInfoDao.deleteByrealTaskId(map) > 0;
}
/**
* 冲突检查(选择源表在目标表中存在就冲突)
*
* @param params
* @return
* @author Bellamy
* @since 2021-02-22
*/
@Override
public JsonResult conflictCheck(ConflictCheckReq params) throws Exception {
List<String> returnList = new ArrayList<>();
//通过源数据库id ,查询数据源配置
DmpAgentDatasourceInfo dsInfo = offlineSynchDao.querySourceDbInfoBySourceId(Long.valueOf(params.getTargetDataSourceId()));
if (dsInfo == null) {
throw new RuntimeException("数据源配置信息不存在!");
}
//解码源数据库密码
if (StringUtils.isNotBlank(dsInfo.getPassword())) {
dsInfo.setPassword(new BaseService().decode(dsInfo.getPassword(), publicKey));
}
//创建jdbc,获取源数据表
DmpAgentResult rst = dmpDsAgentServiceImp.getTableNameList(dsInfo);
if (!rst.getCode().val().equals("200")) {
return new JsonResult(rst.getCode(), rst.getMessage());
} else {
//成功
List<String> tableList = (List<String>) JSONObject.parse(rst.getMessage());
String[] tablesName = params.getTablesName().split(",");
for (String table : tablesName) {
if(tableList.contains(table)){
returnList.add(table);
}
}
}
return JsonResult.ok(returnList);
}
}
\ No newline at end of file
......@@ -794,10 +794,13 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
if (returnList != null && returnList.size() > 0) {
for (int i = 0; i < returnList.size(); i++) {
Map map = returnList.get(i);
//离线同步
map.put("id", ++len);
map.put("fieldAlias", map.get("name")); //字段别名
map.put("isPk", 0);
map.put("isPt", 0);
map.put("isPk", 0); //主键
map.put("isPt", 0); //分区
//实时同步:脱敏算法
map.put("arithmetic", "HmacSHA256");
returnData.add(map);
}
}
......
......@@ -24,7 +24,7 @@
select
ID, CATEGORY, TYPE, NAME, TREE_SORT, IS_LEVEL, IS_ENABLE, DATA_STATUS, CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, PROJECT_ID, PARENT_ID
from dmp_navigation_tree
where ID = #{id}
where 1=1 and data_status = '1' and ID = #{id}
</select>
<!--查询指定行数据-->
......
......@@ -158,10 +158,10 @@
<insert id="insert" keyProperty="id" useGeneratedKeys="true">
insert into dmp_realtime_sync_info(src_datasource_id, target_datasource_id, src_table_name, target_table_name, type, connector_job_id, connector_json_data, src_topic_name
, project_id, parent_id, desensitization_field, arithmetic, pk_name, source_type_name, target_type_name, src_database_type, src_database_name, connector_url, target_database_type
, target_database_name, src_datasource_name, target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person, version)
, target_database_name, src_datasource_name, target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person, version, script_json)
values (#{srcDatasourceId}, #{targetDatasourceId}, #{srcTableName}, #{targetTableName}, #{type}, #{connectorJobId}, #{connectorJsonData}, #{srcTopicName}, #{projectId}
, #{parentId}, #{desensitizationField}, #{arithmetic}, #{pkName}, #{sourceTypeName}, #{targetTypeName}, #{srcDatabaseType}, #{srcDatabaseName}, #{connectorUrl}, #{targetDatabaseType}
, #{targetDatabaseName}, #{srcDatasourceName}, #{targetDatasourceName}, #{storeType}, #{status}, #{createTime}, #{updateTime}, #{crePerson}, #{uptPerson} ,#{version})
, #{targetDatabaseName}, #{srcDatasourceName}, #{targetDatasourceName}, #{storeType}, #{status}, #{createTime}, #{updateTime}, #{crePerson}, #{uptPerson} ,#{version}, #{scriptJson})
</insert>
<insert id="insertBatch" keyProperty="id" useGeneratedKeys="true">
......
......@@ -85,7 +85,7 @@
project_id
FROM
dmp_syncing_datasource
WHERE id = #{sourceDbId}
WHERE data_status = '1' and id = #{sourceDbId}
</select>
<select id="querySourceDbInfoBySourceId" parameterType="map" resultType="com.jz.dmp.modules.model.DmpAgentDatasourceInfo">
......
......@@ -4,13 +4,13 @@
"taskId":181,
"srcDataSourceId":205,
"targetDataSourceId":202,
"sourceName":"test34",
//"sourceName":"test34", treeName
"sourceCustomArg":{
"decimal.handling.mode":"double"
},
"blacklistTables":"dmp_azkaban_exector_server_config,dmp_realtime_sync_handle_count,dmp_realtime_sync_info,dmp_realtime_sync_submit_result,table_operate_log",
"notSelectTables":"dmp_develop_function,dmp_develop_resource,dmp_develop_script,dmp_develop_task,dmp_develop_task_history,dmp_locks,dmp_module_operate_log,dmp_navigation_tree,dmp_open_api_es_fields,dmp_open_api_es_tagconfig,dmp_open_api_kafka_connector,dmp_ops_monitor,dmp_ops_monitor_setting,dmp_permission,dmp_project,dmp_project_member,dmp_project_member_role,dmp_project_openapi,dmp_project_orgid,dmp_project_permission,dmp_project_permission_bak,dmp_project_role,dmp_project_role_menu,dmp_project_role_permission,dmp_project_superuser_role,dmp_project_system_info,dmp_realtime_sync_blacklist_table_info,dmp_spark_applications,dmp_syncing_datasource,dmp_syncing_datasource_type,dmp_syncing_job_conf,dmp_syncing_job_reader,dmp_syncing_job_writer,dmp_system_lookup,dmp_system_user,dmp_table,dmp_table_access_auth,dmp_table_category,dmp_table_column,dmp_table_ddl_log,dmp_table_field_mapping,dmp_table_field_schema,dmp_table_manage,dmp_task_dependent,dmp_task_instance,dmp_task_instance_optlog,dmp_task_instance_runlog,dmp_task_schedule,dmp_user_member,dmp_work_flow_publish_details,dmp_work_flow_submit_details,dv_rule_check_result_t,dv_rule_t,dv_task_rule_t,oauth_token,oauth_token_key,salej2",
"connectorUrl":"connect1@http://172.18.104.130:9993/connectors",
//"blacklistTables":"dmp_azkaban_exector_server_config,dmp_realtime_sync_handle_count,dmp_realtime_sync_info,dmp_realtime_sync_submit_result,table_operate_log",
//"notSelectTables":"dmp_develop_function,dmp_develop_resource,dmp_develop_script,dmp_develop_task,dmp_develop_task_history,dmp_locks,dmp_module_operate_log,dmp_navigation_tree,dmp_open_api_es_fields,dmp_open_api_es_tagconfig,dmp_open_api_kafka_connector,dmp_ops_monitor,dmp_ops_monitor_setting,dmp_permission,dmp_project,dmp_project_member,dmp_project_member_role,dmp_project_openapi,dmp_project_orgid,dmp_project_permission,dmp_project_permission_bak,dmp_project_role,dmp_project_role_menu,dmp_project_role_permission,dmp_project_superuser_role,dmp_project_system_info,dmp_realtime_sync_blacklist_table_info,dmp_spark_applications,dmp_syncing_datasource,dmp_syncing_datasource_type,dmp_syncing_job_conf,dmp_syncing_job_reader,dmp_syncing_job_writer,dmp_system_lookup,dmp_system_user,dmp_table,dmp_table_access_auth,dmp_table_category,dmp_table_column,dmp_table_ddl_log,dmp_table_field_mapping,dmp_table_field_schema,dmp_table_manage,dmp_task_dependent,dmp_task_instance,dmp_task_instance_optlog,dmp_task_instance_runlog,dmp_task_schedule,dmp_user_member,dmp_work_flow_publish_details,dmp_work_flow_submit_details,dv_rule_check_result_t,dv_rule_t,dv_task_rule_t,oauth_token,oauth_token_key,salej2",
//"connectorUrl":"connect1@http://172.18.104.130:9993/connectors",
"tables":[
{
"sourceTableName":"dmp_data_contrast",
......@@ -23,3 +23,20 @@
]
}
/*{
"projectId":31,
"treeId": 1,
"taskId":181, //任务id编辑时 需要
"srcDataSourceId":205, //来源数据源id
"targetDataSourceId":202, //目标数据源id
"tables":[
{
"sourceTableName":"dmp_data_contrast", //选择的 来源表
"targetTableName":"dmp_data_contrast", //目标表
"desensitizationField":"target_database_name", //脱敏字段
"arithmetic":"HmacSHA256" //算法
}
]
}*/
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment