Commit 02e38b7d authored by sml's avatar sml

Merge branch 'dmp_dev' of http://gitlab.ioubuy.cn/yaobenzhang/jz-dmp-service.git into dmp_dev

parents 19d0e55c 54def7c9
......@@ -230,8 +230,17 @@ public class RealTimeSyncController {
return new JsonResult(ResultCode.PARAMS_ERROR, "业务节点id不能为空!");
}
JsonResult result = new JsonResult();
try {
result = dmpRealtimeSyncInfoService.addRealTimeTask(params);
} catch (Exception e) {
result.setMessage(e.getMessage());
result.setCode(ResultCode.INTERNAL_SERVER_ERROR);
e.printStackTrace();
}
//异步提交
Thread thread = new Thread(new Runnable() {
/*Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
......@@ -241,8 +250,8 @@ public class RealTimeSyncController {
}
}
});
thread.start();
return new JsonResult();
thread.start();*/
return result;
}
/**
......@@ -312,9 +321,9 @@ public class RealTimeSyncController {
*/
@ApiOperation(value = "批量上下线", notes = "批量上下线")
@GetMapping(value = "/batchUptOnlineStatus")
@ApiImplicitParams({@ApiImplicitParam(name = "realTaskId", value = "任务id",required = true),
@ApiImplicitParam(name = "onlineStatus", value = "上下线状态:Y 上线,N 下线",required = true)})
public JsonResult batchUptOnlineStatus(@RequestParam String realTaskId,@RequestParam String onlineStatus) throws Exception {
@ApiImplicitParams({@ApiImplicitParam(name = "realTaskId", value = "任务id", required = true),
@ApiImplicitParam(name = "onlineStatus", value = "上下线状态:Y 上线,N 下线", required = true)})
public JsonResult batchUptOnlineStatus(@RequestParam String realTaskId, @RequestParam String onlineStatus) throws Exception {
logger.info("###################请求参数{}taskId=" + realTaskId + "###################");
if (StringUtils.isEmpty(realTaskId)) {
return new JsonResult(ResultCode.PARAMS_ERROR, "任务id不能为空!");
......@@ -322,7 +331,7 @@ public class RealTimeSyncController {
if (StringUtils.isEmpty(onlineStatus)) {
return new JsonResult(ResultCode.PARAMS_ERROR, "状态不能为空!");
}
boolean jsonResult = dmpRealtimeSyncInfoService.batchUptOnlineStatus(realTaskId,onlineStatus);
boolean jsonResult = dmpRealtimeSyncInfoService.batchUptOnlineStatus(realTaskId, onlineStatus);
if (jsonResult) {
return JsonResult.ok();
} else {
......
......@@ -6,6 +6,7 @@ import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq;
import com.jz.dmp.modules.model.DmpRealtimeSyncInfo;
import com.jz.dmp.modules.model.DmpRealtimeSyncSelectTable;
import com.jz.dmp.modules.model.DmpRealtimeTaskHistory;
import com.jz.dmp.modules.model.RealTimeSyncDataSourceModel;
import org.apache.ibatis.annotations.Param;
......@@ -185,4 +186,13 @@ public interface DmpRealtimeSyncInfoDao {
* @author Bellamy
*/
int deleteByrealTaskId(Map map) throws Exception;
/**
* 新增实时同步任务版本记录
*
* @return
* @author Bellamy
* @since 2021-02-02
*/
int insertRealtimeHistory(DmpRealtimeTaskHistory taskHistory) throws Exception;
}
\ No newline at end of file
......@@ -151,6 +151,11 @@ public class DmpRealtimeSyncInfo implements Serializable {
@ApiModelProperty(value = "更新人")
private String uptPerson;
/**
* 版本号
*/
@ApiModelProperty(value = "版本号")
private String version;
public Integer getId() {
return id;
......@@ -384,4 +389,11 @@ public class DmpRealtimeSyncInfo implements Serializable {
this.uptPerson = uptPerson;
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
}
\ No newline at end of file
......@@ -26,6 +26,7 @@ import freemarker.template.TemplateNotFoundException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
......@@ -306,19 +307,6 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
DmpProjectSystemInfo dmpProjectSystemInfo = dmpProjectDao.queryProjectSystemInfo(projectId);
params.put("connectorSecurityFlag", dmpProjectSystemInfo.getKerberosIsenable()); //安全验证开关,是否启用KERBEROS
/* //保存实时同步任务处理数
DmpRealtimeSyncHandleCount countModel = new DmpRealtimeSyncHandleCount();
countModel.setUuid(UUID.randomUUID().toString());
countModel.setToSubmit(1);
countModel.setCreTime(new Date());
//初始化当前任务记录数
List<Map<String, String>> tables = (List<Map<String, String>>) params.get("tables");
if (!CollectionUtils.isEmpty(tables)) {
countModel.setToSubmit(countModel.getToSubmit() + tables.size());
}
dmpRealtimeSyncHandleCountDao.insert(countModel);
logger.info("###################添加实时同步任务处理数结束{}" + countModel.toString() + "############");*/
//connect1@http://172.18.104.130:9993/connectors
if (StringUtils.isNotEmpty(connectorUrl)) {
if (connectorUrl.contains("@")) {
......@@ -389,6 +377,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
//source kafak topic
String topic = sourceDbInfo.getDatasourceName() + "_" + sourceName + "." + sourceDbInfo.getDbName() + ".databasehistory";
dataModelMap.put("topic", topic);
//connector_job_id 连接名称
String sourceConnectorName = "debezium-connector-" + sourceDbInfo.getDatasourceName() + "_" + sourceName + "-" + sourceDbInfo.getDbName();
//查询同步任务信息表是否存在类型是数据源到数据源,srcDataSourceId,targetDataSourceId一样的信息 如果 存在就不发起请求
......@@ -423,13 +412,18 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
saveBody.setSourceTypeName(sourceDbInfo.getDataSourceTypeName());
saveBody.setTargetTypeName("Kafka");
saveBody.setConnectorJobId(connectorJobId);
saveBody.setConnectorJsonData(dataSource2DataSourceJsonStr);
saveBody.setCreateTime(new Date());
saveBody.setCrePerson(SessionUtils.getCurrentUserId());
saveBody.setVersion("1.0");
dmpRealtimeSyncInfoDao.insert(saveBody);
realtiemId = Long.valueOf(saveBody.getId());
logger.info("###################保存实时同步任务--结束 ################");
DmpRealtimeTaskHistory taskHistory = new DmpRealtimeTaskHistory();
BeanUtils.copyProperties(saveBody,taskHistory);
taskHistory.setRealtimeSyncId(saveBody.getId());
dmpRealtimeSyncInfoDao.insertRealtimeHistory(taskHistory);
Map blacklist = new HashMap();
blacklist.put("creTime", new Date());
blacklist.put("realtimeId", saveBody.getId());
......
......@@ -155,8 +155,12 @@
<!--新增所有列-->
<insert id="insert" keyProperty="id" useGeneratedKeys="true">
insert into dmp_realtime_sync_info(src_datasource_id, target_datasource_id, src_table_name, target_table_name, type, connector_job_id, connector_json_data, src_topic_name, project_id, parent_id, desensitization_field, arithmetic, pk_name, source_type_name, target_type_name, src_database_type, src_database_name, connector_url, target_database_type, target_database_name, src_datasource_name, target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person)
values (#{srcDatasourceId}, #{targetDatasourceId}, #{srcTableName}, #{targetTableName}, #{type}, #{connectorJobId}, #{connectorJsonData}, #{srcTopicName}, #{projectId}, #{parentId}, #{desensitizationField}, #{arithmetic}, #{pkName}, #{sourceTypeName}, #{targetTypeName}, #{srcDatabaseType}, #{srcDatabaseName}, #{connectorUrl}, #{targetDatabaseType}, #{targetDatabaseName}, #{srcDatasourceName}, #{targetDatasourceName}, #{storeType}, #{status}, #{createTime}, #{updateTime}, #{crePerson}, #{uptPerson})
insert into dmp_realtime_sync_info(src_datasource_id, target_datasource_id, src_table_name, target_table_name, type, connector_job_id, connector_json_data, src_topic_name
, project_id, parent_id, desensitization_field, arithmetic, pk_name, source_type_name, target_type_name, src_database_type, src_database_name, connector_url, target_database_type
, target_database_name, src_datasource_name, target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person, version)
values (#{srcDatasourceId}, #{targetDatasourceId}, #{srcTableName}, #{targetTableName}, #{type}, #{connectorJobId}, #{connectorJsonData}, #{srcTopicName}, #{projectId}
, #{parentId}, #{desensitizationField}, #{arithmetic}, #{pkName}, #{sourceTypeName}, #{targetTypeName}, #{srcDatabaseType}, #{srcDatabaseName}, #{connectorUrl}, #{targetDatabaseType}
, #{targetDatabaseName}, #{srcDatasourceName}, #{targetDatasourceName}, #{storeType}, #{status}, #{createTime}, #{updateTime}, #{crePerson}, #{uptPerson} ,#{version})
</insert>
<insert id="insertBatch" keyProperty="id" useGeneratedKeys="true">
......@@ -445,6 +449,16 @@
values(#{realtimeId},#{desensitizationField},#{arithmetic},#{pkName},#{srcTableName},#{targetTableName},#{creTime},#{crePerson})
</insert>
<!--新增实时同步任务版本记录-->
<insert id="insertRealtimeHistory" parameterType="com.jz.dmp.modules.model.DmpRealtimeTaskHistory">
insert into dmp_realtime_task_history(realtime_sync_id,src_datasource_id, target_datasource_id, src_table_name, target_table_name, type, connector_job_id, connector_json_data, src_topic_name
, project_id, desensitization_field, arithmetic, pk_name, source_type_name, target_type_name, src_database_type, src_database_name, connector_url, target_database_type
, target_database_name, src_datasource_name, target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person, version)
values (#{realtimeSyncId},#{srcDatasourceId}, #{targetDatasourceId}, #{srcTableName}, #{targetTableName}, #{type}, #{connectorJobId}, #{connectorJsonData}, #{srcTopicName}, #{projectId}
, #{desensitizationField}, #{arithmetic}, #{pkName}, #{sourceTypeName}, #{targetTypeName}, #{srcDatabaseType}, #{srcDatabaseName}, #{connectorUrl}, #{targetDatabaseType}
, #{targetDatabaseName}, #{srcDatasourceName}, #{targetDatasourceName}, #{storeType}, #{status}, #{createTime}, #{updateTime}, #{crePerson}, #{uptPerson} ,#{version})
</insert>
<!--修改实时任务黑名单表-->
<update id="updateRealtimeBlackList" parameterType="java.util.Map">
update dmp_realtime_sync_blacklist_table_info
......
......@@ -224,4 +224,149 @@
}
]
}
}
\ No newline at end of file
}
/* 测试数据
{
"params": {
"version": "1.0",
"treeId": 617,
"mode": "0",
"projectId": "31",
"taskId":"",
"taskName": "dmp_demo_dmp_azkaban_exector_server_config",
"scripts": {
"setting": {
"extract": "incremental",
"extractExpression": "where 1=1",
"targetInsertMergeOverwrite": "insert",
"ftColumn": "1",
"ftCount": "1",
"separateMax": "1",
"separateMin": "",
"postImportStatement": "12",
"preImportStatement": "12",
"errorLimitRecord": "21",
"maxConcurrency": "2",
"executorMemory":"1",
"executorCores":"1",
"totalExecutorCores":"1",
"fieldMapping":""
},
"reader": {
"dbConnection": "mysql_dmp_demo_test",
"fileType": "",
"sourceHdfsPath": "",
"sourceHdfsFile": "",
"sourceFtpDir": "",
"sourceFtpFile": "",
"sourceSkipFtpFile": "",
"sourceCsvDelimiter": "",
"sourceCsvHeader": "",
"sourceCsvCharset": "",
"sourceCsvQuote": "",
"sourceFtpLoadDate": "",
"registerTableName": "dmp_azkaban_exector_server_config",
"dayByDay": "false",
"column": [
{
"name": "host",
"type": "VARCHAR"
},
{
"name": "port",
"type": "VARCHAR"
},
{
"name": "user_name",
"type": "VARCHAR"
},
{
"name": "pass_word",
"type": "VARCHAR"
}
]
},
"writer": {
"targetDbConnection": "mysql_dmp_demo_test",
"targetTable": "dmp_azkaban_exector_server_config",
"targetFtpDir": "",
"targetFtpFile": "",
"targetCsvDelimiter": "",
"targetCsvHeader": "",
"targetCsvCharset": "",
"targetInsertMergeOverwrite": "insert",
"column": [
{
"name": "host",
"type": "VARCHAR",
"isPk": "1",
"isPt": "0",
"rules": [
{
"method": "",
"type": ""
}
]
},
{
"name": "port",
"type": "VARCHAR",
"isPk": "0",
"isPt": "1",
"rules": [
{
"method": "",
"type": ""
}
]
},
{
"name": "user_name",
"type": "VARCHAR",
"isPk": "0",
"isPt": "0",
"rules": [
{
"method": "",
"type": ""
}
]
},
{
"name": "pass_word",
"type": "VARCHAR",
"isPk": "0",
"isPt": "0",
"rules": [
{
"method": "",
"type": ""
}
]
}
]
}
},
"taskRules": [
{
"ruleId": "",
"ruleValue": {
"dv_fields": [
{
"fieldName": ""
}
],
"dvTime": {
"timeField": "",
"timeValue": {
"startTime": "",
"endTime": ""
}
}
}
}
]
}
}*/
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment