Commit de7ca49a authored by mcb's avatar mcb

提交

parent ff35f8cd
This diff is collapsed.
......@@ -38,6 +38,7 @@ public class DataSourceController {
* 数据源列表分页查询
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "数据源列表分页查询", notes = "数据源列表分页查询")
@PostMapping(value = "/dataSourceListPage")
......@@ -62,6 +63,7 @@ public class DataSourceController {
* 批量删除数据源
*
* @return
* @author Bellamy
*/
@ApiImplicitParam(name = "datasourceId", value = "数据源id")
@ApiOperation(value = "批量删除数据源", notes = "批量删除数据源")
......@@ -78,6 +80,7 @@ public class DataSourceController {
* 获取数据源类型-下拉框
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "获取数据源类型", notes = "获取数据源类型-下拉框")
@GetMapping(value = "/queryDatasourceType")
......@@ -90,6 +93,7 @@ public class DataSourceController {
* 新增获取数据源类型
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "新增--获取数据源类型", notes = "获取数据源类型")
@GetMapping(value = "/queryGroupDatasourceType")
......@@ -102,6 +106,7 @@ public class DataSourceController {
* 保存数据源
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "保存数据源", notes = "保存数据源")
@PostMapping(value = "/addDatasourceInfo")
......@@ -114,6 +119,7 @@ public class DataSourceController {
* 测试连通性
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "测试连通性", notes = "测试连通性")
@PostMapping(value = "/testConnection")
......@@ -126,6 +132,7 @@ public class DataSourceController {
* 编辑数据源,根据id查询数据回显
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "编辑数据源--根据id查询数据回显", notes = "编辑数据源--根据id查询数据回显")
@GetMapping(value = "/selectDataSourceInfoById")
......@@ -144,6 +151,7 @@ public class DataSourceController {
* 编辑数据源
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "编辑数据源", notes = "编辑数据源")
@PostMapping(value = "/updateDatasourceInfo")
......@@ -159,6 +167,7 @@ public class DataSourceController {
* 获取数据源类型输入框属性
*
* @return
* @author Bellamy
*/
@ApiImplicitParam(name = "datasourceTypeId", value = "数据源类型id")
@ApiOperation(value = "获取数据源类型输入框属性", notes = "获取数据源类型输入框属性")
......
......@@ -37,6 +37,7 @@ public class OfflineSynchController {
* 任务列表分页查询
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "任务列表分页查询", notes = "任务列表分页查询")
@PostMapping(value = "/taskListPage")
......@@ -56,10 +57,11 @@ public class OfflineSynchController {
* 获取源数据库名称——下拉框
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "获取源数据库名称-下拉框", notes = "获取源数据库名称")
@GetMapping(value = "/sourceDbList")
@ApiImplicitParam(name = "projectId", value = "项目id")
@ApiImplicitParam(name = "projectId", value = "项目id",required = true)
public JsonResult<List<SourceDbNameListDto>> getSourceDbList(@RequestParam Integer projectId) throws Exception {
JsonResult<List<SourceDbNameListDto>> jsonResult = offlineSynchService.querygSourceDbList(projectId);
return jsonResult;
......@@ -69,6 +71,7 @@ public class OfflineSynchController {
* 根据源数据库id,获取源数据表——下拉框
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "根据源数据库id,获取源数据表-下拉框", notes = "根据源数据库id,获取源数据表")
@GetMapping(value = "/sourceTableList")
......@@ -82,6 +85,7 @@ public class OfflineSynchController {
* 任务立即运行
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "任务立即运行", notes = "任务立即运行")
@GetMapping(value = "/taskRunNowByTaskId")
......@@ -98,6 +102,7 @@ public class OfflineSynchController {
* 根据taskId删除任务
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "删除任务", notes = "删除任务")
@GetMapping(value = "/delTaskByTaskId")
......@@ -114,6 +119,7 @@ public class OfflineSynchController {
* 状态查看列表分页查询
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "状态查看", notes = "状态查看列表分页查询")
@PostMapping(value = "/checkTaskStatus")
......@@ -126,6 +132,7 @@ public class OfflineSynchController {
* 校验状态详情--查看 规则执行结果
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "校验状态详情--查看", notes = "校验状态详情--查看")
@PostMapping(value = "/checkJyStatusInfo")
......@@ -145,6 +152,7 @@ public class OfflineSynchController {
* 获取源表和目标表的字段
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "获取源表和目标表的字段", notes = "获取源表和目标表的字段")
@PostMapping(value = "/getSoureAndTargetColumns")
......@@ -157,13 +165,14 @@ public class OfflineSynchController {
* 校验规则
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "校验规则", notes = "校验规则")
@PostMapping(value = "/findListJyRule")
public PageInfoResponse<DvRuleTDto> findListJyRule(@RequestBody BasePageBean basePageBean, HttpServletRequest httpRequest) throws Exception {
PageInfoResponse<DvRuleTDto> pageInfo = new PageInfoResponse<DvRuleTDto>();
try {
pageInfo = offlineSynchService.queryJyRuleListPage(basePageBean,httpRequest);
pageInfo = offlineSynchService.queryJyRuleListPage(basePageBean, httpRequest);
} catch (Exception e) {
pageInfo.setMessage("查询失败");
pageInfo.setCode(ResultCode.INTERNAL_SERVER_ERROR);
......@@ -176,6 +185,7 @@ public class OfflineSynchController {
* 保存离线任务数据
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "保存离线任务数据", notes = "保存离线任务数据")
@PostMapping(value = "/addSyncTask")
......@@ -188,6 +198,7 @@ public class OfflineSynchController {
* 编辑离线任务数据
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "编辑离线任务数据", notes = "编辑离线任务数据")
@PostMapping(value = "/updateSyncTask")
......
......@@ -4,13 +4,15 @@ import com.jz.common.constant.JsonResult;
import com.jz.common.constant.ResultCode;
import com.jz.common.page.PageInfoResponse;
import com.jz.common.utils.realTime.CmdUtils;
import com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceNameListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeTableInfoReq;
import com.jz.dmp.modules.model.DmpRealtimeSyncInfo;
import com.jz.dmp.modules.service.DmpRealtimeSyncInfoService;
import com.jz.dmp.modules.service.impl.OfflineSynchServiceImpl;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
......@@ -19,6 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
/**
* 实时同步任务
......@@ -65,26 +68,96 @@ public class RealTimeSyncController {
}
/**
* 启动实时同步任务
* 批量启动实时同步任务
*
* @return
* @author Bellamy
* @since 2021-01-05
*/
@ApiOperation(value = "启动实时同步任务", notes = "启动实时同步任务")
@ApiOperation(value = "批量启动运行实时同步任务", notes = "批量启动实时同步任务")
@GetMapping(value = "/startRealTimeSync")
@ApiImplicitParam(name = "realTaskId", value = "任务id")
public JsonResult startRealTimeSync(@RequestParam String realTaskId) throws Exception {
if (StringUtils.isEmpty(realTaskId)) {
return new JsonResult(ResultCode.PARAMS_ERROR, "任务id不能为空!");
}
DmpRealtimeSyncInfo dmpRealtimeSyncInfo = dmpRealtimeSyncInfoService.queryById(Integer.valueOf(realTaskId));
String srcTopicName = dmpRealtimeSyncInfo.getSrcTopicName();
System.out.println(srcTopicName);
logger.info("############正常执行表数据........"+realTaskId);
String shellPath ="/app/bigdata-app/scripts/trigger_straming.sh";
CmdUtils.callShell(shellPath, srcTopicName);
String[] ids = realTaskId.split(",");
List<DmpRealtimeSyncInfo> list = dmpRealtimeSyncInfoService.queryListById(ids);
if (list.size() > 0 && list != null) {
for (int i = 0; i < list.size(); i++) {
DmpRealtimeSyncInfo dmpRealtimeSyncInfo = list.get(i);
String srcTopicName = dmpRealtimeSyncInfo.getSrcTopicName();
System.out.println(srcTopicName);
logger.info("############正常执行表数据id........" + ids[i]);
String shellPath = "/app/bigdata-app/scripts/trigger_straming.sh";
CmdUtils.callShell(shellPath, srcTopicName);
}
}
return new JsonResult();
}
/**
* 删除实时同步任务
*
* @return
* @author Bellamy
* @since 2021-01-05
*/
@ApiOperation(value = "删除实时同步任务", notes = "删除实时同步任务")
@GetMapping(value = "/delRealTimeSync")
@ApiImplicitParam(name = "realTaskId", value = "任务id")
public JsonResult delRealTimeSync(@RequestParam String realTaskId) throws Exception {
if (StringUtils.isEmpty(realTaskId)) {
return new JsonResult(ResultCode.PARAMS_ERROR, "任务id不能为空!");
}
boolean jsonResult = dmpRealtimeSyncInfoService.deleteById(Integer.valueOf(realTaskId));
if (jsonResult) {
return new JsonResult();
} else {
return new JsonResult(ResultCode.INTERNAL_SERVER_ERROR, "删除失败!");
}
}
/**
* 获取数据源和目标数据源下拉框
*
* @return
* @author Bellamy
* @since 2021-01-06
*/
@ApiOperation(value = "获取数据源和目标数据源下拉框", notes = "获取数据源和目标数据源下拉框")
@GetMapping(value = "/getDatasourceNameList")
@ApiImplicitParams({@ApiImplicitParam(name = "projectId", value = "项目id",required = true),
@ApiImplicitParam(name = "type", value = "数据源类型:01来源,02 目标源",required = true)})
public JsonResult<List<DataSourceNameListDto>> getDatasourceNameList(@RequestParam String projectId, @RequestParam String type) throws Exception {
if (StringUtils.isEmpty(projectId)) {
return new JsonResult(ResultCode.PARAMS_ERROR, "项目id不能为空!");
}
if (StringUtils.isEmpty(type)) {
return new JsonResult(ResultCode.PARAMS_ERROR, "类型不能为空!");
}
JsonResult<List<DataSourceNameListDto>> jsonResult = dmpRealtimeSyncInfoService.queryDatasourceNameList(projectId, type);
return jsonResult;
}
/**
* 根据数据源id获取表详细信息
*
* @return
* @author Bellamy
* @since 2021-01-06
*/
@ApiOperation(value = "新增--获取表信息", notes = "新增--获取表信息")
@PostMapping(value = "/getSourceDbTableList")
public JsonResult getTableInfo(@RequestBody RealTimeTableInfoReq req, HttpServletRequest httpRequest) throws Exception {
if (StringUtils.isEmpty(req.getProjectId())) {
return new JsonResult(ResultCode.PARAMS_ERROR, "项目id不能为空!");
}
if (StringUtils.isEmpty(req.getSrcDatasourceId())) {
return new JsonResult(ResultCode.PARAMS_ERROR, "来源数据源id不能为空!");
}
JsonResult jsonResult = dmpRealtimeSyncInfoService.queryTableInfoByParams(req);
return jsonResult;
}
}
\ No newline at end of file
package com.jz.dmp.modules.controller.DataIntegration.bean;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
/**
* @ClassName: DataSourceNameListDto
* @Description: 获取数据源和目标数据源下拉框返回参数对象
* @Author:Bellamy
* @Date 2021/01/06
* @Version 1.0
*/
@ApiModel(value = "获取数据源和目标数据源下拉框返回参数", description = "获取数据源和目标数据源下拉框返回参数")
public class DataSourceNameListDto {
/**
* 数据源id
*/
@ApiModelProperty(value = "数据源id")
private Long id;
/**
* 数据源名称
*/
@ApiModelProperty(value = "数据源名称")
private String datasourceName;
/**
* 数据源描述
*/
@ApiModelProperty(value = "数据源描述")
private String datasourceDesc;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getDatasourceName() {
return datasourceName;
}
public void setDatasourceName(String datasourceName) {
this.datasourceName = datasourceName;
}
public String getDatasourceDesc() {
return datasourceDesc;
}
public void setDatasourceDesc(String datasourceDesc) {
this.datasourceDesc = datasourceDesc;
}
}
......@@ -17,7 +17,7 @@ public class RealTimeSyncListDto {
/*
* 实时同步任务ID
* */
@ApiModelProperty(value = "实时同步任务ID")
@ApiModelProperty(value = "ID")
private String id;
/*
......@@ -29,19 +29,19 @@ public class RealTimeSyncListDto {
/*
* 业务树节点名称
* */
@ApiModelProperty(value = "业务树节点名称")
@ApiModelProperty(value = "任务名称")
private String treeName;
/*
* 实时同步任务状态
* */
@ApiModelProperty(value = "实时同步任务状态")
@ApiModelProperty(value = "状态")
private String status;
/*
* 更新时间
* */
@ApiModelProperty(value = "更新时间")
@ApiModelProperty(value = "最近操作时间")
private String updateTime;
/*
......@@ -53,7 +53,7 @@ public class RealTimeSyncListDto {
/*
* 来源数据源名称
* */
@ApiModelProperty(value = "来源数据源名称")
@ApiModelProperty(value = "来源数据源")
private String srcDatasourceName;
/*
......@@ -71,13 +71,13 @@ public class RealTimeSyncListDto {
/*
* 目标数据源名称
* */
@ApiModelProperty(value = "目标数据源名称")
@ApiModelProperty(value = "去向数据源")
private String targetDatasourceName;
/*
* 目标数据源类型
* */
@ApiModelProperty(value = "目标数据源类型")
@ApiModelProperty(value = "去向数据源类型")
private String targetDatabaseType;
public String getId() {
......
......@@ -24,7 +24,7 @@ public class RealTimeSyncListReq extends BasePageBean {
private String sourceDatabaseTypeId;
/*
* 目标数据源名称
* 来源数据源名称
* */
@ApiModelProperty(value = "来源数据源名称")
private String sourceDatabaseName;
......@@ -58,9 +58,15 @@ public class RealTimeSyncListReq extends BasePageBean {
/*
* 节点id
* */
@ApiModelProperty(value = "节点id")
@ApiModelProperty(value = "节点名称或id")
private String treeId;
/*
* 节点id
* */
@ApiModelProperty(value = "节点名称")
private String treeName;
public String getProjectId() {
return projectId;
}
......@@ -116,4 +122,12 @@ public class RealTimeSyncListReq extends BasePageBean {
public void setTreeId(String treeId) {
this.treeId = treeId;
}
public String getTreeName() {
return treeName;
}
public void setTreeName(String treeName) {
this.treeName = treeName;
}
}
package com.jz.dmp.modules.controller.DataIntegration.bean;
import com.jz.common.page.BasePageBean;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
/**
* @ClassName: RealTimeTableInfoReq
* @Description: 新增实时任务--获取表信息请求参数
* @Author:Bellamy
* @Date 2021/01/06
* @Version 1.0
*/
@ApiModel(value = "新增实时任务--获取表信息请求参数", description = "新增实时任务--获取表信息请求参数")
public class RealTimeTableInfoReq {
/*
* 来源数据源id
* */
@ApiModelProperty(value = "来源数据源id")
private String srcDatasourceId;
/*
* 目标数据源名称
* */
@ApiModelProperty(value = "选中表名称")
private String selectTablesName;
/*
* 目标数据源id
* */
@ApiModelProperty(value = "目标数据源id")
private String targetDatasourceId;
/*
* 黑名单表
* */
@ApiModelProperty(value = "黑名单表")
private String blacklistTables;
/*
* 项目id
* */
@ApiModelProperty(value = "项目id")
@NotNull(message = "项目id不能为空")
@NotEmpty(message = "项目ID不能空")
private String projectId;
/*
* 是否显示列的相关信息 默认不显示
* */
@ApiModelProperty(value = "是否显示列的相关信息 默认不显示")
private String isContainColumnInfo;
/*
* 是否显示 是否已经提交
* */
@ApiModelProperty(value = "是否显示 是否已经提交")
private String isContainSubmited ;
/*
* 文本框中输入的要查询的表
* */
@ApiModelProperty(value = "文本框中输入的要查询的表")
private String toQueryTableName;
public String getProjectId() {
return projectId;
}
public void setProjectId(String projectId) {
this.projectId = projectId;
}
public String getSrcDatasourceId() {
return srcDatasourceId;
}
public void setSrcDatasourceId(String srcDatasourceId) {
this.srcDatasourceId = srcDatasourceId;
}
public String getSelectTablesName() {
return selectTablesName;
}
public void setSelectTablesName(String selectTablesName) {
this.selectTablesName = selectTablesName;
}
public String getTargetDatasourceId() {
return targetDatasourceId;
}
public void setTargetDatasourceId(String targetDatasourceId) {
this.targetDatasourceId = targetDatasourceId;
}
public String getBlacklistTables() {
return blacklistTables;
}
public void setBlacklistTables(String blacklistTables) {
this.blacklistTables = blacklistTables;
}
public String getIsContainColumnInfo() {
return isContainColumnInfo;
}
public void setIsContainColumnInfo(String isContainColumnInfo) {
this.isContainColumnInfo = isContainColumnInfo;
}
public String getIsContainSubmited() {
return isContainSubmited;
}
public void setIsContainSubmited(String isContainSubmited) {
this.isContainSubmited = isContainSubmited;
}
public String getToQueryTableName() {
return toQueryTableName;
}
public void setToQueryTableName(String toQueryTableName) {
this.toQueryTableName = toQueryTableName;
}
}
......@@ -16,7 +16,7 @@ public class TaskListPageDto {
/*
* taskId
* */
@ApiModelProperty(value = "任务taskId")
@ApiModelProperty(value = "任务Id")
private String taskId;
/*
......@@ -28,13 +28,13 @@ public class TaskListPageDto {
/*
* 节点名称
* */
@ApiModelProperty(value = "节点名称")
@ApiModelProperty(value = "名称")
private String treeName;
/*
* 创建人
* */
@ApiModelProperty(value = "创建人")
@ApiModelProperty(value = "责任人")
private String createUserId;
/*
......@@ -66,7 +66,7 @@ public class TaskListPageDto {
/*
* 更改时间
* */
@ApiModelProperty(value = "改时间")
@ApiModelProperty(value = "改时间")
private String updateTime;
/*
......
package com.jz.dmp.modules.dao;
import com.jz.common.constant.JsonResult;
import com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceNameListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq;
import com.jz.dmp.modules.model.DmpRealtimeSyncInfo;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Map;
/**
* 实时同步任务(DmpRealtimeSyncInfo)表数据库访问层
......@@ -83,7 +86,39 @@ public interface DmpRealtimeSyncInfoDao {
/**
* 实时同步任务列表分页查询
*
* @return
*/
List<RealTimeSyncListDto> queryRealTimeSyncListPage(RealTimeSyncListReq req) throws Exception;
List<DmpRealtimeSyncInfo> queryListById(@Param("ids") String[] ids) throws Exception;
/**
* 获取数据源和目标数据源下拉框
*
* @return
* @author Bellamy
* @since 2021-01-06
*/
List<DataSourceNameListDto> queryDatasourceNameList(@Param("projectId") String projectId, @Param("type") String type) throws Exception;
/**
* 根据数据源id获取数据信息
*
* @return
* @author Bellamy
* @since 2021-01-06
*/
Map querygSourceDbInfoById(@Param("srcDataSourceId") String srcDataSourceId) throws Exception;
List<Map> queryRealTimeInfoByDataSourceId(@Param("srcDatasourceId") String srcDatasourceId, @Param("targetDatasourceId") String targetDatasourceId);
/**
* 查询源数据源的黑名单表
*
* @return
* @author Bellamy
* @since 2021-01-06
*/
Map queryBlackTableByDataSourceId(@Param("srcDatasourceId") String srcDatasourceId);
}
\ No newline at end of file
package com.jz.dmp.modules.model;
import java.util.List;
public class RealTimeSyncModel {
/**
* 数据源Id
*/
private Integer dataSourceId;
/**
* 数据源名称
*/
private String dataSourceName;
/**
* 数据库名称
*/
private String dbName ;
/**
* 数据源下的表信息
*/
private List<RealTimeSyncTableModel> tables;
public Integer getDataSourceId() {
return dataSourceId;
}
public void setDataSourceId(Integer dataSourceId) {
this.dataSourceId = dataSourceId;
}
public String getDataSourceName() {
return dataSourceName;
}
public void setDataSourceName(String dataSourceName) {
this.dataSourceName = dataSourceName;
}
public String getDbName() {
return dbName;
}
public void setDbName(String dbName) {
this.dbName = dbName;
}
public List<RealTimeSyncTableModel> getTables() {
return tables;
}
public void setTables(List<RealTimeSyncTableModel> tables) {
this.tables = tables;
}
}
package com.jz.dmp.modules.model;
/**
* 实时同步表信息 表级别的信息
*/
public class RealTimeSyncTableModel {
/**
* 表名
*/
private String tableName;
/**
* 主键名称
*/
private String pkName;
/**
* 是否提交同步过同步
*/
private boolean isSubmited;
/**
* 是否是黑名单
*/
private boolean isBlacklist;
/**
* 脱敏字段
*/
private String desensitizationField;
/**
* 脱敏算法
*/
private String arithmetic;
/**
* 表下的所有列信息
*/
//private RealTimeSyncColumnModel columnInfo;
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
public String getPkName() {
return pkName;
}
public void setPkName(String pkName) {
this.pkName = pkName;
}
/*public RealTimeSyncColumnModel getColumnInfo() {
return columnInfo;
}
public void setColumnInfo(RealTimeSyncColumnModel columnInfo) {
this.columnInfo = columnInfo;
}
*/
public boolean isSubmited() {
return isSubmited;
}
public void setIsSubmited(boolean isSubmited) {
this.isSubmited = isSubmited;
}
public boolean isBlacklist() {
return isBlacklist;
}
public void setIsBlacklist(boolean isBlacklist) {
this.isBlacklist = isBlacklist;
}
public String getDesensitizationField() {
return desensitizationField;
}
public void setDesensitizationField(String desensitizationField) {
this.desensitizationField = desensitizationField;
}
public String getArithmetic() {
return arithmetic;
}
public void setArithmetic(String arithmetic) {
this.arithmetic = arithmetic;
}
}
package com.jz.dmp.modules.service;
import com.jz.common.constant.JsonResult;
import com.jz.common.page.PageInfoResponse;
import com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceNameListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeTableInfoReq;
import com.jz.dmp.modules.model.DmpRealtimeSyncInfo;
import java.util.List;
......@@ -63,4 +65,29 @@ public interface DmpRealtimeSyncInfoService {
* @return
*/
PageInfoResponse<RealTimeSyncListDto> queryRealTimeSyncListPage(RealTimeSyncListReq req) throws Exception;
/**
* 根据多个实时任务id查询任务信息
*
* @return
*/
List<DmpRealtimeSyncInfo> queryListById(String[] ids) throws Exception;
/**
* 获取数据源和目标数据源下拉框
*
* @return
* @author Bellamy
* @since 2021-01-06
*/
JsonResult<List<DataSourceNameListDto>> queryDatasourceNameList(String projectId, String type) throws Exception;
/**
* 根据数据源id获取表详细信息
*
* @return
* @author Bellamy
* @since 2021-01-06
*/
JsonResult queryTableInfoByParams(RealTimeTableInfoReq req) throws Exception;
}
\ No newline at end of file
package com.jz.dmp.modules.service.impl;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.jz.common.constant.JsonResult;
import com.jz.common.constant.ResultCode;
import com.jz.common.page.PageInfoResponse;
import com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceNameListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq;
import com.jz.dmp.modules.controller.DataIntegration.bean.TaskListPageDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeTableInfoReq;
import com.jz.dmp.modules.dao.DmpRealtimeSyncInfoDao;
import com.jz.dmp.modules.model.DmpRealtimeSyncInfo;
import com.jz.dmp.modules.service.DmpRealtimeSyncInfoService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
/**
* 实时同步任务(DmpRealtimeSyncInfo)表服务实现类
......@@ -24,6 +32,7 @@ import java.util.List;
* @since 2021-01-05 14:18:03
*/
@Service("dmpRealtimeSyncInfoService")
@Transactional
public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoService {
@Autowired
......@@ -83,18 +92,38 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
* @return 是否成功
*/
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
public boolean deleteById(Integer id) {
return this.dmpRealtimeSyncInfoDao.deleteById(id) > 0;
}
/**
* 实时同步任务列表分页查询
*
* @return
* @author Bellamy
*/
@Override
public PageInfoResponse<RealTimeSyncListDto> queryRealTimeSyncListPage(RealTimeSyncListReq req) throws Exception {
PageInfoResponse<RealTimeSyncListDto> pageInfoResponse = new PageInfoResponse<>();
if (StringUtils.isNotEmpty(req.getSourceDatabaseName())) { //来源数据源名称
req.setSourceDatabaseName(req.getSourceDatabaseName().trim());
}
if (StringUtils.isNotEmpty(req.getTargetDatabaseName())) { //目标源数据源名称
req.setTargetDatabaseName(req.getTargetDatabaseName().trim());
}
if (StringUtils.isNotEmpty(req.getTreeId())) {
//判断是否为整数 是整数返回true,否则返回false
Pattern pattern = Pattern.compile("^[-\\+]?[\\d]*$");
if (pattern.matcher(req.getTreeId().trim()).matches()) {
req.setTreeId(req.getTreeId()); //id
} else {
req.setTreeName(req.getTreeId().trim());//节点名称
req.setTreeId("");
}
}
PageHelper.startPage(req.getPageNum(), req.getPageSize());
List<RealTimeSyncListDto> list = dmpRealtimeSyncInfoDao.queryRealTimeSyncListPage(req);
PageInfo<RealTimeSyncListDto> pageInfo = new PageInfo<>(list);
......@@ -104,4 +133,69 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
pageInfoResponse.setData(pageInfo);
return pageInfoResponse;
}
/**
* 根据多个实时任务id查询任务信息
*
* @return
* @author Bellamy
*/
@Override
public List<DmpRealtimeSyncInfo> queryListById(String[] ids) throws Exception {
return dmpRealtimeSyncInfoDao.queryListById(ids);
}
/**
* 获取数据源和目标数据源下拉框
*
* @return
* @author Bellamy
* @since 2021-01-06
*/
@Override
public JsonResult<List<DataSourceNameListDto>> queryDatasourceNameList(String projectId, String type) throws Exception {
List<DataSourceNameListDto> list = dmpRealtimeSyncInfoDao.queryDatasourceNameList(projectId, type);
return new JsonResult(list);
}
/**
* 根据数据源id获取表详细信息
*
* @return
* @author Bellamy
* @since 2021-01-06
*/
@Override
public JsonResult queryTableInfoByParams(RealTimeTableInfoReq req) throws Exception {
Map<String, Map<String, String>> paramsMap = null;
//根据数据源id获取数据信息
Map sourceDbInfo = dmpRealtimeSyncInfoDao.querygSourceDbInfoById(req.getSrcDatasourceId());
//数据源对应的表详细信息
List<Map> list = dmpRealtimeSyncInfoDao.queryRealTimeInfoByDataSourceId(req.getSrcDatasourceId(), req.getTargetDatasourceId());
if (list.size() > 0 && list != null) {
paramsMap = new HashMap<>();
for (Map item : list) {
String srcTableName = item.get("srcTableName").toString();
paramsMap.put(srcTableName, item);
}
}
Map<String, String> blackMapSetting = null;
//查询源数据源的黑名单表
Map blackTableMap = dmpRealtimeSyncInfoDao.queryBlackTableByDataSourceId(req.getSrcDatasourceId());
if (blackTableMap.size() > 0 && blackTableMap != null) {
String blacklistTable = blackTableMap.get("blacklistTable").toString();
if (StringUtils.isNotEmpty(blacklistTable)) {
String[] blacklistTableArr = blacklistTable.split(",");
if (blacklistTableArr.length > 0 && blacklistTableArr != null) {
blackMapSetting = new HashMap<>();
for (String str : blacklistTableArr) {
blackMapSetting.put(str, str);
}
}
}
}
//DBUtil.getDataSourceTables(sourceDbInfo,req.getToQueryTableName(),true,paramsMap,req.getBlacklistTables(),req.getSelectTablesName(),blackMapSetting);
return null;
}
}
\ No newline at end of file
......@@ -327,25 +327,76 @@
left join dmp_syncing_datasource t3 ON t1.src_datasource_id = t3.ID
left join dmp_syncing_datasource t4 ON t1.target_datasource_id = t4.ID
where 1=1 and t1.project_id=#{projectId}
<if test="taskStatus != null and taskStatus != '' ">
AND t1.status = #{taskStatus}
</if>
<if test="treeId != null and treeId != '' ">
AND t1.tree_id = #{treeId}
</if>
<if test="targetDatabaseTypeId != null and targetDatabaseTypeId != '' ">
AND t4.DATASOURCE_TYPE = #{targetDatabaseTypeId}
</if>
<if test="sourceDatabaseTypeId != null and sourceDatabaseTypeId != ''">
AND t3.DATASOURCE_TYPE = #{sourceDatabaseTypeId}
</if>
<if test="sourceDatabaseName != null and sourceDatabaseName != '' ">
AND t1.src_datasource_name like CONCAT('%', #{sourceDatabaseName}, '%')
</if>
<if test="targetDatabaseName != null and targetDatabaseName != '' ">
AND t1.target_datasource_name like CONCAT('%', #{targetDatabaseName}, '%')
</if>
<if test="taskStatus != null and taskStatus != '' "> AND t1.status = #{taskStatus} </if>
<if test="treeId != null and treeId != '' "> AND t1.id = #{treeId} </if>
<if test="targetDatabaseTypeId != null and targetDatabaseTypeId != '' "> AND t4.DATASOURCE_TYPE = #{targetDatabaseTypeId} </if>
<if test="sourceDatabaseTypeId != null and sourceDatabaseTypeId != ''"> AND t3.DATASOURCE_TYPE = #{sourceDatabaseTypeId} </if>
<if test="sourceDatabaseName != null and sourceDatabaseName != '' "> AND t1.src_datasource_name like CONCAT('%', #{sourceDatabaseName}, '%') </if>
<if test="targetDatabaseName != null and targetDatabaseName != '' "> AND t1.target_datasource_name like CONCAT('%', #{targetDatabaseName}, '%') </if>
<if test="treeName != null and treeName != '' "> AND t2.name like CONCAT('%', #{treeName}, '%') </if>
order by t1.create_time desc
</select>
<select id="queryListById" resultMap="DmpRealtimeSyncInfoMap">
select
id, src_datasource_id, target_datasource_id, src_table_name, target_table_name, type, connector_job_id, connector_json_data
, src_topic_name, project_id, parent_id, desensitization_field, arithmetic, pk_name, source_type_name, target_type_name
, src_database_type, src_database_name, connector_url, target_database_type, target_database_name, src_datasource_name
, target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person
from dmp_realtime_sync_info
where id in
<foreach collection="ids" item="item" open="(" separator="," close=")">
#{item}
</foreach>
</select>
<!-- 获取数据源和目标数据源-->
<select id="queryDatasourceNameList" resultType="com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceNameListDto" parameterType="map">
SELECT
dsd.ID as id,
dsd.DATASOURCE_NAME as datasourceName,
dsd.DATASOURCE_DESC as datasourceDesc
FROM dmp_syncing_datasource dsd
LEFT JOIN dmp_syncing_datasource_type dsdt ON dsd.DATASOURCE_TYPE = dsdt.ID
WHERE dsd.PROJECT_ID = #{projectId} and dsd.DATA_STATUS = '1'
<if test="type != '' and type == 01"> AND dsdt.DATASOURCE_TYPE = 'MYSQL' </if>
<if test="type != '' and type == 02"> AND (dsdt.DATASOURCE_TYPE = 'KUDU' or dsdt.DATASOURCE_TYPE = 'HDFS') </if>
</select>
<!--根据数据源id获取数据信息-->
<select id="querygSourceDbInfoById" parameterType="map" resultType="map">
select
ds.id as id,
ds.jdbc_url as jdbcUrl,
ds.user_name as userName,
ds.password,
ds.db_name as dbName,
dsdt.datasource as datasourceTypeName,
dsdt.driver_class_name as driverClassName,
dsdt.datasource,
from dmp_syncing_datasource ds
inner join dmp_syncing_datasource_type dsdt on ds.datasource_type = dsdt.id
where 1=1 and ds.data_status = '1' dsd.ID = #{srcDataSourceId}
</select>
<select id="queryListById" resultMap="java.util.Map">
select
src_table_name as srcTableName,
connector_job_id as connectorJobId,
desensitization_field as desensitizationField,
arithmetic
from dmp_realtime_sync_info
where 1=1 and type =2
<if test="targetDataSourceId != null"> and target_datasource_id = #{targetDataSourceId} </if>
<if test="srcTableName != null"> and src_table_name = #{srcTableName} </if>
</select>
<!-- 查询源数据源的黑名单表 -->
<select id="queryBlackTableByDataSourceId" resultType="java.util.Map">
SELECT
datasource_id as datasourceId
,blacklist_table as blacklistTable
FROM dmp_realtime_sync_blacklist_table_info
WHERE datasource_id = #{srcDatasourceId}
</select>
</mapper>
\ No newline at end of file
......@@ -375,13 +375,13 @@
<select id="queryDbTypeByGroup" resultType="map">
SELECT
datasource_catename as datasourceCatename,
datasource_catecode as datasourceCatecode
datasource_catecode as datasourceCatecode,
any_value(datasource_catename) as datasourceCatename,
any_value(datasource_catetype) as datasourceCatetype
from dmp_syncing_datasource_type
where data_status = '1' and is_enabled = '1'
<if test="datasourceTypeId !=null and datasourceTypeId !=''">and id=#{datasourceTypeId}</if>
group by datasourceCatecode
ORDER BY datasource_catetype
ORDER BY datasourceCatetype
</select>
</mapper>
\ No newline at end of file
......@@ -24,7 +24,7 @@
<select id="queryById" resultMap="DmpSyncingDatasourceTypeMap">
select
ID, DATASOURCE, DATASOURCE_CATECODE, DATASOURCE_CATENAME, DATASOURCE_TYPE, IMG_URL, DATA_STATUS, IS_ENABLED, DATASOURCE_CATETYPE, DRIVER_CLASS_NAME, IS_ENABLE_TEST, DEFAULT_SOURCE_SCRIPT, DEFAULT_TARGET_SCRIPT, IS_ENABLE_SOURCE, IS_ENABLE_TARGET
from dmp_web.dmp_syncing_datasource_type
from dmp_syncing_datasource_type
where ID = #{id}
</select>
......@@ -32,7 +32,7 @@
<select id="queryAllByLimit" resultMap="DmpSyncingDatasourceTypeMap">
select
ID, DATASOURCE, DATASOURCE_CATECODE, DATASOURCE_CATENAME, DATASOURCE_TYPE, IMG_URL, DATA_STATUS, IS_ENABLED, DATASOURCE_CATETYPE, DRIVER_CLASS_NAME, IS_ENABLE_TEST, DEFAULT_SOURCE_SCRIPT, DEFAULT_TARGET_SCRIPT, IS_ENABLE_SOURCE, IS_ENABLE_TARGET
from dmp_web.dmp_syncing_datasource_type
from dmp_syncing_datasource_type
limit #{offset}, #{limit}
</select>
......@@ -42,7 +42,7 @@
ID, DATASOURCE, DATASOURCE_CATECODE, DATASOURCE_CATENAME, DATASOURCE_TYPE, IMG_URL, DATA_STATUS, IS_ENABLED,
DATASOURCE_CATETYPE, DRIVER_CLASS_NAME, IS_ENABLE_TEST, DEFAULT_SOURCE_SCRIPT, DEFAULT_TARGET_SCRIPT,
IS_ENABLE_SOURCE, IS_ENABLE_TARGET
from dmp_web.dmp_syncing_datasource_type
from dmp_syncing_datasource_type
<where>
<if test="id != null">
and ID = #{id}
......@@ -94,12 +94,12 @@
<!--新增所有列-->
<insert id="insert" keyProperty="id" useGeneratedKeys="true">
insert into dmp_web.dmp_syncing_datasource_type(DATASOURCE, DATASOURCE_CATECODE, DATASOURCE_CATENAME, DATASOURCE_TYPE, IMG_URL, DATA_STATUS, IS_ENABLED, DATASOURCE_CATETYPE, DRIVER_CLASS_NAME, IS_ENABLE_TEST, DEFAULT_SOURCE_SCRIPT, DEFAULT_TARGET_SCRIPT, IS_ENABLE_SOURCE, IS_ENABLE_TARGET)
insert into dmp_syncing_datasource_type(DATASOURCE, DATASOURCE_CATECODE, DATASOURCE_CATENAME, DATASOURCE_TYPE, IMG_URL, DATA_STATUS, IS_ENABLED, DATASOURCE_CATETYPE, DRIVER_CLASS_NAME, IS_ENABLE_TEST, DEFAULT_SOURCE_SCRIPT, DEFAULT_TARGET_SCRIPT, IS_ENABLE_SOURCE, IS_ENABLE_TARGET)
values (#{datasource}, #{datasourceCatecode}, #{datasourceCatename}, #{datasourceType}, #{imgUrl}, #{dataStatus}, #{isEnabled}, #{datasourceCatetype}, #{driverClassName}, #{isEnableTest}, #{defaultSourceScript}, #{defaultTargetScript}, #{isEnableSource}, #{isEnableTarget})
</insert>
<insert id="insertBatch" keyProperty="id" useGeneratedKeys="true">
insert into dmp_web.dmp_syncing_datasource_type(DATASOURCE, DATASOURCE_CATECODE, DATASOURCE_CATENAME,
insert into dmp_syncing_datasource_type(DATASOURCE, DATASOURCE_CATECODE, DATASOURCE_CATENAME,
DATASOURCE_TYPE, IMG_URL, DATA_STATUS, IS_ENABLED, DATASOURCE_CATETYPE, DRIVER_CLASS_NAME, IS_ENABLE_TEST,
DEFAULT_SOURCE_SCRIPT, DEFAULT_TARGET_SCRIPT, IS_ENABLE_SOURCE, IS_ENABLE_TARGET)
values
......@@ -112,7 +112,7 @@
</insert>
<insert id="insertOrUpdateBatch" keyProperty="id" useGeneratedKeys="true">
insert into dmp_web.dmp_syncing_datasource_type(DATASOURCE, DATASOURCE_CATECODE, DATASOURCE_CATENAME,
insert into dmp_syncing_datasource_type(DATASOURCE, DATASOURCE_CATECODE, DATASOURCE_CATENAME,
DATASOURCE_TYPE, IMG_URL, DATA_STATUS, IS_ENABLED, DATASOURCE_CATETYPE, DRIVER_CLASS_NAME, IS_ENABLE_TEST,
DEFAULT_SOURCE_SCRIPT, DEFAULT_TARGET_SCRIPT, IS_ENABLE_SOURCE, IS_ENABLE_TARGET)
values
......@@ -134,7 +134,7 @@
<!--通过主键修改数据-->
<update id="update">
update dmp_web.dmp_syncing_datasource_type
update dmp_syncing_datasource_type
<set>
<if test="datasource != null and datasource != ''">
DATASOURCE = #{datasource},
......@@ -184,7 +184,7 @@
<!--通过主键删除-->
<delete id="deleteById">
delete from dmp_web.dmp_syncing_datasource_type where ID = #{id}
delete from dmp_syncing_datasource_type where ID = #{id}
</delete>
</mapper>
\ No newline at end of file
......@@ -38,7 +38,7 @@
</select>
<!--获取源数据库名称-->
<select id="querygSourceDbList" parameterType="map" resultType="com.jz.dmp.modules.controller.DataIntegration.bean.SourceDbNameListDto">
<select id="querygSourceDbList" parameterType="java.util.Map" resultType="com.jz.dmp.modules.controller.DataIntegration.bean.SourceDbNameListDto">
select
ds.id as id,
ds.datasource_name as datasourceNameOrg,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment