Commit 2ad4f2a4 authored by mcb

no message

parent e0fbe06a
package com.jz.common.utils.web;
import com.jz.common.utils.StringUtils;
/**
 * Helper for reading values out of XML content.
 */
public class XmlUtils {
    /**
     * Gets an XML property value (the first occurrence when there are several).
     *
     * @param content  the XML content
     * @param property the property (tag) name
     * @return the property value, or an empty string when the tag is missing
     */
    public static String getPropertyValue(Object content, String property) {
        if (null != content && StringUtils.hasLength(content.toString()) && StringUtils.isNotBlank(property)) {
            String tagBegin = "<" + property + ">";
            String tagEnd = "</" + property + ">";
            String ct = content.toString();
            int bi = ct.indexOf(tagBegin);
            int ei = ct.indexOf(tagEnd);
            // Only extract when both tags are present and correctly ordered; the old
            // check added tagBegin.length() before testing for -1, so a missing begin
            // tag could never be detected.
            if (bi != -1 && ei != -1 && ei >= bi + tagBegin.length())
                return StringUtils.toTrimString(ct.substring(bi + tagBegin.length(), ei));
        }
        return "";
    }
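    // Note: this is a plain substring scan, not a real XML parse; it ignores
    // attributes, CDATA sections and nested tags of the same name. That is
    // adequate for the flat, generated sync configs handled in this commit, but
    // attribute-bearing or untrusted XML would need a proper parser.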
    public static void main(String[] args) {
        String content = "<target_insert_merge_overwrite>merge</target_insert_merge_overwrite>\n" +
                "  <target_db_connection>kudu_tmp</target_db_connection>\n" +
                "  <target_database>tmp</target_database>\n" +
                "  <target_table>impala::tmp.rzf_report_rzfyinliu</target_table>\n" +
                "  <target_primary_columns></target_primary_columns>";
        System.out.println(content);
        String result = getPropertyValue(content, "target_table");
        System.out.println("result: " + result); // expected: impala::tmp.rzf_report_rzfyinliu
    }
}
\ No newline at end of file
@@ -35,7 +35,7 @@ public class SwaggerConfiguration {
return new ApiInfoBuilder()
.title("数据中台 APIs")
.description("swagger-bootstrap-ui")
.termsOfServiceUrl("http://localhost:5050/")
.termsOfServiceUrl("http://localhost:7181/")
.contact("Bellamy")
.version("1.0")
.build();
......
@@ -17,13 +17,13 @@ public class SyncDmpTaskAddReq implements Serializable {
private static final long serialVersionUID = 4286346100382139287L;
@NotEmpty(message = "入参不能为空")
-    private List<Map<String, Object>> params;
+    private Map<String, Object> params;
-    public List<Map<String, Object>> getParams() {
+    public Map<String, Object> getParams() {
return params;
}
-    public void setParams(List<Map<String, Object>> params) {
+    public void setParams(Map<String, Object> params) {
this.params = params;
}
}
package com.jz.dmp.modules.controller;
import com.jz.dmp.modules.model.DmpNavigationTree;
import com.jz.dmp.modules.service.DmpNavigationTreeService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * DMP resource navigation tree (DmpNavigationTree) controller layer.
 *
 * @author Bellamy
 * @since 2020-12-29 15:08:17
 */
@RestController
@RequestMapping("/dmpNavigationTree")
@Api(tags = "DMP资源导航树")
public class DmpNavigationTreeController {
    /**
     * Service object
     */
@Autowired
private DmpNavigationTreeService dmpNavigationTreeService;
    /**
     * Queries a single row by primary key.
     *
     * @param id primary key
     * @return the matching row
     */
@GetMapping("/selectOne")
@ApiOperation(value = "通过主键查询单条数据", notes = "通过主键查询单条数据")
public DmpNavigationTree selectOne(Integer id) {
return this.dmpNavigationTreeService.queryById(id);
}
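    // Usage (illustrative): GET /dmpNavigationTree/selectOne?id=1 returns the
    // matching DmpNavigationTree row as JSON, assuming the localhost:7181 base
    // URL configured in SwaggerConfiguration above.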
}
\ No newline at end of file
package com.jz.dmp.modules.controller;
import com.jz.dmp.modules.model.DvRuleT;
import com.jz.dmp.modules.service.DvRuleTService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * DvRuleT table controller layer.
 *
......
package com.jz.dmp.modules.dao;
import com.jz.dmp.modules.model.DmpDevelopTask;
import org.apache.ibatis.annotations.Param;
import java.util.Map;
public interface DmpDevelopTaskDao {
@@ -11,4 +14,10 @@ public interface DmpDevelopTaskDao {
int deleteTaskByTaskId(String taskId) throws Exception;
int deleteNavigationTreeByTreeId(String treeId) throws Exception;
Integer getDbInfoByParam(@Param("xmlTdb") String xmlTdb, @Param("projectId") Integer projectId);
int insert(DmpDevelopTask dmpDevelopTask) throws Exception;
DmpDevelopTask selectTaskInfoByParam(@Param("treeId") long treeId) throws Exception;
}
package com.jz.dmp.modules.dao;
import com.jz.dmp.modules.model.DmpNavigationTree;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
 * DMP resource navigation tree (DmpNavigationTree) DAO layer.
 *
 * @author makejava
 * @since 2020-12-29 15:08:16
 */
public interface DmpNavigationTreeDao {
    /**
     * Queries a single row by ID.
     *
     * @param id primary key
     * @return the entity
     */
    DmpNavigationTree queryById(Integer id);
    /**
     * Queries a page of rows.
     *
     * @param offset start offset
     * @param limit  row count
     * @return list of entities
     */
    List<DmpNavigationTree> queryAllByLimit(@Param("offset") int offset, @Param("limit") int limit);
    /**
     * Queries rows using the entity as a filter.
     *
     * @param dmpNavigationTree entity instance
     * @return list of entities
     */
    List<DmpNavigationTree> queryAll(DmpNavigationTree dmpNavigationTree);
    /**
     * Inserts a row.
     *
     * @param dmpNavigationTree entity instance
     * @return number of affected rows
     */
    int insert(DmpNavigationTree dmpNavigationTree);
    /**
     * Batch insert (MyBatis native foreach).
     *
     * @param entities list of entity instances
     * @return number of affected rows
     */
    int insertBatch(@Param("entities") List<DmpNavigationTree> entities);
    /**
     * Batch insert-or-update by primary key (MyBatis native foreach).
     *
     * @param entities list of entity instances
     * @return number of affected rows
     */
    int insertOrUpdateBatch(@Param("entities") List<DmpNavigationTree> entities);
    /**
     * Updates a row.
     *
     * @param dmpNavigationTree entity instance
     * @return number of affected rows
     */
    int update(DmpNavigationTree dmpNavigationTree);
    /**
     * Deletes a row by primary key.
     *
     * @param id primary key
     * @return number of affected rows
     */
    int deleteById(Integer id);
int countTreeByName(DmpNavigationTree tree);
}
\ No newline at end of file
@@ -103,4 +103,5 @@ public interface DmpSyncingDatasourceDao {
DataSourceListDto selectDataSourceInfoById(Map map) throws Exception;
List<DmpSyncingDatasource> findListByParams(DmpSyncingDatasource ds) throws Exception;
}
\ No newline at end of file
package com.jz.dmp.modules.model;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.Date;
@@ -42,7 +45,7 @@ public class DmpDevelopTask implements Serializable {
    /**
     * Script
     */
-    private Object script;
+    private String script;
    /**
     * Data status
     */
@@ -92,6 +95,49 @@ public class DmpDevelopTask implements Serializable {
private Integer isGziped;
    /**
     * Project ID
     */
    private Integer projectId;
    /**
     * Parent node ID
     */
    private Integer parentId;
    @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
    private byte[] data;
    /**
     * Source datasource ID
     */
    @ApiModelProperty(value = "源数据库id")
    private String sourceDbId;
    /**
     * Target database name
     */
    @ApiModelProperty(value = "目标数据库名称")
    private String targetDbName;
    /**
     * Target table name
     */
    @ApiModelProperty(value = "目标数据库表")
    private String targetTableName;
    /**
     * Source database name
     */
    @ApiModelProperty(value = "源数据库名称")
    private String sourceDbName;
    /**
     * Source table name
     */
    @ApiModelProperty(value = "源数据库表")
    private String sourceTableName;
public Integer getId() {
return id;
@@ -149,11 +195,11 @@ public class DmpDevelopTask implements Serializable {
this.taskDesc = taskDesc;
}
-    public Object getScript() {
+    public String getScript() {
return script;
}
-    public void setScript(Object script) {
+    public void setScript(String script) {
this.script = script;
}
@@ -269,4 +315,67 @@ public class DmpDevelopTask implements Serializable {
this.isGziped = isGziped;
}
public Integer getProjectId() {
return projectId;
}
public void setProjectId(Integer projectId) {
this.projectId = projectId;
}
public Integer getParentId() {
return parentId;
}
public void setParentId(Integer parentId) {
this.parentId = parentId;
}
public byte[] getData() {
return data;
}
public void setData(byte[] data) {
this.data = data;
}
public String getSourceDbId() {
return sourceDbId;
}
public void setSourceDbId(String sourceDbId) {
this.sourceDbId = sourceDbId;
}
public String getTargetDbName() {
return targetDbName;
}
public void setTargetDbName(String targetDbName) {
this.targetDbName = targetDbName;
}
public String getSourceTableName() {
return sourceTableName;
}
public void setSourceTableName(String sourceTableName) {
this.sourceTableName = sourceTableName;
}
public String getTargetTableName() {
return targetTableName;
}
public void setTargetTableName(String targetTableName) {
this.targetTableName = targetTableName;
}
public String getSourceDbName() {
return sourceDbName;
}
public void setSourceDbName(String sourceDbName) {
this.sourceDbName = sourceDbName;
}
}
\ No newline at end of file
package com.jz.dmp.modules.model;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.Date;
/**
 * DMP resource navigation tree (DmpNavigationTree) entity.
 *
 * @author Bellamy
 * @since 2020-12-29 15:08:15
 */
@ApiModel(value = "DMP资源导航树", description = "DMP资源导航树")
public class DmpNavigationTree implements Serializable {
private static final long serialVersionUID = -12645974631378172L;
    /**
     * ID
     */
    @ApiModelProperty(value = "ID")
    private Integer id;
    /**
     * Tree category
     */
    @ApiModelProperty(value = "树类别")
    private String category;
    /**
     * Tree type
     */
    @ApiModelProperty(value = "树类型")
    private String type;
    /**
     * Name
     */
    @ApiModelProperty(value = "名称")
    private String name;
    /**
     * Sort order
     */
    @ApiModelProperty(value = "序号")
    private Integer treeSort;
    /**
     * Icon
     */
    @ApiModelProperty(value = "图标")
    private String icon;
    /**
     * Whether this is a leaf node
     */
    @ApiModelProperty(value = "是否叶子节点")
    private String isLevel;
    /**
     * Whether enabled
     */
    @ApiModelProperty(value = "是否启用")
    private String isEnable;
    /**
     * Locked by user
     */
    @ApiModelProperty(value = "被谁锁定")
    private String lockByUser;
    /**
     * Lock time
     */
    @ApiModelProperty(value = "锁定时间")
    private Date lockTime;
    /**
     * Data status
     */
    @ApiModelProperty(value = "数据状态")
    private String dataStatus;
    /**
     * Creating user ID
     */
    @ApiModelProperty(value = "创建用户ID")
    private String createUserId;
    /**
     * Creation time
     */
    @ApiModelProperty(value = "数据创建时间")
    private Date createTime;
    /**
     * Updating user ID
     */
    @ApiModelProperty(value = "更新用户ID")
    private String updateUserId;
    /**
     * Update time
     */
    @ApiModelProperty(value = "数据更新时间")
    private Date updateTime;
    /**
     * Project ID
     */
    @ApiModelProperty(value = "项目ID")
    private Integer projectId;
    /**
     * Parent node ID
     */
    @ApiModelProperty(value = "父节点ID")
    private Integer parentId;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getCategory() {
return category;
}
public void setCategory(String category) {
this.category = category;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getTreeSort() {
return treeSort;
}
public void setTreeSort(Integer treeSort) {
this.treeSort = treeSort;
}
public String getIcon() {
return icon;
}
public void setIcon(String icon) {
this.icon = icon;
}
public String getIsLevel() {
return isLevel;
}
public void setIsLevel(String isLevel) {
this.isLevel = isLevel;
}
public String getIsEnable() {
return isEnable;
}
public void setIsEnable(String isEnable) {
this.isEnable = isEnable;
}
public String getLockByUser() {
return lockByUser;
}
public void setLockByUser(String lockByUser) {
this.lockByUser = lockByUser;
}
public Date getLockTime() {
return lockTime;
}
public void setLockTime(Date lockTime) {
this.lockTime = lockTime;
}
public String getDataStatus() {
return dataStatus;
}
public void setDataStatus(String dataStatus) {
this.dataStatus = dataStatus;
}
public String getCreateUserId() {
return createUserId;
}
public void setCreateUserId(String createUserId) {
this.createUserId = createUserId;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public String getUpdateUserId() {
return updateUserId;
}
public void setUpdateUserId(String updateUserId) {
this.updateUserId = updateUserId;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
public Integer getProjectId() {
return projectId;
}
public void setProjectId(Integer projectId) {
this.projectId = projectId;
}
public Integer getParentId() {
return parentId;
}
public void setParentId(Integer parentId) {
this.parentId = parentId;
}
}
\ No newline at end of file
package com.jz.dmp.modules.service;
import com.jz.common.constant.JsonResult;
import com.jz.dmp.modules.model.DmpDevelopTask;
/**
 * Task development (DmpDevelopTask) service interface.
 *
@@ -9,5 +12,5 @@ package com.jz.dmp.modules.service;
public interface DmpDevelopTaskService {
JsonResult submitSyncing(DmpDevelopTask task) throws Exception;
}
\ No newline at end of file
package com.jz.dmp.modules.service;
import com.jz.dmp.modules.model.DmpNavigationTree;
import java.util.List;
/**
 * DMP resource navigation tree (DmpNavigationTree) service interface.
 *
 * @author Bellamy
 * @since 2020-12-29 15:08:16
 */
public interface DmpNavigationTreeService {
    /**
     * Queries a single row by ID.
     *
     * @param id primary key
     * @return the entity
     */
    DmpNavigationTree queryById(Integer id);
    /**
     * Queries a page of rows.
     *
     * @param offset start offset
     * @param limit  row count
     * @return list of entities
     */
    List<DmpNavigationTree> queryAllByLimit(int offset, int limit);
    /**
     * Inserts a row.
     *
     * @param dmpNavigationTree entity instance
     * @return the inserted entity
     */
    DmpNavigationTree insert(DmpNavigationTree dmpNavigationTree);
    /**
     * Updates a row.
     *
     * @param dmpNavigationTree entity instance
     * @return the updated entity
     */
    DmpNavigationTree update(DmpNavigationTree dmpNavigationTree);
    /**
     * Deletes a row by primary key.
     *
     * @param id primary key
     * @return whether the delete succeeded
     */
    boolean deleteById(Integer id);
}
\ No newline at end of file
@@ -76,4 +76,6 @@ public interface DmpSyncingDatasourceService {
JsonResult selectDataSourceInfoById(Map map) throws Exception;
JsonResult updateDatasourceById(DmpSyncingDatasourceReq saveBody) throws Exception;
List<DmpSyncingDatasource> findListByParams(DmpSyncingDatasource ds) throws Exception;
}
\ No newline at end of file
package com.jz.dmp.modules.service.impl;
import com.jz.agent.service.DmpDsAgentService;
import com.jz.common.constant.JsonResult;
import com.jz.common.constant.ResultCode;
import com.jz.common.utils.JsonMapper;
import com.jz.common.utils.StringUtils;
import com.jz.common.utils.web.XmlUtils;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.modules.dao.DmpDevelopTaskDao;
import com.jz.dmp.modules.dao.DmpProjectDao;
import com.jz.dmp.modules.dao.DmpSyncingDatasourceDao;
import com.jz.dmp.modules.dao.DmpSyncingDatasourceTypeDao;
import com.jz.dmp.modules.model.*;
import com.jz.dmp.modules.service.DmpDevelopTaskService;
import com.jz.dmp.modules.service.DmpSyncingDatasourceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Task development (DmpDevelopTask) service implementation.
@@ -12,10 +32,649 @@ import org.springframework.stereotype.Service;
* @since 2020-12-22 16:37:56
*/
@Service("dmpDevelopTaskService")
@Transactional
public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
    private static final Logger logger = LoggerFactory.getLogger(DmpDevelopTaskServiceImpl.class);
@Autowired
private DmpDevelopTaskDao dmpDevelopTaskDao;
@Autowired
private DmpProjectDao dmpProjectDao;
@Autowired
private DmpSyncingDatasourceTypeDao dmpSyncingDatasourceTypeDao;
@Autowired
private DmpDsAgentService dmpDsAgentServiceImp;
@Autowired
private DmpSyncingDatasourceService dmpSyncingDatasourceService;
@Override
public JsonResult submitSyncing(DmpDevelopTask task) throws Exception {
String xmlContent;
try {
xmlContent = convert2SyncXmlContent(task);
            logger.debug(xmlContent);
} catch (Exception e) {
logger.error("封装离线同步xml内容出错:" + e.getMessage(), e);
return new JsonResult(ResultCode.PARAMS_ERROR);
}
return submitSyncXml(task.getProjectId(), xmlContent);
}
    /** Submits the offline sync to the project identified by projectId (uploads the XML config file). */
private JsonResult submitSyncXml(long projectId, String xmlContent) {
String _type_ = XmlUtils.getPropertyValue(xmlContent, "type");
String _fileName_ = XmlUtils.getPropertyValue(xmlContent, "name");
String _dbName_ = XmlUtils.getPropertyValue(xmlContent, "target_database");
String _targetFtpConnection_ = XmlUtils.getPropertyValue(xmlContent, "target_ftp_connection");
        if (StringUtils.isNull(_dbName_)) { // *2ftp types carry no target_database tag
if (_type_.toLowerCase().indexOf("2ftp") != -1) {
_dbName_ = _targetFtpConnection_;
} else {
throw new RuntimeException("离线同步配置缺失目标数据源名称!");
}
}
DmpAgentDatasourceInfo ds = new DmpAgentDatasourceInfo();
DmpProjectSystemInfo info = dmpProjectDao.queryProjectSystemInfo(projectId);
ds.setKerberosIsenable(info.getKerberosIsenable());
ds.setKerberosJaasConf(info.getKerberosJaasConf());
ds.setKerberosKrb5Conf(info.getKerberosKrb5Conf());
ds.setKerberosFqdn(info.getKerberosFqdn());
ds.setHdfsDefaultConf(""); // info.getHdfsDefaultConf());
ds.setHdfsConfPath(""); // info.getHdfsConfPath());
ds.setHdfsAuthPath(info.getKerberosKeytabConf()); // info.getHdfsAuthPath());
ds.setHdfsUserName(info.getHdfsUserName());
ds.setHdfsSyncingPath(info.getHdfsSyncingPath());
ds.setDefaultFs(info.getHdfsHttpPath());
ds.setTargetDbName(_dbName_);
ds.setTargetFileName(_fileName_ + ".xml");
ds.setProjectId(String.valueOf(projectId));
DmpAgentResult rst = dmpDsAgentServiceImp.submitSycingXmlConfig(ds, xmlContent);
if (!rst.getCode().val().equals("200")) {
return new JsonResult(rst.getCode(), rst.getMessage());
} else {
rst.setResult(JsonMapper.fromJsonString(rst.getMessage(), Boolean.class));
return new JsonResult(ResultCode.SUCCESS, rst);
}
}
public String convert2SyncXmlContent(DmpDevelopTask body) throws Exception {
StringBuilder sb = new StringBuilder();
long treeId = body.getTreeId();
DmpDevelopTask task = dmpDevelopTaskDao.selectTaskInfoByParam(treeId);
Map<String, Object> map = (Map<String, Object>) JsonMapper.fromJsonString(task.getScript(), Map.class);
Map<String, Object> scriptMap = (Map<String, Object>) map.get("scripts");
Object content = scriptMap.get("content");
if (null != content && StringUtils.hasLength(content.toString())) {
            // Script mode: raw XML content
sb.append(content);
} else {
            // Script mode: JSON content; parse the JSON and assemble the XML
Map<String, Object> settingMap = (Map<String, Object>) scriptMap.get("setting");
String _extract = (String) settingMap.get("extract");
String _extractExpression = (String) settingMap.get("extractExpression");
String _targetBucketCounts = (String) settingMap.get("targetBucketCounts");
String _errorLimitRecord = (String) settingMap.get("errorLimitRecord");
String _executorMemory = (String) settingMap.get("executorMemory");
String _executorCores = (String) settingMap.get("executorCores");
String _totalExecutorCores = (String) settingMap.get("totalExecutorCores");
            String _ftColumn = (String) settingMap.get("ftColumn"); // bucket column
            String _ftCount = (String) settingMap.get("ftCount"); // bucket count
            String _separateMax = (String) settingMap.get("separateMax"); // bucket column max value
            String _separateMin = (String) settingMap.get("separateMin"); // bucket column min value
Map<String, Object> readerMap = (Map<String, Object>) scriptMap.get("reader");
String _dbConnection = (String) readerMap.get("dbConnection");
String _fileType = (String) readerMap.get("fileType");
String _sourceHdfsPath = (String) readerMap.get("sourceHdfsPath");
String _sourceHdfsFile = (String) readerMap.get("sourceHdfsFile");
String _sourceFtpDir = (String) readerMap.get("sourceFtpDir");
String _sourceFtpFile = (String) readerMap.get("sourceFtpFile");
String _sourceSkipFtpFile = (String) readerMap.get("sourceSkipFtpFile");
String _sourceCsvDelimiter = (String) readerMap.get("sourceCsvDelimiter");
String _sourceCsvHeader = (String) readerMap.get("sourceCsvHeader");
String _sourceCsvCharset = (String) readerMap.get("sourceCsvCharset");
String _sourceCsvQuote = (String) readerMap.get("sourceCsvQuote");
String _sourceFtpLoadDate = (String) readerMap.get("sourceFtpLoadDate");
String _registerTableName = (String) readerMap.get("registerTableName");
String registerTableName_ = (String) readerMap.get("registerTableName");
String _dayByDay = (String) readerMap.get("dayByDay");
List<Map<String, Object>> _readerColumns = (List<Map<String, Object>>) readerMap.get("column");
Map<String, Object> writerMap = (Map<String, Object>) scriptMap.get("writer");
String _targetDbConnection = (String) writerMap.get("targetDbConnection");
String _targetTable = (String) writerMap.get("targetTable");
String _targetFtpDir = (String) writerMap.get("targetFtpDir");
String _targetFtpFile = (String) writerMap.get("targetFtpFile");
String _targetCsvDelimiter = (String) writerMap.get("targetCsvDelimiter");
String _targetCsvHeader = (String) writerMap.get("targetCsvHeader");
String _targetCsvCharset = (String) writerMap.get("targetCsvCharset");
String _targetInsertMergeOverwrite = (String) writerMap.get("targetInsertMergeOverwrite");
List<Map<String, Object>> _writerColumns = (List<Map<String, Object>>) writerMap.get("column");
Integer _projectId_ = body.getProjectId();
Integer _parentId_ = (Integer) map.get("parentId");
String _mode_ = (String) map.get("mode");
String _version_ = (String) map.get("version");
String _name_ = (String) map.get("name");
DmpSyncingDatasource sDS = this.getDmpSyncingDatasource(_projectId_, _dbConnection);
DmpSyncingDatasource tDS = this.getDmpSyncingDatasource(_projectId_, _targetDbConnection);
if (null == sDS || null == tDS || null == sDS.getDatasourceType() || null == tDS.getDatasourceType())
throw new Exception("同步数据来源或数据去向信息有误");
DmpSyncingDatasourceType sDST = dmpSyncingDatasourceTypeDao.queryById(sDS.getDatasourceType());
DmpSyncingDatasourceType tDST = dmpSyncingDatasourceTypeDao.queryById(tDS.getDatasourceType());
String _targetDbName_ = this.getTargetDBname(tDS);
String _sourceDbName_ = this.getTargetDBname(sDS);
String st = sDST.getDatasourceType();
String tt = tDST.getDatasourceType();
String ft = this.getFileType(_fileType, _sourceHdfsFile, _sourceFtpFile);
boolean isBigData2jdbc = ("HIVE".equals(st) || "KUDU".equals(st)) && "RDBMS".equals(tDST.getDatasourceCatecode());
boolean isBigData2ftp = ("HIVE".equals(st) || "KUDU".equals(st)) && "FTP".equals(tt);
if ("RDBMS".equals(sDST.getDatasourceCatecode()) && "HIVE".equals(tt)) {
_registerTableName = sDS.getDatasourceName() + "_" + _registerTableName.substring(_registerTableName.indexOf(".")+1);
}
String name_ = (StringUtils.hasLength(_targetDbName_) ? _targetDbName_ : tDS.getDatasourceName()) + "_" + _targetTable;
String type_ = this.getEtlProcessType(sDST, tDST, _fileType);
String[] sci = this.genSourceColumnsInfo(sDST, tDST, ft, _registerTableName, registerTableName_, _extract,
_extractExpression, _readerColumns, _sourceDbName_);
String source_table_ = sci[0];
String source_columns_ = sci[1];
String source_sql_ = sci[2];
String target_table_ = _targetTable;
String[] tci = this.genTargetColumnsInfo(sDST, tDST, _targetTable, _targetBucketCounts, _writerColumns);
String target_columns_ = tci[0];
String target_primary_columns_ = tci[1];
String target_partition_columns_ = tci[2];
String target_table_creation_ = tci[3];
String clean_rules_ = sci[3] + tci[4];
String check_rules_ = sci[4] + tci[5];
sb.append("<etlprocess>").append("\r\n");
sb.append("<name>").append(name_).append("</name>").append("\r\n");
sb.append("<type>").append(type_).append("</type>").append("\r\n");
if ("RDBMS".equals(sDST.getDatasourceCatecode())) {
sb.append("<source_table db_connection=\"").append(sDS.getDatasourceName()).append("\" register_table_name=\"")
.append(_registerTableName.substring(_registerTableName.indexOf(".")+1)).append("\">").append(source_table_).append("</source_table>").append("\r\n");
}
if ("HIVE".equals(st) || "KUDU".equals(st)) {
String sourceTableName = _registerTableName;
if ("KUDU".equals(st)) {
sourceTableName = "impala::" + _sourceDbName_ + "." + _registerTableName;
}
sb.append("<source_table db_connection=\"").append(sDS.getDatasourceName()).append("\" register_table_name=\"")
.append(_registerTableName).append("\">").append(sourceTableName).append("</source_table>").append("\r\n");
}
            if (isBigData2ftp) { // the front end keeps source and target CSV settings separate, since the source side may carry its own field info
_sourceCsvDelimiter = _targetCsvDelimiter;
_sourceCsvCharset = _targetCsvCharset;
_sourceCsvHeader = _targetCsvHeader;
}
if ("csv".equals(ft) || "FTP".equals(st) || isBigData2ftp) {
sb.append("<source_csv_delimiter>").append(_sourceCsvDelimiter).append("</source_csv_delimiter>").append("\r\n");
sb.append("<source_csv_header>").append(_sourceCsvHeader).append("</source_csv_header>").append("\r\n");
sb.append("<source_csv_charset>").append(_sourceCsvCharset).append("</source_csv_charset>").append("\r\n");
sb.append("<source_csv_quote>").append(_sourceCsvQuote).append("</source_csv_quote>").append("\r\n");
}
if ("FTP".equals(st)) {
sb.append("<source_ftp_connection>").append(_dbConnection).append("</source_ftp_connection>").append("\r\n");
sb.append("<source_ftp_dir>").append(_sourceFtpDir).append("</source_ftp_dir>").append("\r\n");
sb.append("<source_ftp_file>").append(_sourceFtpFile).append("</source_ftp_file>").append("\r\n");
sb.append("<source_skip_ftp_file>").append(_sourceSkipFtpFile).append("</source_skip_ftp_file>").append("\r\n");
sb.append("<source_ftp_load_date>").append(_sourceFtpLoadDate).append("</source_ftp_load_date>").append("\r\n");
}
if ("KUDU".equals(tt) || "HIVE".equals(tt)) {
sb.append("<source_hdfs_path>").append(this.getSourceHdfsDir(_sourceHdfsPath, _sourceHdfsFile)).append("</source_hdfs_path>").append("\r\n");
sb.append("<source_columns>").append(source_columns_).append("</source_columns>").append("\r\n");
}
sb.append("<source_sql>").append(source_sql_).append("</source_sql>").append("\r\n");
if ("KUDU".equals(tt)) {
target_table_ = "impala::" + _targetDbName_ + "." + _targetTable;
}
if (isBigData2ftp) {
sb.append("<source_download_file>").append(_targetFtpFile).append("</source_download_file>").append("\r\n");
sb.append("<source_database>").append(_sourceDbName_).append("</source_database>").append("\r\n");
sb.append("<target_insert_merge_overwrite>").append("overwrite").append("</target_insert_merge_overwrite>").append("\r\n");
sb.append("<target_ftp_connection>").append(_targetDbConnection).append("</target_ftp_connection>").append("\r\n");
sb.append("<target_ftp_dir>").append(_targetFtpDir).append("</target_ftp_dir>").append("\r\n");
} else {
sb.append("<target_insert_merge_overwrite>").append(_targetInsertMergeOverwrite).append("</target_insert_merge_overwrite>").append("\r\n");
sb.append("<target_db_connection>").append(_targetDbConnection).append("</target_db_connection>").append("\r\n");
sb.append("<target_database>").append(_targetDbName_).append("</target_database>").append("\r\n");
sb.append("<target_table>").append(target_table_).append("</target_table>").append("\r\n");
sb.append("<target_primary_columns>").append(target_primary_columns_).append("</target_primary_columns>").append("\r\n");
sb.append("<target_table_creation>").append(target_table_creation_).append("</target_table_creation>").append("\r\n");
if (!isBigData2jdbc) {
sb.append("<target_columns>").append(target_columns_).append("</target_columns>").append("\r\n");
sb.append("<target_partition_columns>").append(target_partition_columns_).append("</target_partition_columns>").append("\r\n");
}
}
sb.append("<ft_column>").append(_ftColumn).append("</ft_column>").append("\r\n");
sb.append("<ft_count>").append(_ftCount).append("</ft_count>").append("\r\n");
sb.append("<separate_max>").append(_separateMax).append("</separate_max>").append("\r\n");
sb.append("<separate_min>").append(_separateMin).append("</separate_min>").append("\r\n");
sb.append("<executor_memory>").append(_executorMemory).append("</executor_memory>").append("\r\n");
sb.append("<executor_cores>").append(_executorCores).append("</executor_cores>").append("\r\n");
sb.append("<total_executor_cores>").append(_totalExecutorCores).append("</total_executor_cores>").append("\r\n");
sb.append("<day_by_day>").append(_dayByDay).append("</day_by_day>").append("\r\n");
sb.append("<clean_rules>").append(clean_rules_).append("</clean_rules>").append("\r\n");
sb.append("<check_rules>").append(check_rules_).append("</check_rules>").append("\r\n");
sb.append("</etlprocess>").append("\r\n");
}
return sb.toString();
}
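    /*
     * For reference, the assembled document has roughly this shape (the values
     * below are illustrative; which tags are emitted depends on source/target types):
     *
     * <etlprocess>
     * <name>kudu_tmp_rzf_report</name>
     * <type>jdbc2kudu</type>
     * <source_table db_connection="mysql_src" register_table_name="rzf_report"><![CDATA[ (select ...) as t]]></source_table>
     * <source_sql><![CDATA[ select ... from rzf_report]]> </source_sql>
     * <target_insert_merge_overwrite>merge</target_insert_merge_overwrite>
     * <target_db_connection>kudu_tmp</target_db_connection>
     * <target_database>tmp</target_database>
     * <target_table>impala::tmp.rzf_report</target_table>
     * <target_primary_columns>id</target_primary_columns>
     * <target_table_creation>create table if not exists ...</target_table_creation>
     * ...
     * <clean_rules>...</clean_rules>
     * <check_rules>...</check_rules>
     * </etlprocess>
     */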
private DmpSyncingDatasource getDmpSyncingDatasource(Integer projectId, String dsName) throws Exception {
DmpSyncingDatasource ds = new DmpSyncingDatasource();
ds.setDataStatus("1");
ds.setDatasourceName(dsName);
ds.setProjectId(projectId);
List<DmpSyncingDatasource> dsList = dmpSyncingDatasourceService.findListByParams(ds);
if(dsList != null && dsList.size() > 0) {
return dsList.get(0);
} else {
return null;
}
}
    private String getFileType(String ft, String _sourceHdfsFile, String _sourceFtpFile) {
        if (StringUtils.isBlank(ft)) {
            // Derive the type from the file extension: json stays json, everything
            // else (csv, txt, unknown) is treated as csv. An FTP file name, when
            // present, takes precedence over an HDFS one.
            ft = deriveFileTypeFromName(_sourceHdfsFile, ft);
            ft = deriveFileTypeFromName(_sourceFtpFile, ft);
        }
        return ft;
    }
    private String deriveFileTypeFromName(String fileName, String fallback) {
        if (StringUtils.isNotBlank(fileName)) {
            int idx = fileName.lastIndexOf(".");
            if (idx > -1) {
                String ext = fileName.substring(idx + 1);
                if (StringUtils.isNotBlank(ext)) {
                    return "json".equalsIgnoreCase(ext) ? "json" : "csv";
                }
            }
        }
        return fallback;
    }
    private String getTargetDBname(DmpSyncingDatasource ds) {
        String url = ds.getJdbcUrl();
        if (StringUtils.isNotBlank(url)) {
            if (ds.getDatasourceType() == 2) { // SQL Server: the database name sits in the "DatabaseName=..." URL property
                String[] arr1 = url.split("=");
                String[] arr2 = arr1[1].split(";");
                return arr2[0];
            } else { // other JDBC URLs: the database name is the last path segment, before any "?" parameters
                String[] arr1 = url.split("/");
                String ls = arr1[arr1.length - 1];
                String[] arr2 = ls.split("\\?");
                return arr2[0];
            }
        } else {
            if (ds.getDatasourceType() == 11) { // FTP
                return ds.getDbName();
            }
            return "";
        }
    }
private String getEtlProcessType(DmpSyncingDatasourceType s, DmpSyncingDatasourceType t, String ft) {
String sT = s.getDatasourceType().toLowerCase();
String tT = t.getDatasourceType().toLowerCase();
String _ft = "";
if(StringUtils.isNotBlank(ft)) {
_ft = ft;
}
if(s.getDatasourceCatecode().equals("RDBMS")) {
sT = "jdbc";
}
return sT + _ft + "2" + tT;
}
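    // e.g. an RDBMS source loading csv files into HIVE yields "jdbccsv2hive";
    // a KUDU source going to FTP with no file type yields "kudu2ftp".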
private String getTypeSize(String type) {
if (StringUtils.isNull(type))
return "";
String str = "";
if (type.toUpperCase().indexOf("VARCHAR") != -1) {
str += "(255)";
} else if (type.toUpperCase().indexOf("CHAR") != -1) {
str += "(5)";
} else {
            // other types get no size suffix
}
return str;
}
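    // e.g. getTypeSize("varchar") -> "(255)", getTypeSize("char") -> "(5)", getTypeSize("int") -> ""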
private String[] genSourceColumnsInfo(DmpSyncingDatasourceType s, DmpSyncingDatasourceType t, String ft,
String sTable, String sTable_, String _extract, String extractExpression, List<Map<String, Object>> rColumns,
String sourceDbName) {
String source_columns = "";
String source_columns_hive_partition = "";
String source_table = "<![CDATA[ (select ";
String source_sql = "<![CDATA[ select ";
String source_sql_hive_partition = "";
String clean_rules = "";
String check_rules = "";
Map<String, String> relatedTypeMap = getRelatedTypeMap(s, t);
int i = 0;
for (Map<String, Object> m : rColumns) {
String name = (String) m.get("name");
String type = StringUtils.getStringValue(m.get("type"), "");//relatedTypeMap.get(StringUtils.getStringValue(m.get("type"), ""));
String isPt = (String) m.get("isPt");
if("SQLSERVER".equals(s.getDatasourceType())){ //对sqlserver 修改
if("STRING".equals(type)){
source_table += "REPLACE(REPLACE(REPLACE("+name+", CHAR(13), '<br1>'),CHAR(10),'<br2>'),',','<comma>') AS "+name;
}else{
source_table += name;
}
}else{
// source_table += "replace(cast(" + name + " as char), char(10), '') as " + name;
source_table += name;
}
if ("HIVE".equals(t.getDatasourceType()) && "1".equals(isPt)) {
source_columns_hive_partition += name;
source_sql_hive_partition += "cast(" + name + " as " + type + ") as ";
if ("json".equals(ft)) {
source_sql_hive_partition += name.toLowerCase();
} else {
source_sql_hive_partition += name;
}
} else {
source_columns += name;
if("varchar".equalsIgnoreCase(type)) {
source_sql += "cast(" + name + " as string) as ";
} else if("char".equalsIgnoreCase(type)) {
source_sql += "cast(" + name + " as string) as ";
} else if("datetime".equalsIgnoreCase(type)) {
source_sql += "cast(" + name + " as timestamp) as ";
} else {
source_sql += "cast(" + name + " as " + type + ") as ";
}
if ("json".equals(ft)) {
source_sql += name.toLowerCase();
} else {
source_sql += name;
}
}
source_table += ", ";
if ("HIVE".equals(t.getDatasourceType()) && "1".equals(isPt)) {
source_columns_hive_partition += ", ";
source_sql_hive_partition += ", ";
} else {
source_columns += ", ";
source_sql += ", ";
}
if (i == rColumns.size() - 1) {
String loweredStr = source_sql.toLowerCase();
boolean appjz = loweredStr.indexOf("etl_create_time,") == -1;
boolean appUpd = loweredStr.indexOf("etl_update_time,") == -1;
if (appjz) source_sql += "cast(current_timestamp() as string) as etl_create_time, ";
if (appUpd) source_sql += "cast(current_timestamp() as string) as etl_update_time, ";
}
i++;
}
if ("HIVE".equals(t.getDatasourceType()) && StringUtils.isNotBlank(source_columns_hive_partition)) {
//source_sql_hive_partition = source_sql_hive_partition.substring(0, source_sql_hive_partition.length()-2);
//source_columns_hive_partition = source_columns_hive_partition.substring(0, source_columns_hive_partition.length()-2);
source_sql += source_sql_hive_partition;
source_columns += source_columns_hive_partition;
}
source_table = source_table.substring(0, source_table.length() - 2);
if ("RDBMS".equals(s.getDatasourceCatecode())) {
source_table += " from " + sourceDbName + "." + sTable_;
} else {
source_table += " from " + sTable;
}
if (!"full".equalsIgnoreCase(_extract) && StringUtils.isNotBlank(extractExpression)) {
source_table += " " + extractExpression;
}
source_table += ") as t]]>";
if ("FTP".equals(s.getDatasourceType()) || "HDFS".equals(s.getDatasourceType())) {
sTable = "$$tmpTableName";
}
source_sql = source_sql.substring(0, source_sql.length() - 2);
source_columns = source_columns.substring(0, source_columns.length() - 2);
source_sql += " from " + sTable + "]]> ";
return new String[]{source_table, source_columns, source_sql, clean_rules, check_rules};
}
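    // For columns [id int, name varchar] on a non-json source, the pieces come out roughly as:
    //   source_columns = "id, name"
    //   source_sql     = "<![CDATA[ select cast(id as int) as id, cast(name as string) as name,
    //                    cast(current_timestamp() as string) as etl_create_time,
    //                    cast(current_timestamp() as string) as etl_update_time from <table>]]> "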
private Map<String, String> getRelatedTypeMap(DmpSyncingDatasourceType sds, DmpSyncingDatasourceType tds) {
        /* Builds the cast-type lookup keyed by source field name; defaults to 2kudu unless the source or target is a HIVE library */
Map<String, Object> params = new HashMap();
Map<String, String> typeMap = new HashMap();
DmpSyncingDatasourceType entity = new DmpSyncingDatasourceType();
if ("HIVE".equalsIgnoreCase(sds.getDatasourceType()) || "HIVE".equalsIgnoreCase(tds.getDatasourceType())) {
entity.setDatasourceType("HIVE");
        } else { // every remaining case can be treated as 2kudu
entity.setDatasourceType("KUDU");
}
/*params.put("sourceDsId", sds.getId());
params.put("targetDsId", dmpSyncingDatasourceTypeService.findList(entity).get(0).getId());
List<Map<String, Object>> dataTypeMapping = dmpTableFieldSchemaService.getDataTypeMapping(params);
for (Map<String, Object> mapp : dataTypeMapping) {
typeMap.put(StringUtils.toString(mapp.get("sourceType")),
StringUtils.getStringValue(mapp.get("targetType"), "STRING"));
}*/
return typeMap;
}
private String getSourceHdfsDir(String filePath, String fileName) {
if(StringUtils.isNotBlank(fileName)) {
boolean f1 = filePath.endsWith("/");
boolean f2 = fileName.startsWith("/");
if(f1 && f2) {
fileName = fileName.substring(1, fileName.length());
}
if(!f1 && !f2) {
filePath += "/";
}
return filePath + fileName;
} else {
return filePath;
}
}
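    // e.g. ("/data/in/", "/part-0.csv") -> "/data/in/part-0.csv";
    //      ("/data/in",  "part-0.csv")  -> "/data/in/part-0.csv"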
private String[] genTargetColumnsInfo(DmpSyncingDatasourceType s, DmpSyncingDatasourceType t, String tTable,
String _targetBucketCounts, List<Map<String, Object>> wColumns) {
String target_columns = "";
String target_table_creation = "create table if not exists " + tTable + "(";
String target_primary_columns = "";
String target_partition_columns = "";
String target_partition_columns_hive = "";
String target_columns_hive_partition = "";
String clean_rules = "";
String check_rules = "";
int i = 0;
for (Map<String, Object> m : wColumns) {
String name = (String) m.get("name");
String type = (String) m.get("type");
String comment = m.get("comment") == null ? "" : (String) m.get("comment");
String isPk = (String) m.get("isPk");
String isPt = (String) m.get("isPt");
if ("HIVE".equals(t.getDatasourceType()) && "1".equals(isPt)) {
target_columns_hive_partition += name;
} else {
target_columns += name;
}
if (!"1".equals(isPt) && "HIVE".equals(t.getDatasourceType())) {
target_table_creation += name + " " + type;
target_table_creation += " comment '" + comment + "'";
} else if ("KUDU".equals(t.getDatasourceType())) {
target_table_creation += name + " " + type;
target_table_creation += "1".equals(isPk) ? " not null " : " null ";
target_table_creation += " comment '" + comment + "'";
} else if ("RDBMS".equals(t.getDatasourceCatecode())) {
                // PostgreSQL allows varchar without an explicit length
target_table_creation += name + " " + type + getTypeSize(type);
target_table_creation += "1".equals(isPk) ? " not null " : " null ";
target_table_creation += "comment '" + comment + "'";
}
if ("1".equals(isPk)) {
target_primary_columns += name + ", ";
}
if ("1".equals(isPt)) {
target_partition_columns += name + ", ";
target_partition_columns_hive += name + " " + type + ", ";
}
String[] rls = this.appendRules(m.get("rules"), name, clean_rules, check_rules);
clean_rules = rls[0];
check_rules = rls[1];
if ("HIVE".equals(t.getDatasourceType()) && "1".equals(isPt)) {
target_columns_hive_partition += ", ";
} else {
target_columns += ", ";
}
if (!"1".equals(isPt) && "HIVE".equals(t.getDatasourceType())) {
target_table_creation += ", ";
} else if ("KUDU".equals(t.getDatasourceType())) {
target_table_creation += ", ";
} else if ("RDBMS".equals(t.getDatasourceCatecode())) {
target_table_creation += ", ";
}
if (i == wColumns.size() - 1) {
                // If the source already carries etl_create_time / etl_update_time columns, do not append them
String loweredStr = target_columns.toLowerCase();
boolean appjz = loweredStr.indexOf("etl_create_time,") == -1;
boolean appUpd = loweredStr.indexOf("etl_update_time,") == -1;
if ("KUDU".equals(t.getDatasourceType())) {
if (appjz) target_table_creation += "etl_create_time string null comment 'etl_create_time', ";
if (appUpd) target_table_creation += "etl_update_time string null comment 'etl_update_time', ";
} else if ("HIVE".equals(t.getDatasourceType())) {
if (appjz) target_table_creation += "etl_create_time string comment 'etl_create_time', ";
if (appUpd) target_table_creation += "etl_update_time string comment 'etl_update_time', ";
} else if ("RDBMS".equals(t.getDatasourceCatecode())) {
                    // Common relational engines Microsoft Access and DM are not yet handled
if ("ORACLE".equals(t.getDatasourceType())) {
if (appjz) target_table_creation += "etl_create_time varchar2(30) comment 'etl_create_time', ";
if (appUpd) target_table_creation += "etl_update_time varchar2(30) comment 'etl_update_time', ";
} else {
if (appjz) target_table_creation += "etl_create_time varchar(30) comment 'etl_create_time', ";
if (appUpd) target_table_creation += "etl_update_time varchar(30) comment 'etl_update_time', ";
}
}
if (appjz) target_columns += "etl_create_time, ";
if (appUpd) target_columns += "etl_update_time, ";
if ("HIVE".equals(t.getDatasourceType()) && StringUtils.isNotBlank(target_columns_hive_partition)) {
target_columns += target_columns_hive_partition;
}
if (StringUtils.isNotBlank(target_primary_columns))
target_primary_columns = target_primary_columns.substring(0, target_primary_columns.length() - 2);
if (StringUtils.isNotBlank(target_partition_columns_hive))
target_partition_columns_hive = target_partition_columns_hive.substring(0, target_partition_columns_hive.length() - 2);
if (StringUtils.isNotBlank(target_partition_columns))
target_partition_columns = target_partition_columns.substring(0, target_partition_columns.length() - 2);
if (StringUtils.isNotBlank(target_table_creation))
target_table_creation = target_table_creation.substring(0, target_table_creation.length() - 2);
if (StringUtils.isNotBlank(target_primary_columns)) {
if ("KUDU".equals(t.getDatasourceType()) || "RDBMS".equals(t.getDatasourceCatecode())) {
target_table_creation += ", primary key (" + target_primary_columns + ") ";
}
}
}
i++;
}
target_table_creation += ") ";
if ("KUDU".equals(t.getDatasourceType())) {
if (StringUtils.isNotBlank(target_partition_columns)) {
target_table_creation += "partition by hash (" + target_partition_columns + ") ";
} else {
if (StringUtils.isNotBlank(target_primary_columns)) {
target_table_creation += "partition by hash (" + target_primary_columns + ") ";
}
}
if (StringUtils.isNotBlank(_targetBucketCounts)) {
target_table_creation += "partitions " + _targetBucketCounts;
}
target_table_creation += " stored as kudu ";
} else if ("HIVE".equals(t.getDatasourceType())) {
if (StringUtils.isNotBlank(target_partition_columns)) {
target_table_creation += "partitioned by (" + target_partition_columns_hive + ") ";
}
} else if ("RDBMS".equals(t.getDatasourceCatecode())) {
            // Partitioning for relational targets is ignored for now
}
target_columns = target_columns.substring(0, target_columns.length() - 2);
return new String[]{target_columns, target_primary_columns, target_partition_columns,
target_table_creation, clean_rules, check_rules};
}
private String[] appendRules(Object obj, String columnName, String clean_rules, String check_rules) {
List<Map<String, Object>> rules = (List<Map<String, Object>>) obj;
for(Map<String, Object> m : rules) {
String method = (String) m.get("method"); // 规则方法
String type = (String) m.get("type"); // 1:清洗规则;0:校验规则
List<String> params = (ArrayList<String>) m.get("params"); // 规则参数列表,字符串
String sub_rules = "\t<column_name name=\""+columnName+"\">\r\n";
sub_rules += "\t\t<rules_type method=\""+method+"\">\r\n";
if(params != null && params.size() > 0)
for(String rv : params) {
sub_rules += "\t\t\t<rules_value>" + rv + "</rules_value>\r\n";
}
sub_rules += "\t\t</rules_type>\r\n";
sub_rules += "\t</column_name>\r\n";
if("0".equals(type)) {
if(StringUtils.isBlank(check_rules))
sub_rules = "\r\n" + sub_rules;
check_rules += sub_rules;
}
if("1".equals(type)) {
if(StringUtils.isBlank(clean_rules))
sub_rules = "\r\n" + sub_rules;
clean_rules += sub_rules;
}
}
return new String[] {clean_rules, check_rules};
}
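    // A single rule renders roughly as (names illustrative):
    //   <column_name name="age">
    //       <rules_type method="notNull">
    //           <rules_value>0</rules_value>
    //       </rules_type>
    //   </column_name>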
}
\ No newline at end of file
package com.jz.dmp.modules.service.impl;
import com.jz.dmp.modules.dao.DmpNavigationTreeDao;
import com.jz.dmp.modules.model.DmpNavigationTree;
import com.jz.dmp.modules.service.DmpNavigationTreeService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
/**
 * DMP resource navigation tree (DmpNavigationTree) service implementation.
 *
 * @author Bellamy
 * @since 2020-12-29 15:08:16
 */
@Service("dmpNavigationTreeService")
public class DmpNavigationTreeServiceImpl implements DmpNavigationTreeService {
@Autowired
private DmpNavigationTreeDao dmpNavigationTreeDao;
    /**
     * Queries a single row by ID.
     *
     * @param id primary key
     * @return the entity
     */
@Override
public DmpNavigationTree queryById(Integer id) {
return this.dmpNavigationTreeDao.queryById(id);
}
    /**
     * Queries a page of rows.
     *
     * @param offset start offset
     * @param limit  row count
     * @return list of entities
     */
@Override
public List<DmpNavigationTree> queryAllByLimit(int offset, int limit) {
return this.dmpNavigationTreeDao.queryAllByLimit(offset, limit);
}
    /**
     * Inserts a row.
     *
     * @param dmpNavigationTree entity instance
     * @return the inserted entity
     */
@Override
public DmpNavigationTree insert(DmpNavigationTree dmpNavigationTree) {
this.dmpNavigationTreeDao.insert(dmpNavigationTree);
return dmpNavigationTree;
}
    /**
     * Updates a row.
     *
     * @param dmpNavigationTree entity instance
     * @return the updated entity
     */
@Override
public DmpNavigationTree update(DmpNavigationTree dmpNavigationTree) {
this.dmpNavigationTreeDao.update(dmpNavigationTree);
return this.queryById(dmpNavigationTree.getId());
}
    /**
     * Deletes a row by primary key.
     *
     * @param id primary key
     * @return whether the delete succeeded
     */
@Override
public boolean deleteById(Integer id) {
return this.dmpNavigationTreeDao.deleteById(id) > 0;
}
}
\ No newline at end of file
@@ -257,6 +257,11 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
}
}
@Override
public List<DmpSyncingDatasource> findListByParams(DmpSyncingDatasource ds) throws Exception {
return dmpSyncingDatasourceDao.findListByParams(ds);
}
private DmpAgentDatasourceInfo dsInfoDTO(DmpSyncingDatasourceReq body) throws Exception {
        // look up the datasource type by its ID
DmpSyncingDatasourceType type = dmpSyncingDatasourceDao.queryDatasourceTypeById(body.getDatasourceType());
......
@@ -13,23 +13,26 @@ import com.jz.common.utils.AzkabanApiUtils2;
import com.jz.common.utils.FileUtils;
import com.jz.common.utils.JsonMapper;
import com.jz.common.utils.ZipUtils;
import com.jz.common.utils.web.XmlUtils;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.modules.controller.DataIntegration.bean.*;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowExecution;
import com.jz.dmp.modules.dao.*;
import com.jz.dmp.modules.model.*;
import com.jz.dmp.modules.service.DmpDevelopTaskService;
import com.jz.dmp.modules.service.OfflineSynchService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.servlet.http.HttpServletRequest;
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.*;
@@ -73,6 +76,12 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
@Autowired
private DvRuleTDao dvRuleTDao;
@Autowired
private DmpNavigationTreeDao dmpNavigationTreeDao;
@Autowired
private DmpDevelopTaskService dmpDevelopTaskService;
@Override
public PageInfoResponse<TaskListPageDto> queryTaskListPage(TaskListPageReq taskListPageReq) throws Exception {
PageInfoResponse<TaskListPageDto> pageInfoResponse = new PageInfoResponse<>();
@@ -400,8 +409,8 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
DvRuleT dvRuleT = new DvRuleT();
List<DvRuleTDto> list = dvRuleTDao.queryJyRuleListPage(dvRuleT);
        if (list != null && list.size() > 0) {
-            for(DvRuleTDto dto : list){
-                dto.setRuleContentJson(JSON.parseObject(dto.getRuleContent()));
+            for (DvRuleTDto dto : list) {
+                dto.setRuleContentJson(JSON.parseObject(dto.getRuleContent()));
}
}
PageInfo<DvRuleTDto> pageInfo = new PageInfo<>(list);
@@ -412,9 +421,102 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
return pageInfoResponse;
}
    /**
     * Adds and saves DMP data (including validation data).
     *
     * @param syncDmpTaskAddReq
     * @return
     */
@Override
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
public JsonResult addSyncTask(SyncDmpTaskAddReq syncDmpTaskAddReq) throws Exception {
-        return null;
List<Map<String, Object>> result = new ArrayList<>();
Map<String, Object> reqParam = syncDmpTaskAddReq.getParams();
        if (reqParam != null && reqParam.size() > 0) {
JsonResult jsonResult = addSyncing(reqParam);
DmpDevelopTask data = (DmpDevelopTask) jsonResult.getData();
//saveRuleInfo(result, reqParam, jsonResult, String.valueOf(data.getId()));
}
return new JsonResult(ResultCode.SUCCESS, result);
}
    /**
     * Adds and saves the offline-task DMP data.
     *
     * @param body
     * @return
     */
public JsonResult addSyncing(Map<String, Object> body) throws Exception {
        Integer projectId = (Integer) body.get("projectId");
        Integer parentId = (Integer) body.get("parentId"); // parent node ID
        String taskName = (String) body.get("taskName"); // task name; maps one-to-one to a business node name
        Integer treeId = (Integer) body.get("treeId"); // tree node ID
if (StringUtils.isBlank(taskName)) {
return new JsonResult(ResultCode.PARAMS_ERROR, "任务名称不能为空");
}
        Map<String, Object> scriptMap = (Map<String, Object>) body.get("scripts"); // task JSON data
Object content = scriptMap.get("content");
String xmlTdb = XmlUtils.getPropertyValue(content, "target_db_connection");
        if (null != content && xmlTdb.length() == 0) { // only trips when content is present but no target_db_connection value was extracted
return new JsonResult(ResultCode.PARAMS_ERROR, "脚本内容中缺失目标数据源(target_db_connection)");
}
        /*DmpNavigationTree tree = new DmpNavigationTree();
        tree.setName(taskName); // tree node name
        tree.setProjectId(projectId);
        tree.setCategory("2"); // tree category
        tree.setIsLevel("1"); // leaf node flag: 1 = yes
        int cnt = dmpNavigationTreeDao.countTreeByName(tree);
        if (cnt > 0) {
            throw new RuntimeException("当前项目已存在同名的任务");
        }*/
        // resolve and save the target datasource type
        Integer dataSourceId = null; // datasource ID
        Map<String, Object> writer = (Map<String, Object>) scriptMap.get("writer"); // target side
        String targetDb = (String) writer.get("targetDbConnection"); // target database name
        String targetTable = (String) writer.get("targetTable"); // target table name
        Map<String, Object> reader = (Map<String, Object>) scriptMap.get("reader"); // source side
        String sourceDbName = (String) reader.get("dbConnection"); // source database name
        String sourceTableName = (String) reader.get("registerTableName"); // source table name
if (StringUtils.isNotBlank(targetDb)) {
            // look up by target DB name and project ID
dataSourceId = dmpDevelopTaskDao.getDbInfoByParam(targetDb, projectId);
}
DmpDevelopTask task = new DmpDevelopTask();
task.setProjectId(projectId);
task.setParentId(parentId);
task.setTaskType("2"); //任务类型
task.setDatasourceId(dataSourceId); //数据源ID
task.setType("3");
//task.setTaskName(taskName);
task.setTaskDesc("Syncing Task"); //任务描述
task.setIsSubmit("0"); //是否已提交
task.setTreeId(treeId);
String script = JsonMapper.toJsonString(body);
byte[] data = null;
try {
data = script.getBytes("utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
        task.setData(data); // JSON payload
task.setTargetDbName(targetDb);
task.setTargetTableName(targetTable);
task.setSourceTableName(sourceTableName);
task.setSourceDbName(sourceDbName);
        dmpDevelopTaskDao.insert(task); // persist the task
        // submit the XML on save
/*DmpTaskSchedule schedule = new DmpTaskSchedule();
schedule.setTreeId(task.getTreeId());
schedule.setProjectId(projectId);*/
dmpDevelopTaskService.submitSyncing(task);
return new JsonResult(ResultCode.SUCCESS, task);
}
}
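/*
 * Illustrative request payload for addSyncTask; the field names mirror the keys
 * read in addSyncing above, while the concrete values are made up:
 *
 * {
 *   "params": {
 *     "projectId": 1,
 *     "parentId": 10,
 *     "treeId": 100,
 *     "taskName": "rzf_report_sync",
 *     "scripts": {
 *       "setting": { "extract": "full" },
 *       "reader":  { "dbConnection": "mysql_src", "registerTableName": "src.rzf_report" },
 *       "writer":  { "targetDbConnection": "kudu_tmp", "targetTable": "rzf_report" }
 *     }
 *   }
 * }
 */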
@@ -21,4 +21,99 @@
delete from dmp_navigation_tree where id = #{treeId}
</delete>
<select id="getDbType" resultType="java.lang.Integer">
SELECT
dsd.id AS id
FROM dmp_syncing_datasource dsd
JOIN dmp_syncing_datasource_type dsdt ON dsd.DATASOURCE_TYPE =dsdt.id
WHERE dsd.DATASOURCE_NAME =#{xmlTdb}
AND dsd.PROJECT_ID=#{projectId}
</select>
    <!-- Insert all columns -->
<insert id="insert" keyProperty="id" useGeneratedKeys="true">
insert into dmp_develop_task(datasource_id, TASK_TYPE, TYPE, SCHEDULE_TYPE, IS_SUBMIT, TASK_DESC, SCRIPT, DATA_STATUS, CREATE_USER_ID, CREATE_TIME,
UPDATE_USER_ID, UPDATE_TIME, TREE_ID, CHK_RESULT, SYNC_RESULT, CHK_TIME, SYNC_TIME, FLOW_HEADER, FLOW_JSON, VERSION, IS_GZIPED,SOURCE_DB_NAME,SOURCE_TABLE_NAME,TARGET_DB_NAME,TARGET_TABLE_NAME)
values (#{datasourceId}, #{taskType}, #{type}, #{scheduleType}, #{isSubmit}, #{taskDesc}, #{data}, #{dataStatus}, #{createUserId}, #{createTime}, #{updateUserId},
#{updateTime}, #{treeId}, #{chkResult}, #{syncResult}, #{chkTime}, #{syncTime}, #{flowHeader}, #{flowJson}, #{version}, #{isGziped}, #{sourceDbName}, #{sourceTableName}, #{targetDbName}, #{targetTableName})
</insert>
<update id="update">
update dmp_develop_task
<set>
<if test="datasourceId != null">
datasource_id = #{datasourceId},
</if>
<if test="taskType != null and taskType != ''">
TASK_TYPE = #{taskType},
</if>
<if test="type != null and type != ''">
TYPE = #{type},
</if>
<if test="scheduleType != null and scheduleType != ''">
SCHEDULE_TYPE = #{scheduleType},
</if>
<if test="isSubmit != null and isSubmit != ''">
IS_SUBMIT = #{isSubmit},
</if>
<if test="taskDesc != null and taskDesc != ''">
TASK_DESC = #{taskDesc},
</if>
<if test="script != null">
SCRIPT = #{script},
</if>
<if test="dataStatus != null and dataStatus != ''">
DATA_STATUS = #{dataStatus},
</if>
<if test="createUserId != null and createUserId != ''">
CREATE_USER_ID = #{createUserId},
</if>
<if test="createTime != null">
CREATE_TIME = #{createTime},
</if>
<if test="updateUserId != null and updateUserId != ''">
UPDATE_USER_ID = #{updateUserId},
</if>
<if test="updateTime != null">
UPDATE_TIME = #{updateTime},
</if>
<if test="treeId != null">
TREE_ID = #{treeId},
</if>
<if test="chkResult != null and chkResult != ''">
CHK_RESULT = #{chkResult},
</if>
<if test="syncResult != null and syncResult != ''">
SYNC_RESULT = #{syncResult},
</if>
<if test="chkTime != null">
CHK_TIME = #{chkTime},
</if>
<if test="syncTime != null">
SYNC_TIME = #{syncTime},
</if>
<if test="flowHeader != null and flowHeader != ''">
FLOW_HEADER = #{flowHeader},
</if>
<if test="flowJson != null">
FLOW_JSON = #{flowJson},
</if>
<if test="version != null and version != ''">
VERSION = #{version},
</if>
<if test="isGziped != null">
IS_GZIPED = #{isGziped},
</if>
</set>
where ID = #{id}
</update>
<select id="selectTaskInfoByParam" parameterType="map">
select
ID, datasource_id, TASK_TYPE, TYPE, SCHEDULE_TYPE, IS_SUBMIT, TASK_DESC, SCRIPT, DATA_STATUS
, CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, TREE_ID, CHK_RESULT, SYNC_RESULT, CHK_TIME, SYNC_TIME, FLOW_HEADER, FLOW_JSON, VERSION, IS_GZIPED
from dmp_develop_task
where TREE_ID = #{treeId}
</select>
</mapper>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jz.dmp.modules.dao.DmpNavigationTreeDao">
<resultMap type="com.jz.dmp.modules.model.DmpNavigationTree" id="DmpNavigationTreeMap">
<result property="id" column="ID" jdbcType="INTEGER"/>
<result property="category" column="CATEGORY" jdbcType="VARCHAR"/>
<result property="type" column="TYPE" jdbcType="VARCHAR"/>
<result property="name" column="NAME" jdbcType="VARCHAR"/>
<result property="treeSort" column="TREE_SORT" jdbcType="INTEGER"/>
<result property="icon" column="ICON" jdbcType="VARCHAR"/>
<result property="isLevel" column="IS_LEVEL" jdbcType="VARCHAR"/>
<result property="isEnable" column="IS_ENABLE" jdbcType="VARCHAR"/>
<result property="lockByUser" column="LOCK_BY_USER" jdbcType="VARCHAR"/>
<result property="lockTime" column="LOCK_TIME" jdbcType="TIMESTAMP"/>
<result property="dataStatus" column="DATA_STATUS" jdbcType="VARCHAR"/>
<result property="createUserId" column="CREATE_USER_ID" jdbcType="VARCHAR"/>
<result property="createTime" column="CREATE_TIME" jdbcType="TIMESTAMP"/>
<result property="updateUserId" column="UPDATE_USER_ID" jdbcType="VARCHAR"/>
<result property="updateTime" column="UPDATE_TIME" jdbcType="TIMESTAMP"/>
<result property="projectId" column="PROJECT_ID" jdbcType="INTEGER"/>
<result property="parentId" column="PARENT_ID" jdbcType="INTEGER"/>
</resultMap>
    <!-- Query a single row -->
<select id="queryById" resultMap="DmpNavigationTreeMap">
select
ID, CATEGORY, TYPE, NAME, TREE_SORT, ICON, IS_LEVEL, IS_ENABLE, LOCK_BY_USER, LOCK_TIME, DATA_STATUS, CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, PROJECT_ID, PARENT_ID
from dmp_navigation_tree
where ID = #{id}
</select>
    <!-- Query a page of rows -->
<select id="queryAllByLimit" resultMap="DmpNavigationTreeMap">
select
ID, CATEGORY, TYPE, NAME, TREE_SORT, ICON, IS_LEVEL, IS_ENABLE, LOCK_BY_USER, LOCK_TIME, DATA_STATUS, CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, PROJECT_ID, PARENT_ID
from dmp_navigation_tree
limit #{offset}, #{limit}
</select>
    <!-- Query with the entity as a filter -->
<select id="queryAll" resultMap="DmpNavigationTreeMap">
select
ID, CATEGORY, TYPE, NAME, TREE_SORT, ICON, IS_LEVEL, IS_ENABLE, LOCK_BY_USER, LOCK_TIME, DATA_STATUS,
CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, PROJECT_ID, PARENT_ID
from dmp_navigation_tree
<where>
<if test="id != null">
and ID = #{id}
</if>
<if test="category != null and category != ''">
and CATEGORY = #{category}
</if>
<if test="type != null and type != ''">
and TYPE = #{type}
</if>
<if test="name != null and name != ''">
and NAME = #{name}
</if>
<if test="treeSort != null">
and TREE_SORT = #{treeSort}
</if>
<if test="icon != null and icon != ''">
and ICON = #{icon}
</if>
<if test="isLevel != null and isLevel != ''">
and IS_LEVEL = #{isLevel}
</if>
<if test="isEnable != null and isEnable != ''">
and IS_ENABLE = #{isEnable}
</if>
<if test="lockByUser != null and lockByUser != ''">
and LOCK_BY_USER = #{lockByUser}
</if>
<if test="lockTime != null">
and LOCK_TIME = #{lockTime}
</if>
<if test="dataStatus != null and dataStatus != ''">
and DATA_STATUS = #{dataStatus}
</if>
<if test="createUserId != null and createUserId != ''">
and CREATE_USER_ID = #{createUserId}
</if>
<if test="createTime != null">
and CREATE_TIME = #{createTime}
</if>
<if test="updateUserId != null and updateUserId != ''">
and UPDATE_USER_ID = #{updateUserId}
</if>
<if test="updateTime != null">
and UPDATE_TIME = #{updateTime}
</if>
<if test="projectId != null">
and PROJECT_ID = #{projectId}
</if>
<if test="parentId != null">
and PARENT_ID = #{parentId}
</if>
</where>
</select>
    <!-- Insert all columns -->
<insert id="insert" keyProperty="id" useGeneratedKeys="true">
insert into dmp_navigation_tree(CATEGORY, TYPE, NAME, TREE_SORT, ICON, IS_LEVEL, IS_ENABLE, LOCK_BY_USER, LOCK_TIME, DATA_STATUS, CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, PROJECT_ID, PARENT_ID)
values (#{category}, #{type}, #{name}, #{treeSort}, #{icon}, #{isLevel}, #{isEnable}, #{lockByUser}, #{lockTime}, #{dataStatus}, #{createUserId}, #{createTime}, #{updateUserId}, #{updateTime}, #{projectId}, #{parentId})
</insert>
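    <!-- Batch insert -->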
<insert id="insertBatch" keyProperty="id" useGeneratedKeys="true">
insert into dmp_navigation_tree(CATEGORY, TYPE, NAME, TREE_SORT, ICON, IS_LEVEL, IS_ENABLE,
LOCK_BY_USER, LOCK_TIME, DATA_STATUS, CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, PROJECT_ID,
PARENT_ID)
values
<foreach collection="entities" item="entity" separator=",">
(#{entity.category}, #{entity.type}, #{entity.name}, #{entity.treeSort}, #{entity.icon}, #{entity.isLevel},
#{entity.isEnable}, #{entity.lockByUser}, #{entity.lockTime}, #{entity.dataStatus}, #{entity.createUserId},
#{entity.createTime}, #{entity.updateUserId}, #{entity.updateTime}, #{entity.projectId}, #{entity.parentId})
</foreach>
</insert>
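    <!-- Batch insert; existing rows are refreshed via MySQL's ON DUPLICATE KEY UPDATE -->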
<insert id="insertOrUpdateBatch" keyProperty="id" useGeneratedKeys="true">
insert into dmp_navigation_tree(CATEGORY, TYPE, NAME, TREE_SORT, ICON, IS_LEVEL, IS_ENABLE,
LOCK_BY_USER, LOCK_TIME, DATA_STATUS, CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, PROJECT_ID,
PARENT_ID)
values
<foreach collection="entities" item="entity" separator=",">
(#{entity.category}, #{entity.type}, #{entity.name}, #{entity.treeSort}, #{entity.icon}, #{entity.isLevel},
#{entity.isEnable}, #{entity.lockByUser}, #{entity.lockTime}, #{entity.dataStatus}, #{entity.createUserId},
#{entity.createTime}, #{entity.updateUserId}, #{entity.updateTime}, #{entity.projectId}, #{entity.parentId})
</foreach>
on duplicate key update
        CATEGORY = values(CATEGORY), TYPE = values(TYPE), NAME = values(NAME), TREE_SORT = values(TREE_SORT),
        ICON = values(ICON), IS_LEVEL = values(IS_LEVEL), IS_ENABLE = values(IS_ENABLE),
        LOCK_BY_USER = values(LOCK_BY_USER), LOCK_TIME = values(LOCK_TIME), DATA_STATUS = values(DATA_STATUS),
        CREATE_USER_ID = values(CREATE_USER_ID), CREATE_TIME = values(CREATE_TIME),
        UPDATE_USER_ID = values(UPDATE_USER_ID), UPDATE_TIME = values(UPDATE_TIME),
        PROJECT_ID = values(PROJECT_ID), PARENT_ID = values(PARENT_ID)
</insert>
    <!-- Update by primary key -->
<update id="update">
update dmp_navigation_tree
<set>
<if test="category != null and category != ''">
CATEGORY = #{category},
</if>
<if test="type != null and type != ''">
TYPE = #{type},
</if>
<if test="name != null and name != ''">
NAME = #{name},
</if>
<if test="treeSort != null">
TREE_SORT = #{treeSort},
</if>
<if test="icon != null and icon != ''">
ICON = #{icon},
</if>
<if test="isLevel != null and isLevel != ''">
IS_LEVEL = #{isLevel},
</if>
<if test="isEnable != null and isEnable != ''">
IS_ENABLE = #{isEnable},
</if>
<if test="lockByUser != null and lockByUser != ''">
LOCK_BY_USER = #{lockByUser},
</if>
<if test="lockTime != null">
LOCK_TIME = #{lockTime},
</if>
<if test="dataStatus != null and dataStatus != ''">
DATA_STATUS = #{dataStatus},
</if>
<if test="createUserId != null and createUserId != ''">
CREATE_USER_ID = #{createUserId},
</if>
<if test="createTime != null">
CREATE_TIME = #{createTime},
</if>
<if test="updateUserId != null and updateUserId != ''">
UPDATE_USER_ID = #{updateUserId},
</if>
<if test="updateTime != null">
UPDATE_TIME = #{updateTime},
</if>
<if test="projectId != null">
PROJECT_ID = #{projectId},
</if>
<if test="parentId != null">
PARENT_ID = #{parentId},
</if>
</set>
where ID = #{id}
</update>
    <!-- Delete by primary key -->
<delete id="deleteById">
delete from dmp_navigation_tree where ID = #{id}
</delete>
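    <!-- Count active trees with the same name and category in a project, optionally excluding one id -->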
<select id="countTreeByName" parameterType="com.jz.dmp.modules.model.DmpNavigationTree" resultType="java.lang.Integer">
select
count(1)
from dmp_navigation_tree t
where t.data_status = '1'
<if test="id != null">and t.id != #{id}</if>
<if test="isLevel != null">and t.is_Level = #{isLevel}</if>
and t.name = #{name}
and t.category = #{category}
and t.project_id = #{projectId}
</select>
</mapper>
\ No newline at end of file
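For orientation, here is a minimal sketch of the Dao interface these statement ids bind to. Method names mirror the statement ids; the signatures are assumptions read from the #{} placeholders and the resultMap, so the real DmpNavigationTreeDao in the repo may differ:

package com.jz.dmp.modules.dao;

import com.jz.dmp.modules.model.DmpNavigationTree;
import org.apache.ibatis.annotations.Param;

import java.util.List;

// Sketch only: method names mirror the mapper's statement ids,
// parameter shapes are inferred from the #{} placeholders above.
public interface DmpNavigationTreeDao {

    DmpNavigationTree queryById(Integer id);

    List<DmpNavigationTree> queryAllByLimit(@Param("offset") int offset, @Param("limit") int limit);

    List<DmpNavigationTree> queryAll(DmpNavigationTree filter);

    int insert(DmpNavigationTree tree);

    int insertBatch(@Param("entities") List<DmpNavigationTree> entities);

    int insertOrUpdateBatch(@Param("entities") List<DmpNavigationTree> entities);

    int update(DmpNavigationTree tree);

    int deleteById(Integer id);

    int countTreeByName(DmpNavigationTree tree);
}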
......@@ -349,4 +349,26 @@
<if test="projectId != null and projectId !=''"> and a.project_id = #{projectId}</if>
</select>
<select id="findListByParams" parameterType="com.jz.dmp.modules.model.DmpSyncingDatasource" resultType="com.jz.dmp.modules.model.DmpSyncingDatasource">
SELECT
id,datasource_type,datasource_name,datasource_desc,jdbc_url,db_name,user_name,password,endpoint,bucket
,access_id,access_key,protocol,host,port,default_fs,table_schema,data_status,create_user_id,create_time,update_user_id,update_time,project_id
FROM dmp_syncing_datasource WHERE 1=1
<if test="datasourceType != null">AND datasource_type = #{datasourceType}</if>
<if test="datasourceName != null">AND datasource_name = #{datasourceName}</if>
<if test="datasourceDesc != null">AND datasource_desc = #{datasourceDesc}</if>
<if test="jdbcUrl != null">AND jdbc_url = #{jdbcUrl}</if>
<if test="dbName != null">AND db_name = #{dbName}</if>
<if test="userName != null">AND user_name = #{userName}</if>
<if test="accessId != null">AND access_id = #{accessId}</if>
<if test="accessKey != null">AND access_key = #{accessKey}</if>
<if test="protocol != null">AND protocol = #{protocol}</if>
<if test="host != null">AND host = #{host}</if>
<if test="port != null">AND port = #{port}</if>
<if test="defaultFs != null">AND default_fs = #{defaultFs}</if>
<if test="tableSchema != null">AND table_schema = #{tableSchema}</if>
<if test="dataStatus != null">AND data_status = #{dataStatus}</if>
<if test="projectId != null">AND project_id = #{projectId}</if>
</select>
</mapper>
\ No newline at end of file
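findListByParams appends one AND condition per non-null property of the example object, so an empty object returns every row of dmp_syncing_datasource. A minimal caller sketch; the Dao interface and the model's setters are assumptions, only the statement id and parameterType come from the mapper above:

package com.jz.dmp.modules.dao;

import com.jz.dmp.modules.model.DmpSyncingDatasource;

import java.util.List;

// Hypothetical caller: the nested Dao interface stands in for whatever
// the repo actually binds to this mapper.
public class DatasourceQueryExample {

    /** Assumed Dao binding for findListByParams; the repo's real interface may differ. */
    public interface DmpSyncingDatasourceDao {
        List<DmpSyncingDatasource> findListByParams(DmpSyncingDatasource filter);
    }

    private final DmpSyncingDatasourceDao dao;

    public DatasourceQueryExample(DmpSyncingDatasourceDao dao) {
        this.dao = dao;
    }

    /** Lists the active datasources of one project; each non-null field becomes an AND condition. */
    public List<DmpSyncingDatasource> activeSourcesOf(Integer projectId) {
        DmpSyncingDatasource filter = new DmpSyncingDatasource();
        filter.setProjectId(projectId); // rendered as: AND project_id = ?
        filter.setDataStatus("1");      // rendered as: AND data_status = ?
        return dao.findListByParams(filter);
    }
}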
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jz.dmp.modules.dao.DmpSyncingDatasourceTypeDao">
<resultMap type="com.jz.dmp.modules.model.DmpSyncingDatasourceType" id="DmpSyncingDatasourceTypeMap">
<result property="id" column="ID" jdbcType="INTEGER"/>
<result property="datasource" column="DATASOURCE" jdbcType="VARCHAR"/>
<result property="datasourceCatecode" column="DATASOURCE_CATECODE" jdbcType="VARCHAR"/>
<result property="datasourceCatename" column="DATASOURCE_CATENAME" jdbcType="VARCHAR"/>
<result property="datasourceType" column="DATASOURCE_TYPE" jdbcType="VARCHAR"/>
<result property="imgUrl" column="IMG_URL" jdbcType="VARCHAR"/>
<result property="dataStatus" column="DATA_STATUS" jdbcType="VARCHAR"/>
<result property="isEnabled" column="IS_ENABLED" jdbcType="VARCHAR"/>
<result property="datasourceCatetype" column="DATASOURCE_CATETYPE" jdbcType="VARCHAR"/>
<result property="driverClassName" column="DRIVER_CLASS_NAME" jdbcType="VARCHAR"/>
<result property="isEnableTest" column="IS_ENABLE_TEST" jdbcType="VARCHAR"/>
<result property="defaultSourceScript" column="DEFAULT_SOURCE_SCRIPT" jdbcType="OTHER"/>
<result property="defaultTargetScript" column="DEFAULT_TARGET_SCRIPT" jdbcType="OTHER"/>
<result property="isEnableSource" column="IS_ENABLE_SOURCE" jdbcType="VARCHAR"/>
<result property="isEnableTarget" column="IS_ENABLE_TARGET" jdbcType="VARCHAR"/>
</resultMap>
    <!-- Query a single row by id -->
<select id="queryById" resultMap="DmpSyncingDatasourceTypeMap">
select
ID, DATASOURCE, DATASOURCE_CATECODE, DATASOURCE_CATENAME, DATASOURCE_TYPE, IMG_URL, DATA_STATUS, IS_ENABLED, DATASOURCE_CATETYPE, DRIVER_CLASS_NAME, IS_ENABLE_TEST, DEFAULT_SOURCE_SCRIPT, DEFAULT_TARGET_SCRIPT, IS_ENABLE_SOURCE, IS_ENABLE_TARGET
from dmp_web.dmp_syncing_datasource_type
where ID = #{id}
</select>
    <!-- Query a page of rows -->
<select id="queryAllByLimit" resultMap="DmpSyncingDatasourceTypeMap">
select
ID, DATASOURCE, DATASOURCE_CATECODE, DATASOURCE_CATENAME, DATASOURCE_TYPE, IMG_URL, DATA_STATUS, IS_ENABLED, DATASOURCE_CATETYPE, DRIVER_CLASS_NAME, IS_ENABLE_TEST, DEFAULT_SOURCE_SCRIPT, DEFAULT_TARGET_SCRIPT, IS_ENABLE_SOURCE, IS_ENABLE_TARGET
from dmp_web.dmp_syncing_datasource_type
limit #{offset}, #{limit}
</select>
    <!-- Query with the entity as filter -->
<select id="queryAll" resultMap="DmpSyncingDatasourceTypeMap">
select
ID, DATASOURCE, DATASOURCE_CATECODE, DATASOURCE_CATENAME, DATASOURCE_TYPE, IMG_URL, DATA_STATUS, IS_ENABLED,
DATASOURCE_CATETYPE, DRIVER_CLASS_NAME, IS_ENABLE_TEST, DEFAULT_SOURCE_SCRIPT, DEFAULT_TARGET_SCRIPT,
IS_ENABLE_SOURCE, IS_ENABLE_TARGET
from dmp_web.dmp_syncing_datasource_type
<where>
<if test="id != null">
and ID = #{id}
</if>
<if test="datasource != null and datasource != ''">
and DATASOURCE = #{datasource}
</if>
<if test="datasourceCatecode != null and datasourceCatecode != ''">
and DATASOURCE_CATECODE = #{datasourceCatecode}
</if>
<if test="datasourceCatename != null and datasourceCatename != ''">
and DATASOURCE_CATENAME = #{datasourceCatename}
</if>
<if test="datasourceType != null and datasourceType != ''">
and DATASOURCE_TYPE = #{datasourceType}
</if>
<if test="imgUrl != null and imgUrl != ''">
and IMG_URL = #{imgUrl}
</if>
<if test="dataStatus != null and dataStatus != ''">
and DATA_STATUS = #{dataStatus}
</if>
<if test="isEnabled != null and isEnabled != ''">
and IS_ENABLED = #{isEnabled}
</if>
<if test="datasourceCatetype != null and datasourceCatetype != ''">
and DATASOURCE_CATETYPE = #{datasourceCatetype}
</if>
<if test="driverClassName != null and driverClassName != ''">
and DRIVER_CLASS_NAME = #{driverClassName}
</if>
<if test="isEnableTest != null and isEnableTest != ''">
and IS_ENABLE_TEST = #{isEnableTest}
</if>
<if test="defaultSourceScript != null">
and DEFAULT_SOURCE_SCRIPT = #{defaultSourceScript}
</if>
<if test="defaultTargetScript != null">
and DEFAULT_TARGET_SCRIPT = #{defaultTargetScript}
</if>
<if test="isEnableSource != null and isEnableSource != ''">
and IS_ENABLE_SOURCE = #{isEnableSource}
</if>
<if test="isEnableTarget != null and isEnableTarget != ''">
and IS_ENABLE_TARGET = #{isEnableTarget}
</if>
</where>
</select>
    <!-- Insert all columns -->
<insert id="insert" keyProperty="id" useGeneratedKeys="true">
insert into dmp_web.dmp_syncing_datasource_type(DATASOURCE, DATASOURCE_CATECODE, DATASOURCE_CATENAME, DATASOURCE_TYPE, IMG_URL, DATA_STATUS, IS_ENABLED, DATASOURCE_CATETYPE, DRIVER_CLASS_NAME, IS_ENABLE_TEST, DEFAULT_SOURCE_SCRIPT, DEFAULT_TARGET_SCRIPT, IS_ENABLE_SOURCE, IS_ENABLE_TARGET)
values (#{datasource}, #{datasourceCatecode}, #{datasourceCatename}, #{datasourceType}, #{imgUrl}, #{dataStatus}, #{isEnabled}, #{datasourceCatetype}, #{driverClassName}, #{isEnableTest}, #{defaultSourceScript}, #{defaultTargetScript}, #{isEnableSource}, #{isEnableTarget})
</insert>
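    <!-- Batch insert -->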
<insert id="insertBatch" keyProperty="id" useGeneratedKeys="true">
insert into dmp_web.dmp_syncing_datasource_type(DATASOURCE, DATASOURCE_CATECODE, DATASOURCE_CATENAME,
DATASOURCE_TYPE, IMG_URL, DATA_STATUS, IS_ENABLED, DATASOURCE_CATETYPE, DRIVER_CLASS_NAME, IS_ENABLE_TEST,
DEFAULT_SOURCE_SCRIPT, DEFAULT_TARGET_SCRIPT, IS_ENABLE_SOURCE, IS_ENABLE_TARGET)
values
<foreach collection="entities" item="entity" separator=",">
(#{entity.datasource}, #{entity.datasourceCatecode}, #{entity.datasourceCatename}, #{entity.datasourceType},
#{entity.imgUrl}, #{entity.dataStatus}, #{entity.isEnabled}, #{entity.datasourceCatetype},
#{entity.driverClassName}, #{entity.isEnableTest}, #{entity.defaultSourceScript},
#{entity.defaultTargetScript}, #{entity.isEnableSource}, #{entity.isEnableTarget})
</foreach>
</insert>
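    <!-- Batch insert; existing rows are refreshed via MySQL's ON DUPLICATE KEY UPDATE -->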
<insert id="insertOrUpdateBatch" keyProperty="id" useGeneratedKeys="true">
insert into dmp_web.dmp_syncing_datasource_type(DATASOURCE, DATASOURCE_CATECODE, DATASOURCE_CATENAME,
DATASOURCE_TYPE, IMG_URL, DATA_STATUS, IS_ENABLED, DATASOURCE_CATETYPE, DRIVER_CLASS_NAME, IS_ENABLE_TEST,
DEFAULT_SOURCE_SCRIPT, DEFAULT_TARGET_SCRIPT, IS_ENABLE_SOURCE, IS_ENABLE_TARGET)
values
<foreach collection="entities" item="entity" separator=",">
(#{entity.datasource}, #{entity.datasourceCatecode}, #{entity.datasourceCatename}, #{entity.datasourceType},
#{entity.imgUrl}, #{entity.dataStatus}, #{entity.isEnabled}, #{entity.datasourceCatetype},
#{entity.driverClassName}, #{entity.isEnableTest}, #{entity.defaultSourceScript},
#{entity.defaultTargetScript}, #{entity.isEnableSource}, #{entity.isEnableTarget})
</foreach>
on duplicate key update
        DATASOURCE = values(DATASOURCE), DATASOURCE_CATECODE = values(DATASOURCE_CATECODE),
        DATASOURCE_CATENAME = values(DATASOURCE_CATENAME), DATASOURCE_TYPE = values(DATASOURCE_TYPE),
        IMG_URL = values(IMG_URL), DATA_STATUS = values(DATA_STATUS), IS_ENABLED = values(IS_ENABLED),
        DATASOURCE_CATETYPE = values(DATASOURCE_CATETYPE), DRIVER_CLASS_NAME = values(DRIVER_CLASS_NAME),
        IS_ENABLE_TEST = values(IS_ENABLE_TEST), DEFAULT_SOURCE_SCRIPT = values(DEFAULT_SOURCE_SCRIPT),
        DEFAULT_TARGET_SCRIPT = values(DEFAULT_TARGET_SCRIPT), IS_ENABLE_SOURCE = values(IS_ENABLE_SOURCE),
        IS_ENABLE_TARGET = values(IS_ENABLE_TARGET)
</insert>
    <!-- Update by primary key -->
<update id="update">
update dmp_web.dmp_syncing_datasource_type
<set>
<if test="datasource != null and datasource != ''">
DATASOURCE = #{datasource},
</if>
<if test="datasourceCatecode != null and datasourceCatecode != ''">
DATASOURCE_CATECODE = #{datasourceCatecode},
</if>
<if test="datasourceCatename != null and datasourceCatename != ''">
DATASOURCE_CATENAME = #{datasourceCatename},
</if>
<if test="datasourceType != null and datasourceType != ''">
DATASOURCE_TYPE = #{datasourceType},
</if>
<if test="imgUrl != null and imgUrl != ''">
IMG_URL = #{imgUrl},
</if>
<if test="dataStatus != null and dataStatus != ''">
DATA_STATUS = #{dataStatus},
</if>
<if test="isEnabled != null and isEnabled != ''">
IS_ENABLED = #{isEnabled},
</if>
<if test="datasourceCatetype != null and datasourceCatetype != ''">
DATASOURCE_CATETYPE = #{datasourceCatetype},
</if>
<if test="driverClassName != null and driverClassName != ''">
DRIVER_CLASS_NAME = #{driverClassName},
</if>
<if test="isEnableTest != null and isEnableTest != ''">
IS_ENABLE_TEST = #{isEnableTest},
</if>
<if test="defaultSourceScript != null">
DEFAULT_SOURCE_SCRIPT = #{defaultSourceScript},
</if>
<if test="defaultTargetScript != null">
DEFAULT_TARGET_SCRIPT = #{defaultTargetScript},
</if>
<if test="isEnableSource != null and isEnableSource != ''">
IS_ENABLE_SOURCE = #{isEnableSource},
</if>
<if test="isEnableTarget != null and isEnableTarget != ''">
IS_ENABLE_TARGET = #{isEnableTarget},
</if>
</set>
where ID = #{id}
</update>
    <!-- Delete by primary key -->
<delete id="deleteById">
delete from dmp_web.dmp_syncing_datasource_type where ID = #{id}
</delete>
</mapper>
\ No newline at end of file
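Both insertOrUpdateBatch statements depend on MySQL's ON DUPLICATE KEY UPDATE, so they upsert only when the target table has a primary or unique key that incoming rows collide on, and they are not portable to other databases. A usage sketch under those assumptions; the Dao binding, the model's setters, and the sample values are all hypothetical:

package com.jz.dmp.modules.dao;

import com.jz.dmp.modules.model.DmpSyncingDatasourceType;
import org.apache.ibatis.annotations.Param;

import java.util.Arrays;
import java.util.List;

// Hypothetical seeding routine: the statement id and the "entities" collection
// name come from the mapper above; everything else is assumed.
public class DatasourceTypeSeeder {

    /** Assumed Dao binding for insertOrUpdateBatch. */
    public interface DmpSyncingDatasourceTypeDao {
        int insertOrUpdateBatch(@Param("entities") List<DmpSyncingDatasourceType> entities);
    }

    private final DmpSyncingDatasourceTypeDao dao;

    public DatasourceTypeSeeder(DmpSyncingDatasourceTypeDao dao) {
        this.dao = dao;
    }

    /** Inserts the rows; when a unique key already matches, every mapped column is refreshed instead. */
    public int seed() {
        DmpSyncingDatasourceType mysql = new DmpSyncingDatasourceType();
        mysql.setDatasource("MySQL");                       // sample values only; setters assumed
        mysql.setDatasourceType("mysql");                   // from the resultMap properties
        mysql.setDriverClassName("com.mysql.jdbc.Driver");
        mysql.setDataStatus("1");
        return dao.insertOrUpdateBatch(Arrays.asList(mysql));
    }
}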