Commit 64ed596f authored by mcb's avatar mcb

离线任务保存

parent 2504c4aa
package com.jz.dmp.modules.dao;
import com.jz.dmp.modules.model.DvTaskRuleT;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* 任务与规则关系表(DvTaskRuleT)表数据库访问层
*
* @author makejava
* @since 2020-12-30 15:17:28
*/
public interface DvTaskRuleTDao {
/**
* 通过ID查询单条数据
*
* @param taskRuleId 主键
* @return 实例对象
*/
DvTaskRuleT queryById(Long taskRuleId);
/**
* 查询指定行数据
*
* @param offset 查询起始位置
* @param limit 查询条数
* @return 对象列表
*/
List<DvTaskRuleT> queryAllByLimit(@Param("offset") int offset, @Param("limit") int limit);
/**
* 通过实体作为筛选条件查询
*
* @param dvTaskRuleT 实例对象
* @return 对象列表
*/
List<DvTaskRuleT> queryAll(DvTaskRuleT dvTaskRuleT);
/**
* 新增数据
*
* @param dvTaskRuleT 实例对象
* @return 影响行数
*/
int insert(DvTaskRuleT dvTaskRuleT);
/**
* 批量新增数据(MyBatis原生foreach方法)
*
* @param list List<DvTaskRuleT> 实例对象列表
* @return 影响行数
*/
int insertBatch(List<DvTaskRuleT> list);
/**
* 批量新增或按主键更新数据(MyBatis原生foreach方法)
*
* @param entities List<DvTaskRuleT> 实例对象列表
* @return 影响行数
*/
int insertOrUpdateBatch(@Param("entities") List<DvTaskRuleT> entities);
/**
* 修改数据
*
* @param dvTaskRuleT 实例对象
* @return 影响行数
*/
int update(DvTaskRuleT dvTaskRuleT);
/**
* 通过主键删除数据
*
* @param taskRuleId 主键
* @return 影响行数
*/
int deleteById(Long taskRuleId);
}
\ No newline at end of file
package com.jz.dmp.modules.model;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.Date;
/**
* 任务与规则关系表(DvTaskRuleT)实体类
*
* @author makejava
* @since 2020-12-30 15:17:26
*/
@ApiModel(value = "任务与规则关系表", description = "任务与规则关系表")
public class DvTaskRuleT implements Serializable {
private static final long serialVersionUID = -23295925034569493L;
/**
* 任务与规则ID
*/
@ApiModelProperty(value = "任务与规则ID")
private Long taskRuleId;
/**
* 任务ID
*/
@ApiModelProperty(value = "任务ID")
private String taskId;
/**
* 规则ID
*/
@ApiModelProperty(value = "规则ID")
private Long ruleId;
/**
* content(json)
*/
@ApiModelProperty(value = "content(json)")
private String ruleValue;
/**
* 是否删除状态 0-删除 1-未删除
*/
@ApiModelProperty(value = "是否删除状态 0-删除 1-未删除")
private Object delFlag;
/**
* 创建时间
*/
@ApiModelProperty(value = "创建时间")
private Date createTime;
/**
* 创建人
*/
@ApiModelProperty(value = "创建人")
private String createBy;
/**
* 最后更新人
*/
@ApiModelProperty(value = "最后更新人")
private String updateBy;
/**
* 更新时间
*/
@ApiModelProperty(value = "更新时间")
private Date updateTime;
public Long getTaskRuleId() {
return taskRuleId;
}
public void setTaskRuleId(Long taskRuleId) {
this.taskRuleId = taskRuleId;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public Long getRuleId() {
return ruleId;
}
public void setRuleId(Long ruleId) {
this.ruleId = ruleId;
}
public String getRuleValue() {
return ruleValue;
}
public void setRuleValue(String ruleValue) {
this.ruleValue = ruleValue;
}
public Object getDelFlag() {
return delFlag;
}
public void setDelFlag(Object delFlag) {
this.delFlag = delFlag;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public String getCreateBy() {
return createBy;
}
public void setCreateBy(String createBy) {
this.createBy = createBy;
}
public String getUpdateBy() {
return updateBy;
}
public void setUpdateBy(String updateBy) {
this.updateBy = updateBy;
}
public Date getUpdateTime() {
return updateTime;
}
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}
}
\ No newline at end of file
package com.jz.dmp.modules.service;
import com.jz.dmp.modules.model.DvTaskRuleT;
import java.util.List;
/**
* 任务与规则关系表(DvTaskRuleT)表服务接口
*
* @author Bellamy
* @since 2020-12-30 15:17:32
*/
public interface DvTaskRuleTService {
/**
* 通过ID查询单条数据
*
* @param taskRuleId 主键
* @return 实例对象
*/
DvTaskRuleT queryById(Long taskRuleId);
/**
* 查询多条数据
*
* @param offset 查询起始位置
* @param limit 查询条数
* @return 对象列表
*/
List<DvTaskRuleT> queryAllByLimit(int offset, int limit);
/**
* 新增数据
*
* @param dvTaskRuleT 实例对象
* @return 实例对象
*/
DvTaskRuleT insert(DvTaskRuleT dvTaskRuleT);
/**
* 修改数据
*
* @param dvTaskRuleT 实例对象
* @return 实例对象
*/
DvTaskRuleT update(DvTaskRuleT dvTaskRuleT);
/**
* 通过主键删除数据
*
* @param taskRuleId 主键
* @return 是否成功
*/
boolean deleteById(Long taskRuleId);
void saveRule(List<DvTaskRuleT> list)throws Exception;
}
\ No newline at end of file
...@@ -106,7 +106,9 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService { ...@@ -106,7 +106,9 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
return new JsonResult(ResultCode.SUCCESS, rst); return new JsonResult(ResultCode.SUCCESS, rst);
} }
} }
/*
* 生成xml
* */
public String convert2SyncXmlContent(DmpDevelopTask body) throws Exception { public String convert2SyncXmlContent(DmpDevelopTask body) throws Exception {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
long treeId = body.getTreeId(); long treeId = body.getTreeId();
...@@ -137,7 +139,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService { ...@@ -137,7 +139,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
//******源数据****** //******源数据******
Map<String, Object> readerMap = (Map<String, Object>) scriptMap.get("reader"); Map<String, Object> readerMap = (Map<String, Object>) scriptMap.get("reader");
String _dbConnection = (String) readerMap.get("dbConnection"); //源数据库名称 String _dbConnection = (String) readerMap.get("dbConnection"); //源数据库名称
String _fileType = (String) readerMap.get("fileType"); String _fileType = (String) readerMap.get("fileType"); //文件类型
String _sourceHdfsPath = (String) readerMap.get("sourceHdfsPath"); String _sourceHdfsPath = (String) readerMap.get("sourceHdfsPath");
String _sourceHdfsFile = (String) readerMap.get("sourceHdfsFile"); String _sourceHdfsFile = (String) readerMap.get("sourceHdfsFile");
String _sourceFtpDir = (String) readerMap.get("sourceFtpDir"); String _sourceFtpDir = (String) readerMap.get("sourceFtpDir");
...@@ -185,6 +187,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService { ...@@ -185,6 +187,7 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
String _sourceDbName_ = this.getTargetDBname(sDS); String _sourceDbName_ = this.getTargetDBname(sDS);
String st = sDST.getDatasourceType(); String st = sDST.getDatasourceType();
String tt = tDST.getDatasourceType(); String tt = tDST.getDatasourceType();
//获取文件类型
String ft = this.getFileType(_fileType, _sourceHdfsFile, _sourceFtpFile); String ft = this.getFileType(_fileType, _sourceHdfsFile, _sourceFtpFile);
boolean isBigData2jdbc = ("HIVE".equals(st) || "KUDU".equals(st)) && "RDBMS".equals(tDST.getDatasourceCatecode()); boolean isBigData2jdbc = ("HIVE".equals(st) || "KUDU".equals(st)) && "RDBMS".equals(tDST.getDatasourceCatecode());
...@@ -312,6 +315,9 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService { ...@@ -312,6 +315,9 @@ public class DmpDevelopTaskServiceImpl implements DmpDevelopTaskService {
} }
} }
/*
* 获取文件类型
* */
private String getFileType(String ft, String _sourceHdfsFile, String _sourceFtpFile) { private String getFileType(String ft, String _sourceHdfsFile, String _sourceFtpFile) {
if(StringUtils.isBlank(ft)) { if(StringUtils.isBlank(ft)) {
if(StringUtils.isNotBlank(_sourceHdfsFile)) { if(StringUtils.isNotBlank(_sourceHdfsFile)) {
......
package com.jz.dmp.modules.service.impl;
import com.jz.dmp.modules.dao.DvTaskRuleTDao;
import com.jz.dmp.modules.model.DvTaskRuleT;
import com.jz.dmp.modules.service.DvTaskRuleTService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
/**
* 任务与规则关系表(DvTaskRuleT)表服务实现类
*
* @author Bellamy
* @since 2020-12-30 15:17:35
*/
@Service("dvTaskRuleTService")
public class DvTaskRuleTServiceImpl implements DvTaskRuleTService {
@Autowired
private DvTaskRuleTDao dvTaskRuleTDao;
/**
* 通过ID查询单条数据
*
* @param taskRuleId 主键
* @return 实例对象
*/
@Override
public DvTaskRuleT queryById(Long taskRuleId) {
return this.dvTaskRuleTDao.queryById(taskRuleId);
}
/**
* 查询多条数据
*
* @param offset 查询起始位置
* @param limit 查询条数
* @return 对象列表
*/
@Override
public List<DvTaskRuleT> queryAllByLimit(int offset, int limit) {
return this.dvTaskRuleTDao.queryAllByLimit(offset, limit);
}
/**
* 新增数据
*
* @param dvTaskRuleT 实例对象
* @return 实例对象
*/
@Override
public DvTaskRuleT insert(DvTaskRuleT dvTaskRuleT) {
this.dvTaskRuleTDao.insert(dvTaskRuleT);
return dvTaskRuleT;
}
/**
* 修改数据
*
* @param dvTaskRuleT 实例对象
* @return 实例对象
*/
@Override
public DvTaskRuleT update(DvTaskRuleT dvTaskRuleT) {
this.dvTaskRuleTDao.update(dvTaskRuleT);
return this.queryById(dvTaskRuleT.getTaskRuleId());
}
/**
* 通过主键删除数据
*
* @param taskRuleId 主键
* @return 是否成功
*/
@Override
public boolean deleteById(Long taskRuleId) {
return this.dvTaskRuleTDao.deleteById(taskRuleId) > 0;
}
/**
* 保存规则列表信息
* @param list
* @throws Exception
*/
@Override
public void saveRule(List<DvTaskRuleT> list) throws Exception {
dvTaskRuleTDao.insertBatch(list);
}
}
\ No newline at end of file
...@@ -20,7 +20,9 @@ import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowExecution; ...@@ -20,7 +20,9 @@ import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowExecution;
import com.jz.dmp.modules.dao.*; import com.jz.dmp.modules.dao.*;
import com.jz.dmp.modules.model.*; import com.jz.dmp.modules.model.*;
import com.jz.dmp.modules.service.DmpDevelopTaskService; import com.jz.dmp.modules.service.DmpDevelopTaskService;
import com.jz.dmp.modules.service.DvTaskRuleTService;
import com.jz.dmp.modules.service.OfflineSynchService; import com.jz.dmp.modules.service.OfflineSynchService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -82,6 +84,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -82,6 +84,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
@Autowired @Autowired
private DmpDevelopTaskService dmpDevelopTaskService; private DmpDevelopTaskService dmpDevelopTaskService;
@Autowired
private DvTaskRuleTService dvTaskRuleTService;
@Override @Override
public PageInfoResponse<TaskListPageDto> queryTaskListPage(TaskListPageReq taskListPageReq) throws Exception { public PageInfoResponse<TaskListPageDto> queryTaskListPage(TaskListPageReq taskListPageReq) throws Exception {
PageInfoResponse<TaskListPageDto> pageInfoResponse = new PageInfoResponse<>(); PageInfoResponse<TaskListPageDto> pageInfoResponse = new PageInfoResponse<>();
...@@ -435,7 +440,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -435,7 +440,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
if (reqParam.size() > 0 && reqParam != null) { if (reqParam.size() > 0 && reqParam != null) {
JsonResult jsonResult = addSyncing(reqParam); JsonResult jsonResult = addSyncing(reqParam);
DmpDevelopTask data = (DmpDevelopTask) jsonResult.getData(); DmpDevelopTask data = (DmpDevelopTask) jsonResult.getData();
//saveRuleInfo(result, reqParam, jsonResult, String.valueOf(data.getId())); saveRuleInfo(result, reqParam, jsonResult, String.valueOf(data.getId()));
} }
return new JsonResult(ResultCode.SUCCESS, result); return new JsonResult(ResultCode.SUCCESS, result);
} }
...@@ -519,4 +524,51 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -519,4 +524,51 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
return new JsonResult(ResultCode.SUCCESS, task); return new JsonResult(ResultCode.SUCCESS, task);
} }
/**
* 保存规则信息
*
* @param result
* @param reqParam
* @param jsonResult
* @param taskId
* @throws Exception
*/
private void saveRuleInfo(List<Map<String, Object>> result, Map<String, Object> reqParam, JsonResult jsonResult, String taskId) throws Exception {
List<DvTaskRuleT> list = new ArrayList<>();
if (jsonResult.getCode().equals(ResultCode.SUCCESS.val())) {
List<Map> rules = (List<Map>) reqParam.get("taskRules"); //规则信息
if (!CollectionUtils.isEmpty(rules)) {
settRuleInfo(taskId, rules, list);
}
} else {
HashMap<String, Object> errorMap = new HashMap<>();
errorMap.put("name", reqParam.get("name"));
errorMap.put("msg", jsonResult.getMessage());
errorMap.put("code", jsonResult.getCode());
result.add(errorMap);
}
}
/**
* 保存dmp数据校验信息
*
* @param taskId
* @param rules
* @param list
* @throws Exception
*/
private void settRuleInfo(String taskId, List<Map> rules, List<DvTaskRuleT> list) throws Exception {
for (Map rule : rules) {
DvTaskRuleT taskRuleT = new DvTaskRuleT();
Integer ruleId = (Integer) rule.get("ruleId");
String ruleValue = (String) rule.get("ruleValue");
taskRuleT.setTaskId(taskId);//任务ID
taskRuleT.setRuleId(ruleId.longValue());
taskRuleT.setRuleValue(ruleValue);
list.add(taskRuleT);
}
//批量新增任务与规则关系表
dvTaskRuleTService.saveRule(list);
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment