Commit 7de99fa2 authored by sml's avatar sml

代码合并

parent 5f24ed50
package com.jz.dmp.modules.service.impl; package com.jz.dmp.modules.service.impl;
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo; import com.github.pagehelper.PageInfo;
...@@ -17,29 +39,40 @@ import com.jz.common.utils.ZipUtils; ...@@ -17,29 +39,40 @@ import com.jz.common.utils.ZipUtils;
import com.jz.common.utils.web.SessionUtils; import com.jz.common.utils.web.SessionUtils;
import com.jz.common.utils.web.XmlUtils; import com.jz.common.utils.web.XmlUtils;
import com.jz.dmp.agent.DmpAgentResult; import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.modules.controller.DataIntegration.bean.*; import com.jz.dmp.modules.controller.DataIntegration.bean.CheckJyRlueStatusDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.CheckJyRlueStatusReq;
import com.jz.dmp.modules.controller.DataIntegration.bean.CheckTaskStatusPageDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.CheckTaskStatusPageReq;
import com.jz.dmp.modules.controller.DataIntegration.bean.DvRuleTDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.NewSynchTaskReq;
import com.jz.dmp.modules.controller.DataIntegration.bean.SourceDbNameListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.SoureAndTargetColumnsReq;
import com.jz.dmp.modules.controller.DataIntegration.bean.SyncDmpTaskAddReq;
import com.jz.dmp.modules.controller.DataIntegration.bean.TaskListPageDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.TaskListPageReq;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowExecution; import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowExecution;
import com.jz.dmp.modules.controller.dataService.bean.SoureTableColumnsReq; import com.jz.dmp.modules.controller.dataService.bean.SoureTableColumnsReq;
import com.jz.dmp.modules.dao.*; import com.jz.dmp.modules.dao.DmpDevelopTaskDao;
import com.jz.dmp.modules.model.*; import com.jz.dmp.modules.dao.DmpNavigationTreeDao;
import com.jz.dmp.modules.dao.DmpProjectDao;
import com.jz.dmp.modules.dao.DmpTableColumnDao;
import com.jz.dmp.modules.dao.DmpTableDao;
import com.jz.dmp.modules.dao.DmpTableFieldSchemaDao;
import com.jz.dmp.modules.dao.DvRuleTDao;
import com.jz.dmp.modules.dao.OfflineSynchDao;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
import com.jz.dmp.modules.model.DmpDevelopTask;
import com.jz.dmp.modules.model.DmpNavigationTree;
import com.jz.dmp.modules.model.DmpProjectSystemInfo;
import com.jz.dmp.modules.model.DmpTable;
import com.jz.dmp.modules.model.DmpTableColumn;
import com.jz.dmp.modules.model.DmpTableFieldSchema;
import com.jz.dmp.modules.model.DvRuleT;
import com.jz.dmp.modules.model.DvTaskRuleT;
import com.jz.dmp.modules.model.SSOUserInfo;
import com.jz.dmp.modules.service.DmpDevelopTaskService; import com.jz.dmp.modules.service.DmpDevelopTaskService;
import com.jz.dmp.modules.service.DvTaskRuleTService; import com.jz.dmp.modules.service.DvTaskRuleTService;
import com.jz.dmp.modules.service.OfflineSynchService; import com.jz.dmp.modules.service.OfflineSynchService;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.servlet.http.HttpServletRequest;
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.*;
/** /**
* @Description:离线同步服务层 * @Description:离线同步服务层
...@@ -89,6 +122,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -89,6 +122,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
@Autowired @Autowired
private DvTaskRuleTService dvTaskRuleTService; private DvTaskRuleTService dvTaskRuleTService;
@Autowired
private RedisTemplate redisTemplate;
/** /**
* 离线同步任务列表分页查询 * 离线同步任务列表分页查询
...@@ -114,7 +150,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -114,7 +150,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
* 获取源数据库名称——下拉框 * 获取源数据库名称——下拉框
*/ */
@Override @Override
public JsonResult<List<SourceDbNameListDto>> querygSourceDbList(Integer projectId,String datasourceTypeId) throws Exception { public JsonResult<List<SourceDbNameListDto>> querygSourceDbList(Integer projectId, String datasourceTypeId) throws Exception {
Map map = new HashMap(); Map map = new HashMap();
map.put("projectId", projectId); //项目id map.put("projectId", projectId); //项目id
map.put("datasourceTypeId", datasourceTypeId); map.put("datasourceTypeId", datasourceTypeId);
...@@ -232,7 +268,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -232,7 +268,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
//上传到azkaban todo //上传到azkaban todo
//上次zip包到azkaban //上次zip包到azkaban
String localTaskZipAbsolutePath = localTaskZipPath + "/" + localZipTargetFileName; String localTaskZipAbsolutePath = localTaskZipPath + "/" + localZipTargetFileName;
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl); AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl, redisTemplate);
return azkabanApiUtils.loginCreateProjectuploadZipAndExecute("jz_localflow_" + projectId, "local_sync_project", localTaskZipAbsolutePath, treeName); return azkabanApiUtils.loginCreateProjectuploadZipAndExecute("jz_localflow_" + projectId, "local_sync_project", localTaskZipAbsolutePath, treeName);
} }
...@@ -278,7 +314,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -278,7 +314,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
//调用azkaban服务 //调用azkaban服务
String azkabanApiUrl = publishToProjectSystemInfo.getAzkabanMonitorUrl(); String azkabanApiUrl = publishToProjectSystemInfo.getAzkabanMonitorUrl();
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanApiUrl); AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanApiUrl, redisTemplate);
list = azkabanApiUtils.getSyncingFlowExecution(projectId, treeName, checkTaskStatusPageReq.getPageNum(), checkTaskStatusPageReq.getPageSize()); list = azkabanApiUtils.getSyncingFlowExecution(projectId, treeName, checkTaskStatusPageReq.getPageNum(), checkTaskStatusPageReq.getPageSize());
SimpleDateFormat dtf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat dtf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
...@@ -640,11 +676,12 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -640,11 +676,12 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
task.setTargetTableName(targetTable); task.setTargetTableName(targetTable);
task.setSourceTableName(sourceTableName); task.setSourceTableName(sourceTableName);
task.setSourceDbName(sourceDbName); task.setSourceDbName(sourceDbName);
task.setCreateUserId(SessionUtils.getCurrentUserId());
task.setCreateTime(new Date());
dmpDevelopTaskDao.insert(task); //保存任务数据 dmpDevelopTaskDao.insert(task); //保存任务数据
logger.info("################################## 新增任务数据结束 ############################################"); logger.info("################################## 新增任务数据结束 ############################################");
//保存时提交XML //保存时提交XML
dmpDevelopTaskService.submitSyncing(task); dmpDevelopTaskService.submitSyncing(task);
return new JsonResult(ResultCode.SUCCESS, task); return new JsonResult(ResultCode.SUCCESS, task);
} }
...@@ -703,36 +740,42 @@ public class OfflineSynchServiceImpl implements OfflineSynchService { ...@@ -703,36 +740,42 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
* @since 2021-01-21 * @since 2021-01-21
*/ */
@Override @Override
public JsonResult querySoureTableColumns(SoureTableColumnsReq req) throws Exception { public JsonResult querySoureTableColumns(Map<String, List<SynchTableColumnsReq>> req) throws Exception {
//通过源数据库id ,查询数据源配置 List<Map> returnData = new ArrayList<>();
DmpAgentDatasourceInfo dsInfo = offlineSynchDao.querySourceDbInfoBySourceId(req.getSourceDbId()); int len = 0;
if (dsInfo == null) { if (req.size() == 0 || req == null) {
throw new RuntimeException("数据源配置信息不存在!"); throw new RuntimeException("请求参数不能为空!");
}
dsInfo.setFileType(req.getFileType());
dsInfo.setDelimiter(req.getCsvDelimiter());
dsInfo.setIsHaveHeader(req.getCsvIsHaveHeader());
//解码源数据库密码
if (StringUtils.isNotBlank(dsInfo.getPassword())) {
dsInfo.setPassword(new BaseService().decode(dsInfo.getPassword(), publicKey));
} }
//创建jdbc,获取数据源表字段 for (SynchTableColumnsReq str : req.get("params")) {
DmpAgentResult rst = dmpDsAgentServiceImp.getTableColumnList(dsInfo, req.getTargetTableName()); //通过源数据库id ,查询数据源配置
if (!rst.getCode().val().equals("200")) { DmpAgentDatasourceInfo dsInfos = offlineSynchDao.querySourceDbInfoBySourceId(str.getSourceDbId());
return new JsonResult(rst.getCode(), rst.getMessage()); DmpAgentDatasourceInfo dsInfo = new DmpAgentDatasourceInfo();
} else { BeanUtils.copyProperties(dsInfos, dsInfo);
//成功 if (dsInfo == null) {
List<Map> returnList = (List<Map>) JsonMapper.fromJsonString(rst.getMessage(), List.class); throw new RuntimeException("数据源配置信息不存在!");
if (returnList != null && returnList.size() > 0) { }
for (int i = 0; i < returnList.size(); i++) { //解码源数据库密码
Map map = returnList.get(i); if (StringUtils.isNotBlank(dsInfo.getPassword())) {
map.put("id", i + 1); dsInfo.setPassword(new BaseService().decode(dsInfo.getPassword(), publicKey));
} }
//创建jdbc,获取数据源表字段
DmpAgentResult rst = dmpDsAgentServiceImp.getTableColumnList(dsInfo, str.getTargetTableName());
if (!rst.getCode().val().equals("200")) {
return new JsonResult(rst.getCode(), rst.getMessage());
} else { } else {
throw new RuntimeException("无数据!"); //成功
List<Map> returnList = (List<Map>) JsonMapper.fromJsonString(rst.getMessage(), List.class);
if (returnList != null && returnList.size() > 0) {
for (int i = 0; i < returnList.size(); i++) {
Map map = returnList.get(i);
map.put("id", ++len);
returnData.add(map);
}
}
} }
return JsonResult.ok(returnList);
} }
return JsonResult.ok(returnData);
} }
/** /**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment