Commit 19d0e55c authored by sml's avatar sml

冲突解决

parent a5a95b04
......@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
......@@ -94,6 +95,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
@Autowired
private DmpDevelopTaskHistoryMapper dmpDevelopTaskHistoryMapper;
@Autowired
private RedisTemplate redisTemplate;
/**
* 离线同步任务列表分页查询
......@@ -237,7 +241,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
//上传到azkaban todo
//上次zip包到azkaban
String localTaskZipAbsolutePath = localTaskZipPath + "/" + localZipTargetFileName;
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl);
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl, redisTemplate);
return azkabanApiUtils.loginCreateProjectuploadZipAndExecute("jz_localflow_" + projectId, "local_sync_project", localTaskZipAbsolutePath, treeName);
}
......@@ -283,7 +287,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
//调用azkaban服务
String azkabanApiUrl = publishToProjectSystemInfo.getAzkabanMonitorUrl();
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanApiUrl);
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanApiUrl, redisTemplate);
list = azkabanApiUtils.getSyncingFlowExecution(projectId, treeName, checkTaskStatusPageReq.getPageNum(), checkTaskStatusPageReq.getPageSize());
SimpleDateFormat dtf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
......@@ -474,13 +478,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRES_NEW)
public JsonResult addSyncTask(SyncDmpTaskAddReq syncDmpTaskAddReq) throws Exception {
JsonResult jsonResult = new JsonResult();
List<Map<String, Object>> result = new ArrayList<>();
Map<String, Object> reqParam = syncDmpTaskAddReq.getParams();
if (reqParam.size() > 0 && reqParam != null) {
jsonResult = addSyncing(reqParam);
DmpDevelopTask data = (DmpDevelopTask) jsonResult.getData();
//保存规则信息
//saveRuleInfo(result, reqParam, jsonResult, String.valueOf(data.getId()));
}
return jsonResult;
}
......@@ -554,11 +554,6 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
dmpDevelopTaskDao.update(task); //更新任务
logger.info("################################## 更新任务数据结束 ############################################");
/*DmpNavigationTree dmpNavigationTree = new DmpNavigationTree();
dmpNavigationTree.setName(taskName);
dmpNavigationTree.setId(treeId);
dmpNavigationTreeDao.update(dmpNavigationTree); //更新节点 树
logger.info("################################## 更新节点 树 结束 ############################################");*/
//更新规则信息
List<DvTaskRuleT> list = new ArrayList<>();
//查询TaskRuleID 集合
......@@ -643,6 +638,10 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
task.setTargetTableName(targetTable);
task.setSourceTableName(sourceTableName);
task.setSourceDbName(sourceDbName);
List<DvTaskRuleT> list = new ArrayList<>();
//更新规则信息
List<Map> taskRules = (List<Map>) body.get("taskRules");
//判断taskId是否存在,存在编辑,不存在新增
if (StringUtils.isEmpty(taskId)) {
task.setVersion("1.0");
task.setCreateUserId(SessionUtils.getCurrentUserId());
......@@ -650,9 +649,6 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
dmpDevelopTaskDao.insert(task); //新增任务数据
this.saveTaskHistory(task); //保存任务历史版本
logger.info("################################## 新增离线任务数据结束 ############################################");
//更新规则信息
List<DvTaskRuleT> list = new ArrayList<>();
List<Map> taskRules = (List<Map>) body.get("taskRules");
//保存dmp数据校验规则信息
settRuleInfo(taskId, taskRules, list);
} else {
......@@ -666,13 +662,22 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
dmpDevelopTaskDao.update(task);
this.saveTaskHistory(task); //保存任务历史版本
logger.info("################################## 编辑离线任务数据结束 ############################################");
//更新规则信息
//查询TaskRuleID 集合
List<Long> listRules = dvTaskRuleTService.getTaskRuleIdsList(Integer.valueOf(taskId));
if (CollectionUtils.isNotEmpty(listRules)) {
//批量删除rule信息
dvTaskRuleTService.delRulesByIds(listRules);
//保存dmp数据校验规则信息
settRuleInfo(String.valueOf(taskId), taskRules, list);
}
}
//保存时提交XML
return dmpDevelopTaskService.submitSyncing(task);
}
/**
* 保存任务历史版本
* 保存离线任务历史版本
*
* @param task
* @return
......@@ -686,22 +691,16 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
DmpDevelopTaskHistory taskHistory = new DmpDevelopTaskHistory();
taskHistory.setTaskId(task.getId());
taskHistory.setVersion(task.getVersion());
//taskHistory.setData(task.getData());
taskHistory.setScript(task.getScript());
taskHistory.setTaskDesc(task.getTaskDesc());
taskHistory.setTreeId(task.getTreeId());
taskHistory.setTaskType(task.getTaskType());
taskHistory.setDatasourceId(task.getDatasourceId());
taskHistory.setCreateTime(task.getCreateTime());
if (StringUtils.isNotEmpty(task.getCreateUserId())) {
taskHistory.setCreateUserId(Integer.valueOf(task.getCreateUserId()));
}
taskHistory.setUpdateTime(task.getUpdateTime());
if (StringUtils.isNotEmpty(task.getUpdateUserId())) {
taskHistory.setUpdateUserId(Integer.valueOf(task.getUpdateUserId()));
}
taskHistory.setCreateTime(new Date());
taskHistory.setCreateUserId(Integer.valueOf(SessionUtils.getCurrentUserId()));
taskHistory.setDataStatus(DelFlagEnum.NO.getValue());
dmpDevelopTaskHistoryMapper.insert(taskHistory);
logger.info("################################## 新增离线任务版本记录结束 ############################################");
return JsonResult.ok();
}
......@@ -821,7 +820,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
}
String azkabanMonitorUrl = map.get("azkabanMonitorUrl").toString();
Long projectId = Long.valueOf(map.get("projectId").toString());
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl);
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl, redisTemplate);
String execId = azkabanApiUtils.stopFlow("jz_localflow_" + projectId, map.get("treeName").toString());
return JsonResult.ok();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment