Commit ff35f8cd authored by mcb's avatar mcb

实时同步任务列表

parent 260778ef
...@@ -5,4 +5,5 @@ alter table dmp_develop_task add TARGET_DB_NAME varchar(64) default NULL COMMENT ...@@ -5,4 +5,5 @@ alter table dmp_develop_task add TARGET_DB_NAME varchar(64) default NULL COMMENT
alter table dmp_develop_task add TARGET_TABLE_NAME varchar(64) default NULL COMMENT'目标数据表名称'; alter table dmp_develop_task add TARGET_TABLE_NAME varchar(64) default NULL COMMENT'目标数据表名称';
alter table dmp_syncing_datasource_type add DB_ATTRS json default null comment '数据库属性'; alter table dmp_syncing_datasource_type add DB_ATTRS json default null comment '数据库属性';
alter table dmp_syncing_datasource add NETWORK_CONNECTION_TYPE varchar(64) default null comment '网络连接类型'; alter table dmp_syncing_datasource add NETWORK_CONNECTION_TYPE varchar(64) default null comment '网络连接类型';
alter table dmp_syncing_datasource add TEST_CONNECT_STATUS char(2) default '01' comment '测试连通状态:01未测试,02连通性正常,03连通性异常'; alter table dmp_syncing_datasource add TEST_CONNECT_STATUS char(2) default '01' comment '测试连通状态:01未测试,02连通性正常,03连通性异常';
\ No newline at end of file alter table dmp_realtime_sync_info add tree_id varchar(64) DEFAULT NULL COMMENT 'treeID';
package com.jz.common.utils.realTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class CmdUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(CmdUtils.class);
/**
* 执行shell命令
* @param shellPath shell路径
* @param args shell参数
* @return
*/
public static boolean callShell(String shellPath,String ...args) {
boolean result = true;
try {
StringBuffer shellSb = new StringBuffer();
shellSb.append(shellPath);
if (args != null && args.length >= 1) {
for (String arg : args) {
shellSb.append(" ");
shellSb.append(arg);
shellSb.append(" ");
}
}
String shell = shellSb.toString();
LOGGER.info("执行shell:"+shell);
Process process = Runtime.getRuntime().exec(shell);
int status = 0 ;
try {
status = process.waitFor();
LOGGER.info("执行shell:"+shell+" 返回status:"+status);
if(status != 0){
result = false;
}
} catch (InterruptedException e) {
result = false;
e.printStackTrace();
}
} catch (IOException e) {
result = false ;
e.printStackTrace();
}
return result;
}
}
package com.jz.dmp.modules.controller.DataIntegration;
import com.jz.common.constant.JsonResult;
import com.jz.common.constant.ResultCode;
import com.jz.common.page.PageInfoResponse;
import com.jz.common.utils.realTime.CmdUtils;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq;
import com.jz.dmp.modules.model.DmpRealtimeSyncInfo;
import com.jz.dmp.modules.service.DmpRealtimeSyncInfoService;
import com.jz.dmp.modules.service.impl.OfflineSynchServiceImpl;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
/**
* 实时同步任务
*
* @author Bellamy
* @since 2021-01-05 10:56:18
*/
@RestController
@RequestMapping("/realTimeSync")
@Api(tags = "数据集成--实时同步任务")
public class RealTimeSyncController {
private static Logger logger = LoggerFactory.getLogger(RealTimeSyncController.class);
/**
* 服务对象
*/
@Autowired
private DmpRealtimeSyncInfoService dmpRealtimeSyncInfoService;
/**
* 实时同步任务列表分页查询
*
* @return
* @author Bellamy
* @since 2021-01-05
*/
@ApiOperation(value = "实时同步任务列表分页查询", notes = "实时同步任务列表分页查询")
@PostMapping(value = "/realTimeSyncListPage")
public PageInfoResponse<RealTimeSyncListDto> getDataSourceListPage(@RequestBody RealTimeSyncListReq req, HttpServletRequest httpRequest) throws Exception {
PageInfoResponse<RealTimeSyncListDto> pageInfo = new PageInfoResponse<RealTimeSyncListDto>();
if (StringUtils.isEmpty(req.getProjectId())) {
pageInfo.setMessage("项目id不能为空!");
pageInfo.setCode(ResultCode.PARAMS_ERROR);
return pageInfo;
}
try {
pageInfo = dmpRealtimeSyncInfoService.queryRealTimeSyncListPage(req);
} catch (Exception e) {
pageInfo.setMessage("查询失败");
pageInfo.setCode(ResultCode.INTERNAL_SERVER_ERROR);
e.printStackTrace();
}
return pageInfo;
}
/**
* 启动实时同步任务
*
* @return
* @author Bellamy
* @since 2021-01-05
*/
@ApiOperation(value = "启动实时同步任务", notes = "启动实时同步任务")
@GetMapping(value = "/startRealTimeSync")
@ApiImplicitParam(name = "realTaskId", value = "任务id")
public JsonResult startRealTimeSync(@RequestParam String realTaskId) throws Exception {
if (StringUtils.isEmpty(realTaskId)) {
return new JsonResult(ResultCode.PARAMS_ERROR, "任务id不能为空!");
}
DmpRealtimeSyncInfo dmpRealtimeSyncInfo = dmpRealtimeSyncInfoService.queryById(Integer.valueOf(realTaskId));
String srcTopicName = dmpRealtimeSyncInfo.getSrcTopicName();
System.out.println(srcTopicName);
logger.info("############正常执行表数据........"+realTaskId);
String shellPath ="/app/bigdata-app/scripts/trigger_straming.sh";
CmdUtils.callShell(shellPath, srcTopicName);
return new JsonResult();
}
}
\ No newline at end of file
package com.jz.dmp.modules.controller.DataIntegration.bean;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
/**
* @ClassName: RealTimeSyncListDto
* @Description: 实时同步列表返回参数
* @Author:Bellamy
* @Date 2021/01/05
* @Version 1.0
*/
@ApiModel(value = "实时同步列表返回参数", description = "实时同步列表返回参数")
public class RealTimeSyncListDto {
/*
* 实时同步任务ID
* */
@ApiModelProperty(value = "实时同步任务ID")
private String id;
/*
* 业务树节点id
* */
@ApiModelProperty(value = "业务树节点id")
private String treeId;
/*
* 业务树节点名称
* */
@ApiModelProperty(value = "业务树节点名称")
private String treeName;
/*
* 实时同步任务状态
* */
@ApiModelProperty(value = "实时同步任务状态")
private String status;
/*
* 更新时间
* */
@ApiModelProperty(value = "更新时间")
private String updateTime;
/*
* 来源数据源id
* */
@ApiModelProperty(value = "来源数据源id")
private String srcDatasourceId;
/*
* 来源数据源名称
* */
@ApiModelProperty(value = "来源数据源名称")
private String srcDatasourceName;
/*
* 目标数据源名称
* */
@ApiModelProperty(value = "来源数据源类型")
private String srcDatabaseType;
/*
* 目标数据源id
* */
@ApiModelProperty(value = "目标数据源id")
private String targetDatasourceId;
/*
* 目标数据源名称
* */
@ApiModelProperty(value = "目标数据源名称")
private String targetDatasourceName;
/*
* 目标数据源类型
* */
@ApiModelProperty(value = "目标数据源类型")
private String targetDatabaseType;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getTreeId() {
return treeId;
}
public void setTreeId(String treeId) {
this.treeId = treeId;
}
public String getTreeName() {
return treeName;
}
public void setTreeName(String treeName) {
this.treeName = treeName;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getUpdateTime() {
return updateTime;
}
public void setUpdateTime(String updateTime) {
this.updateTime = updateTime;
}
public String getSrcDatasourceId() {
return srcDatasourceId;
}
public void setSrcDatasourceId(String srcDatasourceId) {
this.srcDatasourceId = srcDatasourceId;
}
public String getSrcDatasourceName() {
return srcDatasourceName;
}
public void setSrcDatasourceName(String srcDatasourceName) {
this.srcDatasourceName = srcDatasourceName;
}
public String getSrcDatabaseType() {
return srcDatabaseType;
}
public void setSrcDatabaseType(String srcDatabaseType) {
this.srcDatabaseType = srcDatabaseType;
}
public String getTargetDatasourceId() {
return targetDatasourceId;
}
public void setTargetDatasourceId(String targetDatasourceId) {
this.targetDatasourceId = targetDatasourceId;
}
public String getTargetDatasourceName() {
return targetDatasourceName;
}
public void setTargetDatasourceName(String targetDatasourceName) {
this.targetDatasourceName = targetDatasourceName;
}
public String getTargetDatabaseType() {
return targetDatabaseType;
}
public void setTargetDatabaseType(String targetDatabaseType) {
this.targetDatabaseType = targetDatabaseType;
}
}
package com.jz.dmp.modules.controller.DataIntegration.bean;
import com.jz.common.page.BasePageBean;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
/**
* @ClassName: RealTimeSyncListReq
* @Description: 实时同步列表请求参数
* @Author:Bellamy
* @Date 2021/01/05
* @Version 1.0
*/
@ApiModel(value = "实时同步列表请求参数", description = "实时同步列表请求参数")
public class RealTimeSyncListReq extends BasePageBean {
/*
* 源数据类型id
* */
@ApiModelProperty(value = "来源数据类型id")
private String sourceDatabaseTypeId;
/*
* 目标数据源名称
* */
@ApiModelProperty(value = "来源数据源名称")
private String sourceDatabaseName;
/*
* 目标数据类型id
* */
@ApiModelProperty(value = "目标数据类型id")
private String targetDatabaseTypeId;
/*
* 目标数据源名称
* */
@ApiModelProperty(value = "目标数据源名称")
private String targetDatabaseName;
/*
* 项目id
* */
@ApiModelProperty(value = "项目id")
@NotNull(message = "项目id不能为空")
@NotEmpty(message = "项目ID不能空")
private String projectId;
/*
* 目标数据源名称
* */
@ApiModelProperty(value = "目标数据源名称")
private String taskStatus;
/*
* 节点id
* */
@ApiModelProperty(value = "节点id")
private String treeId;
public String getProjectId() {
return projectId;
}
public void setProjectId(String projectId) {
this.projectId = projectId;
}
public String getSourceDatabaseTypeId() {
return sourceDatabaseTypeId;
}
public void setSourceDatabaseTypeId(String sourceDatabaseTypeId) {
this.sourceDatabaseTypeId = sourceDatabaseTypeId;
}
public String getSourceDatabaseName() {
return sourceDatabaseName;
}
public void setSourceDatabaseName(String sourceDatabaseName) {
this.sourceDatabaseName = sourceDatabaseName;
}
public String getTargetDatabaseTypeId() {
return targetDatabaseTypeId;
}
public void setTargetDatabaseTypeId(String targetDatabaseTypeId) {
this.targetDatabaseTypeId = targetDatabaseTypeId;
}
public String getTargetDatabaseName() {
return targetDatabaseName;
}
public void setTargetDatabaseName(String targetDatabaseName) {
this.targetDatabaseName = targetDatabaseName;
}
public String getTaskStatus() {
return taskStatus;
}
public void setTaskStatus(String taskStatus) {
this.taskStatus = taskStatus;
}
public String getTreeId() {
return treeId;
}
public void setTreeId(String treeId) {
this.treeId = treeId;
}
}
package com.jz.dmp.modules.dao;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq;
import com.jz.dmp.modules.model.DmpRealtimeSyncInfo;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* 实时同步任务(DmpRealtimeSyncInfo)表数据库访问层
*
* @author Bellamy
* @since 2021-01-05 14:18:03
*/
public interface DmpRealtimeSyncInfoDao {
/**
* 通过ID查询单条数据
*
* @param id 主键
* @return 实例对象
*/
DmpRealtimeSyncInfo queryById(Integer id);
/**
* 查询指定行数据
*
* @param offset 查询起始位置
* @param limit 查询条数
* @return 对象列表
*/
List<DmpRealtimeSyncInfo> queryAllByLimit(@Param("offset") int offset, @Param("limit") int limit);
/**
* 通过实体作为筛选条件查询
*
* @param dmpRealtimeSyncInfo 实例对象
* @return 对象列表
*/
List<DmpRealtimeSyncInfo> queryAll(DmpRealtimeSyncInfo dmpRealtimeSyncInfo);
/**
* 新增数据
*
* @param dmpRealtimeSyncInfo 实例对象
* @return 影响行数
*/
int insert(DmpRealtimeSyncInfo dmpRealtimeSyncInfo);
/**
* 批量新增数据(MyBatis原生foreach方法)
*
* @param entities List<DmpRealtimeSyncInfo> 实例对象列表
* @return 影响行数
*/
int insertBatch(@Param("entities") List<DmpRealtimeSyncInfo> entities);
/**
* 批量新增或按主键更新数据(MyBatis原生foreach方法)
*
* @param entities List<DmpRealtimeSyncInfo> 实例对象列表
* @return 影响行数
*/
int insertOrUpdateBatch(@Param("entities") List<DmpRealtimeSyncInfo> entities);
/**
* 修改数据
*
* @param dmpRealtimeSyncInfo 实例对象
* @return 影响行数
*/
int update(DmpRealtimeSyncInfo dmpRealtimeSyncInfo);
/**
* 通过主键删除数据
*
* @param id 主键
* @return 影响行数
*/
int deleteById(Integer id);
/**
* 实时同步任务列表分页查询
* @return
*/
List<RealTimeSyncListDto> queryRealTimeSyncListPage(RealTimeSyncListReq req) throws Exception;
}
\ No newline at end of file
package com.jz.dmp.modules.model;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.Date;
/**
* 实时同步任务(DmpRealtimeSyncInfo)实体类
*
* @author makejava
* @since 2021-01-05 14:18:00
*/
@ApiModel(value = "实时同步任务", description = "实时同步任务")
public class DmpRealtimeSyncInfo implements Serializable {
private static final long serialVersionUID = 622628445093766366L;
/**
* 实时同步任务ID
*/
@ApiModelProperty(value = "实时同步任务ID")
private Integer id;
/**
* 源数据库id
*/
@ApiModelProperty(value = "源数据库id")
private Integer srcDatasourceId;
/**
* 目标数据库id
*/
@ApiModelProperty(value = "目标数据库id")
private Integer targetDatasourceId;
/**
* 源数据库表名称
*/
@ApiModelProperty(value = "源数据库表名称")
private String srcTableName;
/**
* 目标数据库表名称
*/
@ApiModelProperty(value = "目标数据库表名称")
private String targetTableName;
@ApiModelProperty(value = "${column.comment}")
private Integer type;
@ApiModelProperty(value = "${column.comment}")
private String connectorJobId;
@ApiModelProperty(value = "${column.comment}")
private Object connectorJsonData;
@ApiModelProperty(value = "${column.comment}")
private String srcTopicName;
/**
* 项目id
*/
@ApiModelProperty(value = "项目id")
private Object projectId;
@ApiModelProperty(value = "${column.comment}")
private Integer parentId;
/**
* 脱敏字段
*/
@ApiModelProperty(value = "脱敏字段")
private String desensitizationField;
/**
* 加密算法
*/
@ApiModelProperty(value = "加密算法")
private String arithmetic;
/**
* 主键名称
*/
@ApiModelProperty(value = "主键名称")
private String pkName;
/**
* 源类型名称
*/
@ApiModelProperty(value = "源类型名称")
private String sourceTypeName;
/**
* 目标类型名称
*/
@ApiModelProperty(value = "目标类型名称")
private String targetTypeName;
/**
* 源数据库类型
*/
@ApiModelProperty(value = "源数据库类型")
private String srcDatabaseType;
/**
* 源数据库名称
*/
@ApiModelProperty(value = "源数据库名称")
private String srcDatabaseName;
/**
* 目标数据源URL连接信息
*/
@ApiModelProperty(value = "目标数据源URL连接信息")
private String connectorUrl;
/**
* 目标数据库类型
*/
@ApiModelProperty(value = "目标数据库类型")
private String targetDatabaseType;
/**
* 目标数据库名称
*/
@ApiModelProperty(value = "目标数据库名称")
private String targetDatabaseName;
/**
* 源数据库
*/
@ApiModelProperty(value = "源数据库")
private String srcDatasourceName;
/**
* 目标数据源库
*/
@ApiModelProperty(value = "目标数据源库")
private String targetDatasourceName;
/**
* 存储类型
*/
@ApiModelProperty(value = "存储类型")
private String storeType;
/**
* 任务状态
*/
@ApiModelProperty(value = "任务状态")
private String status;
/**
* 创建时间
*/
@ApiModelProperty(value = "创建时间")
private Date createTime;
/**
* 更新时间
*/
@ApiModelProperty(value = "更新时间")
private Date updateTime;
/**
* 创建人
*/
@ApiModelProperty(value = "创建人")
private String crePerson;
/**
* 更新人
*/
@ApiModelProperty(value = "更新人")
private String uptPerson;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Integer getSrcDatasourceId() {
return srcDatasourceId;
}
public void setSrcDatasourceId(Integer srcDatasourceId) {
this.srcDatasourceId = srcDatasourceId;
}
public Integer getTargetDatasourceId() {
return targetDatasourceId;
}
public void setTargetDatasourceId(Integer targetDatasourceId) {
this.targetDatasourceId = targetDatasourceId;
}
public String getSrcTableName() {
return srcTableName;
}
public void setSrcTableName(String srcTableName) {
this.srcTableName = srcTableName;
}
public String getTargetTableName() {
return targetTableName;
}
public void setTargetTableName(String targetTableName) {
this.targetTableName = targetTableName;
}
public Integer getType() {
return type;
}
public void setType(Integer type) {
this.type = type;
}
public String getConnectorJobId() {
return connectorJobId;
}
public void setConnectorJobId(String connectorJobId) {
this.connectorJobId = connectorJobId;
}
public Object getConnectorJsonData() {
return connectorJsonData;
}
public void setConnectorJsonData(Object connectorJsonData) {
this.connectorJsonData = connectorJsonData;
}
public String getSrcTopicName() {
return srcTopicName;
}
public void setSrcTopicName(String srcTopicName) {
this.srcTopicName = srcTopicName;
}
public Object getProjectId() {
return projectId;
}
public void setProjectId(Object projectId) {
this.projectId = projectId;
}
public Integer getParentId() {
return parentId;
}
public void setParentId(Integer parentId) {
this.parentId = parentId;
}
public String getDesensitizationField() {
return desensitizationField;
}
public void setDesensitizationField(String desensitizationField) {
this.desensitizationField = desensitizationField;
}
public String getArithmetic() {
return arithmetic;
}
public void setArithmetic(String arithmetic) {
this.arithmetic = arithmetic;
}
public String getPkName() {
return pkName;
}
public void setPkName(String pkName) {
this.pkName = pkName;
}
public String getSourceTypeName() {
return sourceTypeName;
}
public void setSourceTypeName(String sourceTypeName) {
this.sourceTypeName = sourceTypeName;
}
public String getTargetTypeName() {
return targetTypeName;
}
public void setTargetTypeName(String targetTypeName) {
this.targetTypeName = targetTypeName;
}
public String getSrcDatabaseType() {
return srcDatabaseType;
}
public void setSrcDatabaseType(String srcDatabaseType) {
this.srcDatabaseType = srcDatabaseType;
}
public String getSrcDatabaseName() {
return srcDatabaseName;
}
public void setSrcDatabaseName(String srcDatabaseName) {
this.srcDatabaseName = srcDatabaseName;
}
public String getConnectorUrl() {
return connectorUrl;
}
public void setConnectorUrl(String connectorUrl) {
this.connectorUrl = connectorUrl;
}
public String getTargetDatabaseType() {
return targetDatabaseType;
}
public void setTargetDatabaseType(String targetDatabaseType) {
this.targetDatabaseType = targetDatabaseType;
}
public String getTargetDatabaseName() {
return targetDatabaseName;
}
public void setTargetDatabaseName(String targetDatabaseName) {
this.targetDatabaseName = targetDatabaseName;
}
public String getSrcDatasourceName() {
return srcDatasourceName;
}
public void setSrcDatasourceName(String srcDatasourceName) {
this.srcDatasourceName = srcDatasourceName;
}
public String getTargetDatasourceName() {
return targetDatasourceName;
}
public void setTargetDatasourceName(String targetDatasourceName) {
this.targetDatasourceName = targetDatasourceName;
}
public String getStoreType() {
return storeType;
}
public void setStoreType(String storeType) {
this.storeType = storeType;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
public String getCrePerson() {
return crePerson;
}
public void setCrePerson(String crePerson) {
this.crePerson = crePerson;
}
public String getUptPerson() {
return uptPerson;
}
public void setUptPerson(String uptPerson) {
this.uptPerson = uptPerson;
}
}
\ No newline at end of file
package com.jz.dmp.modules.service;
import com.jz.common.page.PageInfoResponse;
import com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq;
import com.jz.dmp.modules.model.DmpRealtimeSyncInfo;
import java.util.List;
/**
* 实时同步任务(DmpRealtimeSyncInfo)表服务接口
*
* @author Bellamy
* @since 2021-01-05 14:18:03
*/
public interface DmpRealtimeSyncInfoService {
/**
* 通过ID查询单条数据
*
* @param id 主键
* @return 实例对象
*/
DmpRealtimeSyncInfo queryById(Integer id);
/**
* 查询多条数据
*
* @param offset 查询起始位置
* @param limit 查询条数
* @return 对象列表
*/
List<DmpRealtimeSyncInfo> queryAllByLimit(int offset, int limit);
/**
* 新增数据
*
* @param dmpRealtimeSyncInfo 实例对象
* @return 实例对象
*/
DmpRealtimeSyncInfo insert(DmpRealtimeSyncInfo dmpRealtimeSyncInfo);
/**
* 修改数据
*
* @param dmpRealtimeSyncInfo 实例对象
* @return 实例对象
*/
DmpRealtimeSyncInfo update(DmpRealtimeSyncInfo dmpRealtimeSyncInfo);
/**
* 通过主键删除数据
*
* @param id 主键
* @return 是否成功
*/
boolean deleteById(Integer id);
/**
* 实时同步任务列表分页查询
*
* @return
*/
PageInfoResponse<RealTimeSyncListDto> queryRealTimeSyncListPage(RealTimeSyncListReq req) throws Exception;
}
\ No newline at end of file
package com.jz.dmp.modules.service.impl;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.jz.common.constant.ResultCode;
import com.jz.common.page.PageInfoResponse;
import com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq;
import com.jz.dmp.modules.controller.DataIntegration.bean.TaskListPageDto;
import com.jz.dmp.modules.dao.DmpRealtimeSyncInfoDao;
import com.jz.dmp.modules.model.DmpRealtimeSyncInfo;
import com.jz.dmp.modules.service.DmpRealtimeSyncInfoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
/**
* 实时同步任务(DmpRealtimeSyncInfo)表服务实现类
*
* @author Bellamy
* @since 2021-01-05 14:18:03
*/
@Service("dmpRealtimeSyncInfoService")
public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoService {
@Autowired
private DmpRealtimeSyncInfoDao dmpRealtimeSyncInfoDao;
/**
* 通过ID查询单条数据
*
* @param id 主键
* @return 实例对象
*/
@Override
public DmpRealtimeSyncInfo queryById(Integer id) {
return this.dmpRealtimeSyncInfoDao.queryById(id);
}
/**
* 查询多条数据
*
* @param offset 查询起始位置
* @param limit 查询条数
* @return 对象列表
*/
@Override
public List<DmpRealtimeSyncInfo> queryAllByLimit(int offset, int limit) {
return this.dmpRealtimeSyncInfoDao.queryAllByLimit(offset, limit);
}
/**
* 新增数据
*
* @param dmpRealtimeSyncInfo 实例对象
* @return 实例对象
*/
@Override
public DmpRealtimeSyncInfo insert(DmpRealtimeSyncInfo dmpRealtimeSyncInfo) {
this.dmpRealtimeSyncInfoDao.insert(dmpRealtimeSyncInfo);
return dmpRealtimeSyncInfo;
}
/**
* 修改数据
*
* @param dmpRealtimeSyncInfo 实例对象
* @return 实例对象
*/
@Override
public DmpRealtimeSyncInfo update(DmpRealtimeSyncInfo dmpRealtimeSyncInfo) {
this.dmpRealtimeSyncInfoDao.update(dmpRealtimeSyncInfo);
return this.queryById(dmpRealtimeSyncInfo.getId());
}
/**
* 通过主键删除数据
*
* @param id 主键
* @return 是否成功
*/
@Override
public boolean deleteById(Integer id) {
return this.dmpRealtimeSyncInfoDao.deleteById(id) > 0;
}
/**
* 实时同步任务列表分页查询
* @return
*/
@Override
public PageInfoResponse<RealTimeSyncListDto> queryRealTimeSyncListPage(RealTimeSyncListReq req) throws Exception {
PageInfoResponse<RealTimeSyncListDto> pageInfoResponse = new PageInfoResponse<>();
PageHelper.startPage(req.getPageNum(), req.getPageSize());
List<RealTimeSyncListDto> list = dmpRealtimeSyncInfoDao.queryRealTimeSyncListPage(req);
PageInfo<RealTimeSyncListDto> pageInfo = new PageInfo<>(list);
pageInfoResponse.setCode(ResultCode.SUCCESS);
pageInfoResponse.setMessage("查询成功");
pageInfoResponse.setData(pageInfo);
return pageInfoResponse;
}
}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jz.dmp.modules.dao.DmpRealtimeSyncInfoDao">
<resultMap type="com.jz.dmp.modules.model.DmpRealtimeSyncInfo" id="DmpRealtimeSyncInfoMap">
<result property="id" column="id" jdbcType="INTEGER"/>
<result property="srcDatasourceId" column="src_datasource_id" jdbcType="INTEGER"/>
<result property="targetDatasourceId" column="target_datasource_id" jdbcType="INTEGER"/>
<result property="srcTableName" column="src_table_name" jdbcType="VARCHAR"/>
<result property="targetTableName" column="target_table_name" jdbcType="VARCHAR"/>
<result property="type" column="type" jdbcType="INTEGER"/>
<result property="connectorJobId" column="connector_job_id" jdbcType="VARCHAR"/>
<result property="connectorJsonData" column="connector_json_data" jdbcType="OTHER"/>
<result property="srcTopicName" column="src_topic_name" jdbcType="VARCHAR"/>
<result property="projectId" column="project_id" jdbcType="OTHER"/>
<result property="parentId" column="parent_id" jdbcType="INTEGER"/>
<result property="desensitizationField" column="desensitization_field" jdbcType="VARCHAR"/>
<result property="arithmetic" column="arithmetic" jdbcType="VARCHAR"/>
<result property="pkName" column="pk_name" jdbcType="VARCHAR"/>
<result property="sourceTypeName" column="source_type_name" jdbcType="VARCHAR"/>
<result property="targetTypeName" column="target_type_name" jdbcType="VARCHAR"/>
<result property="srcDatabaseType" column="src_database_type" jdbcType="VARCHAR"/>
<result property="srcDatabaseName" column="src_database_name" jdbcType="VARCHAR"/>
<result property="connectorUrl" column="connector_url" jdbcType="VARCHAR"/>
<result property="targetDatabaseType" column="target_database_type" jdbcType="VARCHAR"/>
<result property="targetDatabaseName" column="target_database_name" jdbcType="VARCHAR"/>
<result property="srcDatasourceName" column="src_datasource_name" jdbcType="VARCHAR"/>
<result property="targetDatasourceName" column="target_datasource_name" jdbcType="VARCHAR"/>
<result property="storeType" column="store_type" jdbcType="VARCHAR"/>
<result property="status" column="status" jdbcType="VARCHAR"/>
<result property="createTime" column="create_time" jdbcType="TIMESTAMP"/>
<result property="updateTime" column="update_time" jdbcType="TIMESTAMP"/>
<result property="crePerson" column="cre_person" jdbcType="VARCHAR"/>
<result property="uptPerson" column="upt_person" jdbcType="VARCHAR"/>
</resultMap>
<!--查询单个-->
<select id="queryById" resultMap="DmpRealtimeSyncInfoMap">
select
id, src_datasource_id, target_datasource_id, src_table_name, target_table_name, type, connector_job_id, connector_json_data
, src_topic_name, project_id, parent_id, desensitization_field, arithmetic, pk_name, source_type_name, target_type_name
, src_database_type, src_database_name, connector_url, target_database_type, target_database_name, src_datasource_name
, target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person
from dmp_realtime_sync_info
where id = #{id}
</select>
<!--查询指定行数据-->
<select id="queryAllByLimit" resultMap="DmpRealtimeSyncInfoMap">
select
id, src_datasource_id, target_datasource_id, src_table_name, target_table_name, type, connector_job_id, connector_json_data, src_topic_name, project_id, parent_id, desensitization_field, arithmetic, pk_name, source_type_name, target_type_name, src_database_type, src_database_name, connector_url, target_database_type, target_database_name, src_datasource_name, target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person
from dmp_realtime_sync_info
limit #{offset}, #{limit}
</select>
<!--通过实体作为筛选条件查询-->
<select id="queryAll" resultMap="DmpRealtimeSyncInfoMap">
select
id, src_datasource_id, target_datasource_id, src_table_name, target_table_name, type, connector_job_id,
connector_json_data, src_topic_name, project_id, parent_id, desensitization_field, arithmetic, pk_name,
source_type_name, target_type_name, src_database_type, src_database_name, connector_url, target_database_type,
target_database_name, src_datasource_name, target_datasource_name, store_type, status, create_time, update_time,
cre_person, upt_person
from dmp_realtime_sync_info
<where>
<if test="id != null">
and id = #{id}
</if>
<if test="srcDatasourceId != null">
and src_datasource_id = #{srcDatasourceId}
</if>
<if test="targetDatasourceId != null">
and target_datasource_id = #{targetDatasourceId}
</if>
<if test="srcTableName != null and srcTableName != ''">
and src_table_name = #{srcTableName}
</if>
<if test="targetTableName != null and targetTableName != ''">
and target_table_name = #{targetTableName}
</if>
<if test="type != null">
and type = #{type}
</if>
<if test="connectorJobId != null and connectorJobId != ''">
and connector_job_id = #{connectorJobId}
</if>
<if test="connectorJsonData != null">
and connector_json_data = #{connectorJsonData}
</if>
<if test="srcTopicName != null and srcTopicName != ''">
and src_topic_name = #{srcTopicName}
</if>
<if test="projectId != null">
and project_id = #{projectId}
</if>
<if test="parentId != null">
and parent_id = #{parentId}
</if>
<if test="desensitizationField != null and desensitizationField != ''">
and desensitization_field = #{desensitizationField}
</if>
<if test="arithmetic != null and arithmetic != ''">
and arithmetic = #{arithmetic}
</if>
<if test="pkName != null and pkName != ''">
and pk_name = #{pkName}
</if>
<if test="sourceTypeName != null and sourceTypeName != ''">
and source_type_name = #{sourceTypeName}
</if>
<if test="targetTypeName != null and targetTypeName != ''">
and target_type_name = #{targetTypeName}
</if>
<if test="srcDatabaseType != null and srcDatabaseType != ''">
and src_database_type = #{srcDatabaseType}
</if>
<if test="srcDatabaseName != null and srcDatabaseName != ''">
and src_database_name = #{srcDatabaseName}
</if>
<if test="connectorUrl != null and connectorUrl != ''">
and connector_url = #{connectorUrl}
</if>
<if test="targetDatabaseType != null and targetDatabaseType != ''">
and target_database_type = #{targetDatabaseType}
</if>
<if test="targetDatabaseName != null and targetDatabaseName != ''">
and target_database_name = #{targetDatabaseName}
</if>
<if test="srcDatasourceName != null and srcDatasourceName != ''">
and src_datasource_name = #{srcDatasourceName}
</if>
<if test="targetDatasourceName != null and targetDatasourceName != ''">
and target_datasource_name = #{targetDatasourceName}
</if>
<if test="storeType != null and storeType != ''">
and store_type = #{storeType}
</if>
<if test="status != null and status != ''">
and status = #{status}
</if>
<if test="createTime != null">
and create_time = #{createTime}
</if>
<if test="updateTime != null">
and update_time = #{updateTime}
</if>
<if test="crePerson != null and crePerson != ''">
and cre_person = #{crePerson}
</if>
<if test="uptPerson != null and uptPerson != ''">
and upt_person = #{uptPerson}
</if>
</where>
</select>
<!--新增所有列-->
<insert id="insert" keyProperty="id" useGeneratedKeys="true">
insert into dmp_realtime_sync_info(src_datasource_id, target_datasource_id, src_table_name, target_table_name, type, connector_job_id, connector_json_data, src_topic_name, project_id, parent_id, desensitization_field, arithmetic, pk_name, source_type_name, target_type_name, src_database_type, src_database_name, connector_url, target_database_type, target_database_name, src_datasource_name, target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person)
values (#{srcDatasourceId}, #{targetDatasourceId}, #{srcTableName}, #{targetTableName}, #{type}, #{connectorJobId}, #{connectorJsonData}, #{srcTopicName}, #{projectId}, #{parentId}, #{desensitizationField}, #{arithmetic}, #{pkName}, #{sourceTypeName}, #{targetTypeName}, #{srcDatabaseType}, #{srcDatabaseName}, #{connectorUrl}, #{targetDatabaseType}, #{targetDatabaseName}, #{srcDatasourceName}, #{targetDatasourceName}, #{storeType}, #{status}, #{createTime}, #{updateTime}, #{crePerson}, #{uptPerson})
</insert>
<insert id="insertBatch" keyProperty="id" useGeneratedKeys="true">
insert into dmp_realtime_sync_info(src_datasource_id, target_datasource_id, src_table_name,
target_table_name, type, connector_job_id, connector_json_data, src_topic_name, project_id, parent_id,
desensitization_field, arithmetic, pk_name, source_type_name, target_type_name, src_database_type,
src_database_name, connector_url, target_database_type, target_database_name, src_datasource_name,
target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person)
values
<foreach collection="entities" item="entity" separator=",">
(#{entity.srcDatasourceId}, #{entity.targetDatasourceId}, #{entity.srcTableName}, #{entity.targetTableName},
#{entity.type}, #{entity.connectorJobId}, #{entity.connectorJsonData}, #{entity.srcTopicName},
#{entity.projectId}, #{entity.parentId}, #{entity.desensitizationField}, #{entity.arithmetic},
#{entity.pkName}, #{entity.sourceTypeName}, #{entity.targetTypeName}, #{entity.srcDatabaseType},
#{entity.srcDatabaseName}, #{entity.connectorUrl}, #{entity.targetDatabaseType},
#{entity.targetDatabaseName}, #{entity.srcDatasourceName}, #{entity.targetDatasourceName},
#{entity.storeType}, #{entity.status}, #{entity.createTime}, #{entity.updateTime}, #{entity.crePerson},
#{entity.uptPerson})
</foreach>
</insert>
<insert id="insertOrUpdateBatch" keyProperty="id" useGeneratedKeys="true">
insert into dmp_realtime_sync_info(src_datasource_id, target_datasource_id, src_table_name,
target_table_name, type, connector_job_id, connector_json_data, src_topic_name, project_id, parent_id,
desensitization_field, arithmetic, pk_name, source_type_name, target_type_name, src_database_type,
src_database_name, connector_url, target_database_type, target_database_name, src_datasource_name,
target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person)
values
<foreach collection="entities" item="entity" separator=",">
(#{entity.srcDatasourceId}, #{entity.targetDatasourceId}, #{entity.srcTableName}, #{entity.targetTableName},
#{entity.type}, #{entity.connectorJobId}, #{entity.connectorJsonData}, #{entity.srcTopicName},
#{entity.projectId}, #{entity.parentId}, #{entity.desensitizationField}, #{entity.arithmetic},
#{entity.pkName}, #{entity.sourceTypeName}, #{entity.targetTypeName}, #{entity.srcDatabaseType},
#{entity.srcDatabaseName}, #{entity.connectorUrl}, #{entity.targetDatabaseType},
#{entity.targetDatabaseName}, #{entity.srcDatasourceName}, #{entity.targetDatasourceName},
#{entity.storeType}, #{entity.status}, #{entity.createTime}, #{entity.updateTime}, #{entity.crePerson},
#{entity.uptPerson})
</foreach>
on duplicate key update
src_datasource_id = values(src_datasource_id) , target_datasource_id = values(target_datasource_id) ,
src_table_name = values(src_table_name) , target_table_name = values(target_table_name) , type = values(type) ,
connector_job_id = values(connector_job_id) , connector_json_data = values(connector_json_data) , src_topic_name
= values(src_topic_name) , project_id = values(project_id) , parent_id = values(parent_id) ,
desensitization_field = values(desensitization_field) , arithmetic = values(arithmetic) , pk_name =
values(pk_name) , source_type_name = values(source_type_name) , target_type_name = values(target_type_name) ,
src_database_type = values(src_database_type) , src_database_name = values(src_database_name) , connector_url =
values(connector_url) , target_database_type = values(target_database_type) , target_database_name =
values(target_database_name) , src_datasource_name = values(src_datasource_name) , target_datasource_name =
values(target_datasource_name) , store_type = values(store_type) , status = values(status) , create_time =
values(create_time) , update_time = values(update_time) , cre_person = values(cre_person) , upt_person =
values(upt_person)
</insert>
<!--通过主键修改数据-->
<update id="update">
update dmp_realtime_sync_info
<set>
<if test="srcDatasourceId != null">
src_datasource_id = #{srcDatasourceId},
</if>
<if test="targetDatasourceId != null">
target_datasource_id = #{targetDatasourceId},
</if>
<if test="srcTableName != null and srcTableName != ''">
src_table_name = #{srcTableName},
</if>
<if test="targetTableName != null and targetTableName != ''">
target_table_name = #{targetTableName},
</if>
<if test="type != null">
type = #{type},
</if>
<if test="connectorJobId != null and connectorJobId != ''">
connector_job_id = #{connectorJobId},
</if>
<if test="connectorJsonData != null">
connector_json_data = #{connectorJsonData},
</if>
<if test="srcTopicName != null and srcTopicName != ''">
src_topic_name = #{srcTopicName},
</if>
<if test="projectId != null">
project_id = #{projectId},
</if>
<if test="parentId != null">
parent_id = #{parentId},
</if>
<if test="desensitizationField != null and desensitizationField != ''">
desensitization_field = #{desensitizationField},
</if>
<if test="arithmetic != null and arithmetic != ''">
arithmetic = #{arithmetic},
</if>
<if test="pkName != null and pkName != ''">
pk_name = #{pkName},
</if>
<if test="sourceTypeName != null and sourceTypeName != ''">
source_type_name = #{sourceTypeName},
</if>
<if test="targetTypeName != null and targetTypeName != ''">
target_type_name = #{targetTypeName},
</if>
<if test="srcDatabaseType != null and srcDatabaseType != ''">
src_database_type = #{srcDatabaseType},
</if>
<if test="srcDatabaseName != null and srcDatabaseName != ''">
src_database_name = #{srcDatabaseName},
</if>
<if test="connectorUrl != null and connectorUrl != ''">
connector_url = #{connectorUrl},
</if>
<if test="targetDatabaseType != null and targetDatabaseType != ''">
target_database_type = #{targetDatabaseType},
</if>
<if test="targetDatabaseName != null and targetDatabaseName != ''">
target_database_name = #{targetDatabaseName},
</if>
<if test="srcDatasourceName != null and srcDatasourceName != ''">
src_datasource_name = #{srcDatasourceName},
</if>
<if test="targetDatasourceName != null and targetDatasourceName != ''">
target_datasource_name = #{targetDatasourceName},
</if>
<if test="storeType != null and storeType != ''">
store_type = #{storeType},
</if>
<if test="status != null and status != ''">
status = #{status},
</if>
<if test="createTime != null">
create_time = #{createTime},
</if>
<if test="updateTime != null">
update_time = #{updateTime},
</if>
<if test="crePerson != null and crePerson != ''">
cre_person = #{crePerson},
</if>
<if test="uptPerson != null and uptPerson != ''">
upt_person = #{uptPerson},
</if>
</set>
where id = #{id}
</update>
<!--通过主键删除-->
<delete id="deleteById">
delete from dmp_realtime_sync_info where id = #{id}
</delete>
<!--实时同步任务列表分页查询-->
<select id="queryRealTimeSyncListPage" parameterType="com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq"
resultType="com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto">
SELECT
t1.id,
t1.tree_id as treeId,
t2.name as treeName,
t1.status,
date_format(t1.update_time,'%Y-%m-%d %H:%i:%s') as updateTime,
t1.src_datasource_id as srcDatasourceId,
t1.src_datasource_name as srcDatasourceName,
t1.src_database_type as srcDatabaseType,
t1.target_datasource_id as targetDatasourceId,
t1.target_datasource_name as targetDatasourceName,
t1.target_database_type as targetDatabaseType
FROM dmp_realtime_sync_info t1
left join dmp_navigation_tree t2 on t1.tree_id=t2.id
left join dmp_syncing_datasource t3 ON t1.src_datasource_id = t3.ID
left join dmp_syncing_datasource t4 ON t1.target_datasource_id = t4.ID
where 1=1 and t1.project_id=#{projectId}
<if test="taskStatus != null and taskStatus != '' ">
AND t1.status = #{taskStatus}
</if>
<if test="treeId != null and treeId != '' ">
AND t1.tree_id = #{treeId}
</if>
<if test="targetDatabaseTypeId != null and targetDatabaseTypeId != '' ">
AND t4.DATASOURCE_TYPE = #{targetDatabaseTypeId}
</if>
<if test="sourceDatabaseTypeId != null and sourceDatabaseTypeId != ''">
AND t3.DATASOURCE_TYPE = #{sourceDatabaseTypeId}
</if>
<if test="sourceDatabaseName != null and sourceDatabaseName != '' ">
AND t1.src_datasource_name like CONCAT('%', #{sourceDatabaseName}, '%')
</if>
<if test="targetDatabaseName != null and targetDatabaseName != '' ">
AND t1.target_datasource_name like CONCAT('%', #{targetDatabaseName}, '%')
</if>
order by t1.create_time desc
</select>
</mapper>
\ No newline at end of file
{
"projectId":31,
"srcDataSourceId":205,
"targetDataSourceId":202,
"sourceName":"test34",
"sourceCustomArg":{
"decimal.handling.mode":"double"
},
"blacklistTables":"dmp_azkaban_exector_server_config,dmp_realtime_sync_handle_count,dmp_realtime_sync_info,dmp_realtime_sync_submit_result,table_operate_log",
"notSelectTables":"dmp_develop_function,dmp_develop_resource,dmp_develop_script,dmp_develop_task,dmp_develop_task_history,dmp_locks,dmp_module_operate_log,dmp_navigation_tree,dmp_open_api_es_fields,dmp_open_api_es_tagconfig,dmp_open_api_kafka_connector,dmp_ops_monitor,dmp_ops_monitor_setting,dmp_permission,dmp_project,dmp_project_member,dmp_project_member_role,dmp_project_openapi,dmp_project_orgid,dmp_project_permission,dmp_project_permission_bak,dmp_project_role,dmp_project_role_menu,dmp_project_role_permission,dmp_project_superuser_role,dmp_project_system_info,dmp_realtime_sync_blacklist_table_info,dmp_spark_applications,dmp_syncing_datasource,dmp_syncing_datasource_type,dmp_syncing_job_conf,dmp_syncing_job_reader,dmp_syncing_job_writer,dmp_system_lookup,dmp_system_user,dmp_table,dmp_table_access_auth,dmp_table_category,dmp_table_column,dmp_table_ddl_log,dmp_table_field_mapping,dmp_table_field_schema,dmp_table_manage,dmp_task_dependent,dmp_task_instance,dmp_task_instance_optlog,dmp_task_instance_runlog,dmp_task_schedule,dmp_user_member,dmp_work_flow_publish_details,dmp_work_flow_submit_details,dv_rule_check_result_t,dv_rule_t,dv_task_rule_t,oauth_token,oauth_token_key,salej2",
"connectorUrl":"connect1@http://172.18.104.130:9993/connectors",
"tables":[
{
"sourceTableName":"dmp_data_contrast",
"targetTableName":"dmp_data_contrast",
"toptic":"",
"desensitizationField":"target_database_name",
"arithmetic":"HmacSHA256",
"pkName":"source_database_name,source_table_name,stat_date"
}
]
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment