Commit a9073cad authored by sml's avatar sml

Merge branch 'dmp_dev' of http://gitlab.ioubuy.cn/yaobenzhang/jz-dmp-service.git into dmp_dev

parents 4f645937 4726f8d6
......@@ -62,4 +62,8 @@ public class CommConstant {
//azkaban相关常量
public static final String AZKABAN_PROJECTNAME_PREFIX = "jz_workflow_new_"; //azkaban项目名称前缀
/***************************************************/
//模板名称
public static final String API_TEST = "api_test.ftl";
public static final String realTimeTask = "source_connector.ftl";
}
......@@ -74,13 +74,18 @@ public class GatewayApiConstant {
public static final String listServerApplyApi = "/api/interface/listServerApplyApi";
//获取文件夹树
public static final String folderTree = "/api/producer/getFileCatalog";
//服务开发---获取ApiId
public static final String getApiId = "/api/producer/getCustomApiId";
//创建项目文件夹
public static final String createProjectFolder = "/api/producer/createProjectFolder";
//授权--组织名称查询
public static final String getOrgNameList = "/api/auth/get-org-list";
//API测试--下拉框
public static final String listGetApiKey = "/api/interface/listGetApiKey";
//删除文件夹
public static final String delFolder = "/api/producer/delProjectFolder";
//删除文件夹
public static final String functionTemplate = "/api/producer/getFunctionTemplateList";
private static Logger logger = LoggerFactory.getLogger(GatewayApiConstant.class);
......@@ -93,7 +98,7 @@ public class GatewayApiConstant {
if (StringUtils.isEmpty(resultData)) {
throw new RuntimeException("failed!");
}
logger.info("#################响应结果数据{}" + resultData);
logger.info("############## response results{}" + resultData);
Map jsonObject = JSONObject.parseObject(resultData);
if (jsonObject.containsKey("code")) {
......@@ -121,7 +126,7 @@ public class GatewayApiConstant {
if (StringUtils.isEmpty(returnData)) {
throw new RuntimeException("failed!");
}
logger.info("#################响应结果{}" + returnData);
logger.info("############## response results{}" + returnData);
Map jsonObject = JSONObject.parseObject(returnData);
if (jsonObject.containsKey("code")) {
if ("200".equals(jsonObject.get("code").toString())) {
......@@ -149,7 +154,7 @@ public class GatewayApiConstant {
if (StringUtils.isEmpty(returnData)) {
throw new RuntimeException("failed!");
}
logger.info("#################响应结果{}" + returnData);
logger.info("############## response results{}" + returnData);
Map jsonObject = JSONObject.parseObject(returnData);
if (jsonObject.containsKey("code")) {
if ("200".equals(jsonObject.get("code").toString())) {
......
package com.jz.common.utils;
import freemarker.core.ParseException;
import freemarker.template.MalformedTemplateNameException;
import freemarker.template.Template;
import freemarker.template.TemplateException;
import freemarker.template.TemplateNotFoundException;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfig;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
/**
* @ClassName: FreeMarkerUtils
* @Description: freeMarkerUtils
* @Author Bellamy
* @Date 2021/3/3
* @Version 1.0
*/
public class FreeMarkerUtils {
/**
* 使用freemaker模板生成 kafka connector 请求参数
*
* @param type 模板类型
* @param dataModel 模板里定义的变量数据对象
* @return
* @author Bellamy
*/
public static String freemakerJson(String type, Map<String, String> dataModel, FreeMarkerConfig freeMarkerConfig) {
StringWriter stringWriter = new StringWriter();
try {
Template template = freeMarkerConfig.getConfiguration().getTemplate(type);
if (template != null) {
try {
template.process(dataModel, stringWriter);
} catch (TemplateException e) {
e.printStackTrace();
}
}
} catch (TemplateNotFoundException e) {
e.printStackTrace();
} catch (MalformedTemplateNameException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return stringWriter.toString();
}
}
package com.jz.common.utils;
import com.jz.dmp.modules.controller.dataService.DmpApiMangeController;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
/**
*
......@@ -16,7 +20,10 @@ import java.io.UnsupportedEncodingException;
*/
public class MD5SignUtils {
private static Logger logger = LoggerFactory.getLogger(MD5SignUtils.class);
private static final String DEFAULT_CHARSET = "UTF-8";
private static final String SIGN_TYPE = "MD5";
/**
* 签名
......@@ -74,4 +81,37 @@ public class MD5SignUtils {
throw new RuntimeException("MD5签名过程中出现错误,指定的编码集不对,您目前指定的编码集是:" + charset);
}
}
/**
* MD5加盐加密
*
* @param str
* @param salt
* @return
*/
public static String encrypt(String str, String salt) {
try {
MessageDigest md5 = MessageDigest.getInstance(SIGN_TYPE);
md5.update((str + salt).getBytes(DEFAULT_CHARSET));
return byte2hex(md5.digest());
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("md5 加密异常", e);
}
}
return "";
}
public static String byte2hex(byte[] bytes) {
StringBuilder sign = new StringBuilder();
for (int i = 0; i < bytes.length; i++) {
String hex = Integer.toHexString(bytes[i] & 0xFF);
if (hex.length() == 1) {
sign.append("0");
}
sign.append(hex.toUpperCase());
}
return sign.toString();
}
}
package com.jz.common.utils;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.net.URLEncoder;
import java.util.*;
/**
* @author ZC
* @PACKAGE_NAME: com.jz.dm.common.util
* @PROJECT_NAME: jz-dm-parent
* @NAME: MapUtil
* @DATE: 2021-1-2/14:02
* @DAY_NAME_SHORT: 周六
* @Description:
**/
public class MapUtil {
/**
* 获取签名参数
*
* @param apiKey
* @param method
* @param signType
* @return
*/
//public static String getSignValue(String apiKey, String method, String signType) {
// StringBuilder builder = new StringBuilder();
// builder.append("apiKey=").append(apiKey).append(LoggingConstants.AND_SPILT)
// .append("method=").append(method).append(LoggingConstants.AND_SPILT)
// .append("signType=").append(signType);
// return builder.toString();
//}
/**
* 将对象转成TreeMap,属性名为key,属性值为value
*
* @param object 对象
* @return
* @throws IllegalAccessException
*/
public static TreeMap<String, String> objToMap(Object object) throws IllegalAccessException {
Class clazz = object.getClass();
TreeMap<String, String> treeMap = new TreeMap<String, String>();
while (null != clazz.getSuperclass()) {
Field[] declaredFields1 = clazz.getDeclaredFields();
for (Field field : declaredFields1) {
String name = field.getName();
// 获取原来的访问控制权限
boolean accessFlag = field.isAccessible();
// 修改访问控制权限
field.setAccessible(true);
Object value = field.get(object);
// 恢复访问控制权限
field.setAccessible(accessFlag);
if (null != value && StringUtils.isNotBlank(value.toString())) {
//如果是List,将List转换为json字符串
if (value instanceof List) {
value = JSON.toJSONString(value);
}
treeMap.put(name, String.valueOf(value));
}
}
clazz = clazz.getSuperclass();
}
return treeMap;
}
/**
* 按照指定的分割符将list转换为String
*
* @param list
* @param separator
* @return
*/
public static String listToString(List list, String separator) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < list.size(); i++) {
if (i == list.size() - 1) {
sb.append(list.get(i));
} else {
sb.append(list.get(i));
sb.append(separator);
}
}
return sb.toString();
}
/**
* * 把数组元素按照按照字典倒序排序,并按照“参数=参数值”的模式用“&”字符拼接成字符串
* @param params 需要排序并参与字符拼接的参数组
* @return 拼接后字符串
*/
public static String stringInvertSort(Map<String, String> params) {
List<String> keys = new ArrayList<String>(params.keySet());
Collections.reverse(keys);
String prestr = "";
for (int i = 0; i < keys.size(); i++) {
String key = keys.get(i);
String value = params.get(key);
try {
value = URLEncoder.encode(value, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
if (i == keys.size() - 1) {//拼接时,不包括最后一个&字符
prestr = prestr + key + "=" + value;
} else {
prestr = prestr + key + "=" + value + "&";
}
}
return prestr;
}
/**
*   * 把数组所有元素字典排序,并按照“参数=参数值”的模式用“&”字符拼接成字符串
*   * @param params 需要排序并参与字符拼接的参数组
*   * @return 拼接后字符串 
*/
public static String stringNormalSort(Map<String, String> params) {
List<String> keys = new ArrayList<String>(params.keySet());
Collections.sort(keys);
String prestr = "";
for (int i = 0; i < keys.size(); i++) {
String key = keys.get(i);
String value = params.get(key);
try {
value = URLEncoder.encode(value, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
if (i == keys.size() - 1) {//拼接时,不包括最后一个&字符
prestr = prestr + key + "=" + value;
} else {
prestr = prestr + key + "=" + value + "&";
}
}
return prestr;
}
class SpellComparatorUtils implements Comparator {
@Override
public int compare(Object o1, Object o2) {
try {
// 取得比较对象的汉字编码,并将其转换成字符串
String s1 = new String(o1.toString().getBytes("GB2312"), "ISO-8859-1");
String s2 = new String(o2.toString().getBytes("GB2312"), "ISO-8859-1");
// 运用String类的 compareTo()方法对两对象进行比较
return s1.compareTo(s2);
} catch (Exception e) {
e.printStackTrace();
}
return 0;
}
}
public static void main(String[] args) {
//Map<String, String> map = new HashMap();
//map.put("name", "hello");
//map.put("value", "world");
//System.out.println(createLinkStringByGet(map));
StringJoiner joiner = new StringJoiner(",","[","]");
joiner.add("123");
joiner.add("456");
joiner.add("789");
System.out.println(joiner);
}
}
......@@ -159,7 +159,7 @@ public class HttpClientUtils {
try {
httpClient = HttpClients.createDefault();
httpPost = new HttpPost(url);
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(50000).setConnectTimeout(50000).build();//设置请求和传输超时时间
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(5000).setConnectTimeout(5000).build();//设置请求和传输超时时间
httpPost.setConfig(requestConfig);
StringEntity se = new StringEntity(json, "UTF-8");
//StringEntity se = new StringEntity("UTF-8");
......@@ -199,7 +199,7 @@ public class HttpClientUtils {
try {
httpClient = HttpClients.createDefault();
httpPost = new HttpPost(url);
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(50000).setConnectTimeout(50000).build();//设置请求和传输超时时间
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(10000).setConnectTimeout(10000).build();//设置请求和传输超时时间
httpPost.setConfig(requestConfig);
/* StringEntity se = new StringEntity(json, "UTF-8");
se.setContentType("application/x-www-form-urlencoded");
......@@ -495,7 +495,7 @@ public class HttpClientUtils {
try {
httpClient = HttpClients.createDefault();
httpPost = new HttpPost(url);
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(50000).setConnectTimeout(50000).build();//设置请求和传输超时时间
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(5000).setConnectTimeout(5000).build();//设置请求和传输超时时间
httpPost.setConfig(requestConfig);
httpPost.setHeader("token", token);
StringEntity se = new StringEntity(json, "UTF-8");
......@@ -562,6 +562,8 @@ public class HttpClientUtils {
try {
// 创建httpget.
HttpGet httpget = new HttpGet(requestUrl);
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(5000).setConnectTimeout(5000).build();//设置请求和传输超时时间
httpget.setConfig(requestConfig);
System.out.println("get executing request " + httpget.getURI());
// 执行get请求.
CloseableHttpResponse response = httpclient.execute(httpget);
......@@ -572,7 +574,7 @@ public class HttpClientUtils {
System.out.println(response.getStatusLine());
if (entity != null) {
result = EntityUtils.toString(entity);
LOGGER.info(result);
LOGGER.info("response results{}" + result);
}
} catch (ClientProtocolException e) {
......
......@@ -37,4 +37,8 @@ public interface ExecutionFlowsMapper {
* @since 2021-02-03
*/
List<Map> queryExamplesLogByExecId(String execId) throws Exception;
Map<String, Object> queryTaskInstanceStatus() throws Exception;
List<Map> queryLastStatus(@Param("taskName") String[] taskName) throws Exception;
}
......@@ -218,6 +218,26 @@ public class OfflineSynchController {
return jsonResult;
}
/**
* sync-获取数据源表字段
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "sync-获取数据源表字段", notes = "sync-获取数据源表字段")
@PostMapping(value = "/getSyncColumns")
public JsonResult getSyncSoureAndTargetColumns(@RequestBody @Validated Map<String, List<SynchTableColumnsReq>> req) throws Exception {
JsonResult jsonResult = new JsonResult();
try {
jsonResult = offlineSynchService.getSyncSoureAndTargetColumns(req);
} catch (Exception e) {
jsonResult.setMessage(e.getMessage());
jsonResult.setCode(ResultCode.INTERNAL_SERVER_ERROR);
e.printStackTrace();
}
return jsonResult;
}
/**
* 校验规则
*
......
......@@ -70,7 +70,7 @@ public class RealTimeSyncController {
}
/**
* 批量启动实时同步任务
* 批量启动/停止启动实时同步任务
*
* @return
* @author Bellamy
......
......@@ -28,6 +28,9 @@ public class SynchTableColumnsReq implements Serializable {
@ApiModelProperty(value = "目标表名称")
private String targetTableName;
@ApiModelProperty(value = "字段名称")
private String fieldName;
public Long getSourceDbId() {
return sourceDbId;
}
......@@ -44,4 +47,11 @@ public class SynchTableColumnsReq implements Serializable {
this.sourceDbId = sourceDbId;
}
public String getFieldName() {
return fieldName;
}
public void setFieldName(String fieldName) {
this.fieldName = fieldName;
}
}
......@@ -3,6 +3,7 @@ package com.jz.dmp.modules.controller.dataOperation;
import com.jz.common.constant.JsonResult;
import com.jz.common.constant.ResultCode;
import com.jz.common.page.PageInfoResponse;
import com.jz.dmp.modules.controller.DataIntegration.bean.ConflictCheckReq;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq;
import com.jz.dmp.modules.controller.dataOperation.bean.DataDevTaskListDto;
......@@ -74,4 +75,28 @@ public class DmpDevTaskController {
JsonResult list = dmpDevelopTaskService.runTaskByTaskId(taskId);
return list;
}
/**
* 运维大屏--获取任务状态
*
* @return
* @author Bellamy
* @since 2021-02-22
*/
@ApiOperation(value = "运维大屏--重点关注任务状态", notes = "重点关注")
@PostMapping(value = "/getTaskStatus")
public JsonResult getTaskStatus(@RequestParam String projectId) throws Exception {
if (StringUtils.isEmpty(projectId)) {
return new JsonResult(ResultCode.PARAMS_ERROR, "projectId不能为空!");
}
JsonResult result = new JsonResult();
try {
result = dmpDevelopTaskService.getTaskStatus(projectId);
} catch (Exception e) {
result.setMessage(e.getMessage());
result.setCode(ResultCode.INTERNAL_SERVER_ERROR);
e.printStackTrace();
}
return result;
}
}
......@@ -44,6 +44,9 @@ public class DataDevTaskListReq extends BasePageBean {
@ApiModelProperty(value = "任务类型")
private String taskType;
@ApiModelProperty(value = "最后执行状态")
private String status;
public String getProjectId() {
return projectId;
}
......@@ -75,4 +78,12 @@ public class DataDevTaskListReq extends BasePageBean {
public void setTaskType(String taskType) {
this.taskType = taskType;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
}
......@@ -37,7 +37,7 @@ public class DmpApiMangeController {
@Autowired
private DmpApiMangeService dmpApiMangeService;
/* *//**
/* *//**
* 授权给他人的API-列表分页查询
*
* @return
......@@ -178,11 +178,32 @@ public class DmpApiMangeController {
* @since 2021-01-19
*/
@ApiOperation(value = "API测试", notes = "API测试")
@GetMapping(value = "/apiTestInfo")
public JsonResult getApiTestInfo(@RequestBody Map<String,Object> params, HttpServletRequest httpRequest) {
@PostMapping(value = "/apiTestInfo")
public JsonResult getApiTestInfo(@RequestBody Map<String, String> apiKey, HttpServletRequest httpRequest) {
JsonResult jsonResult = new JsonResult();
try {
jsonResult = dmpApiMangeService.apiTestInfo(apiKey);
} catch (Exception e) {
jsonResult.setMessage(e.getMessage());
jsonResult.setCode(ResultCode.INTERNAL_SERVER_ERROR);
e.printStackTrace();
}
return jsonResult;
}
/**
* API测试--下拉框
*
* @author Bellamy
* @since 2021-03-03
*/
@ApiOperation(value = "API测试--下拉框", notes = "API测试--下拉框")
@GetMapping(value = "/apiTestGetApiKey")
@ApiImplicitParam(name = "apiName", value = "apiName")
public JsonResult getApiTestGetApiKey(@RequestParam(required = false) String apiName, HttpServletRequest httpRequest) {
JsonResult jsonResult = new JsonResult();
try {
jsonResult = dmpApiMangeService.apiTestInfo(params);
jsonResult = dmpApiMangeService.getApiTestGetApiKey(apiName);
} catch (Exception e) {
jsonResult.setMessage(e.getMessage());
jsonResult.setCode(ResultCode.INTERNAL_SERVER_ERROR);
......@@ -370,4 +391,25 @@ public class DmpApiMangeController {
return jsonResult;
}
/**
* 授权--组织名称查询
*
* @author Bellamy
* @since 2021-03-02
*/
@ApiOperation(value = "授权--组织名称查询", notes = "授权--组织名称查询")
@GetMapping(value = "/getAuthOrgName")
@ApiImplicitParam(name = "orgName", value = "组织名称", required = true)
public JsonResult getAuthOrgName(@RequestParam String orgName) {
JsonResult jsonResult = new JsonResult();
try {
jsonResult = dmpApiMangeService.getAuthOrgName(orgName);
} catch (Exception e) {
jsonResult.setMessage(e.getMessage());
jsonResult.setCode(ResultCode.INTERNAL_SERVER_ERROR);
e.printStackTrace();
}
return jsonResult;
}
}
\ No newline at end of file
......@@ -298,11 +298,33 @@ public class DmpApiServiceMangeController {
@ApiOperation(value = "获取文件夹列表", notes = "获取文件夹列表")
@GetMapping(value = "/folderTree")
@ApiImplicitParams({@ApiImplicitParam(name = "projectId", value = "项目id"),
@ApiImplicitParam(name = "orgCode", value = "组织编码")})
public JsonResult getFolderTree(@RequestParam(name = "projectId", required = false) String projectId, @RequestParam(name = "orgCode", required = false) String orgCode) {
@ApiImplicitParam(name = "orgFolder", value = "组织编码")})
public JsonResult getFolderTree(@RequestParam(name = "projectId", required = false) String projectId, @RequestParam(name = "orgFolder", required = false) String orgFolder) {
JsonResult jsonResult = new JsonResult();
try {
jsonResult = dmpApiServiceMangeService.getFolderTree(projectId, orgCode);
jsonResult = dmpApiServiceMangeService.getFolderTree(projectId, orgFolder);
} catch (Exception e) {
jsonResult.setMessage(e.getMessage());
jsonResult.setCode(ResultCode.INTERNAL_SERVER_ERROR);
e.printStackTrace();
}
return jsonResult;
}
/**
* 删除文件夹
*
* @return
* @author Bellamy
* @since 2021-03-3
*/
@ApiOperation(value = "删除文件夹", notes = "删除文件夹")
@GetMapping(value = "/delFolder")
@ApiImplicitParam(name = "id", value = "文件夹id", required = true)
public JsonResult delFolderById(@RequestParam String id) throws Exception {
JsonResult jsonResult = new JsonResult();
try {
jsonResult = dmpApiServiceMangeService.delFolderById(id);
} catch (Exception e) {
jsonResult.setMessage(e.getMessage());
jsonResult.setCode(ResultCode.INTERNAL_SERVER_ERROR);
......@@ -332,12 +354,12 @@ public class DmpApiServiceMangeController {
}
/**
* 创建项目文件夹
* 创建/编辑项目文件夹
*
* @author Bellamy
* @since 2021-02-24
*/
@ApiOperation(value = "创建项目文件夹", notes = "创建项目文件夹")
@ApiOperation(value = "创建/编辑项目文件夹", notes = "创建/编辑项目文件夹")
@PostMapping(value = "/createProjectFolder")
public JsonResult createProjectFolder(@RequestBody @Validated CreateFolderReq req, HttpServletRequest httpRequest) {
JsonResult jsonResult = new JsonResult();
......@@ -351,6 +373,27 @@ public class DmpApiServiceMangeController {
return jsonResult;
}
/**
* 获取function模板列表
*
* @return
* @author Bellamy
* @since 2021-03-3
*/
@ApiOperation(value = "获取function模板列表", notes = "获取function模板列表")
@GetMapping(value = "/functionTemplate")
public JsonResult getFunctionTemplateList() throws Exception {
JsonResult jsonResult = new JsonResult();
try {
jsonResult = dmpApiServiceMangeService.getFunctionTemplateList();
} catch (Exception e) {
jsonResult.setMessage(e.getMessage());
jsonResult.setCode(ResultCode.INTERNAL_SERVER_ERROR);
e.printStackTrace();
}
return jsonResult;
}
/**
* 获取数据源表字段
*
......
......@@ -18,12 +18,16 @@ import java.io.Serializable;
@ApiModel("授权api列表请求对象")
public class AuthApiListReq extends BasePageBean implements Serializable {
@ApiModelProperty(value = "文件夹id", required = false)
@ApiModelProperty(value = "文件夹id",required = false)
private Long fileId;
@ApiModelProperty(value = "组织编码", required = false)
@ApiModelProperty(value = "文件来源:,1 API制做,2 组织创建",required = false)
private String fileSource;
@ApiModelProperty(value = "组织编码",required = false)
private String orgCode;
@ApiModelProperty(value = "组织名称", required = false)
@ApiModelProperty(value = "组织名称",required = false)
private String orgName;
@ApiModelProperty(value = "api名称", required = false)
@ApiModelProperty(value = "api名称",required = false)
private String apiName;
@ApiModelProperty(value = "项目id")
private Long projectId;
}
......@@ -24,7 +24,7 @@ public class CreateFolderReq implements Serializable {
@ApiModelProperty(value = "项目id", required = false)
private Long projectId;
@ApiModelProperty(value = "组织编码", required = false)
@ApiModelProperty(value = "组织编码/编辑时不允许修改", required = false)
private String orgCode;
@ApiModelProperty(value = "父类文件夹id ,创建同级不传", required = false)
......@@ -40,8 +40,14 @@ public class CreateFolderReq implements Serializable {
@NotNull(message = "文件来源不能为空")
private String fileSource;
@ApiModelProperty(value = "创建用户", required = false)
private String createUser;
//---------------------------------更新时id必传------------------
@ApiModelProperty(value = "文件id", required = false)
private String id;
@ApiModelProperty(value = "更新用户", required = false)
private String updateUser;
}
......@@ -24,9 +24,20 @@ public class LogInfoListReq extends BasePageBean implements Serializable {
@ApiModelProperty(value = "客户请求token")
private String requestToken;
@ApiModelProperty(value = "开始时间")
private String startDate;
@ApiModelProperty(value = "结束时间")
private String endDate;
@ApiModelProperty(value = "状态:SUCCEED 请求成功, FAIL 请求失败")
private String status;
@ApiModelProperty(value = "创建时间")
private String createDate;
@ApiModelProperty(value = "历史查询-数据银行")
private String historyQuery;
}
......@@ -27,4 +27,10 @@ public class OrganizationManageListQueryReq extends BasePageBean implements Seri
@ApiModelProperty(value = "联系人")
private String linkman;
@ApiModelProperty(value = "文件夹id")
private String fileId;
@ApiModelProperty(value = "工程id")
private Long projectId;
}
......@@ -195,4 +195,13 @@ public interface DmpRealtimeSyncInfoDao {
* @since 2021-02-02
*/
int insertRealtimeHistory(DmpRealtimeTaskHistory taskHistory) throws Exception;
/**
* 获取实时任务状态
*
* @return
* @author Bellamy
* @since 2021-02-02
*/
Map<String, Object> queryTaskStatus(String projectId) throws Exception;
}
\ No newline at end of file
......@@ -68,7 +68,7 @@ public interface DmpApiMangeService {
* @author Bellamy
* @since 2021-01-19
*/
JsonResult apiTestInfo(Map<String,Object> params) throws Exception;
JsonResult apiTestInfo(Map<String,String> params) throws Exception;
/**
* 查看日志
......@@ -132,4 +132,20 @@ public interface DmpApiMangeService {
* @since 2021-02-20
*/
JsonResult logDetails(String id) throws Exception;
/**
* 授权--组织名称查询
*
* @author Bellamy
* @since 2021-03-02
*/
JsonResult getAuthOrgName(String orgName) throws Exception;
/**
* API测试--下拉框
*
* @author Bellamy
* @since 2021-03-03
*/
JsonResult getApiTestGetApiKey(String apiName) throws Exception;
}
......@@ -97,7 +97,7 @@ public interface DmpApiServiceMangeService {
* @author Bellamy
* @since 2021-02-24
*/
JsonResult getFolderTree(String projectId, String orgCode) throws Exception;
JsonResult getFolderTree(String projectId, String orgFolder) throws Exception;
/**
* API计量--API已调用列表
......@@ -122,4 +122,22 @@ public interface DmpApiServiceMangeService {
* @since 2021-02-24
*/
JsonResult createProjectFolder(CreateFolderReq req) throws Exception;
/**
* 删除文件夹
*
* @return
* @author Bellamy
* @since 2021-03-3
*/
JsonResult delFolderById(String id) throws Exception;
/**
* 获取function模板列表
*
* @return
* @author Bellamy
* @since 2021-03-3
*/
JsonResult getFunctionTemplateList() throws Exception;
}
\ No newline at end of file
......@@ -174,4 +174,13 @@ public interface DmpDevelopTaskService {
* @since 2021-02-03
*/
JsonResult queryExamplesLogByExecId(String execId) throws Exception;
/**
* 运维大屏--获取任务状态
*
* @return
* @author Bellamy
* @since 2021-02-22
*/
JsonResult getTaskStatus(String projectId) throws Exception;
}
\ No newline at end of file
......@@ -130,4 +130,12 @@ public interface OfflineSynchService {
* @since 2021-02-19
*/
JsonResult getDataPreview(OfflineDataPreview req) throws Exception;
/**
* sync-获取数据源表字段
*
* @return
* @author Bellamy
*/
JsonResult getSyncSoureAndTargetColumns(Map<String, List<SynchTableColumnsReq>> req) throws Exception;
}
package com.jz.dmp.modules.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.jz.common.constant.CommConstant;
import com.jz.common.constant.GatewayApiConstant;
import com.jz.common.constant.JsonResult;
import com.jz.common.constant.ResultCode;
import com.jz.common.utils.FreeMarkerUtils;
import com.jz.common.utils.MD5SignUtils;
import com.jz.common.utils.MapUtil;
import com.jz.common.utils.web.HttpClientUtils;
import com.jz.dmp.modules.controller.dataService.bean.*;
import com.jz.dmp.modules.service.DmpApiMangeService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.view.freemarker.FreeMarkerConfig;
import java.util.HashMap;
import java.util.Map;
......@@ -31,6 +37,9 @@ public class DmpApiMangeServiceImpl implements DmpApiMangeService {
@Value("${spring.gateway-url}")
private String gatewayUrl;
@Autowired
private FreeMarkerConfig freeMarkerConfig;
/**
* 授权给他人的API-列表分页查询
*
......@@ -193,12 +202,49 @@ public class DmpApiMangeServiceImpl implements DmpApiMangeService {
* @since 2021-01-19
*/
@Override
public JsonResult apiTestInfo(Map<String, Object> params) throws Exception {
public JsonResult apiTestInfo(Map<String, String> params) throws Exception {
String url = gatewayUrl + GatewayApiConstant.testApi;
JsonResult result = GatewayApiConstant.postRequest2GetData(url, JSONObject.toJSONString(params));
if (StringUtils.isEmpty(params.get("apiKey"))) {
return JsonResult.error(ResultCode.PARAMS_ERROR, "apiKey不能为空!");
}
params.put("apiKey", params.get("apiKey"));
params.put("timestamp", String.valueOf(System.currentTimeMillis()));
//使用freemaker模板生成 请求参数
String jsonStr = FreeMarkerUtils.freemakerJson(CommConstant.API_TEST, params, freeMarkerConfig);
Map<String, Object> requestJson = (Map<String, Object>) JSONObject.parse(jsonStr);
//对签约参数进行字典排序
String signParams = MapUtil.stringNormalSort(assembleSignMap(requestJson));
if (StringUtils.isNotEmpty(signParams)) {
String sign = MD5SignUtils.encrypt(signParams, "");
requestJson.put("sign", sign);
}
JsonResult result = GatewayApiConstant.postRequest2GetData(url, JSONObject.toJSONString(requestJson));
if (null == result.getData()) {
throw new RuntimeException("failed!");
}
Map<String, Object> data = (Map<String, Object>) result.getData();
result.setData(data.get("response_result"));
return result;
}
/**
* 组装签名参数
*
* @param params
* @return
*/
private Map assembleSignMap(Map<String, Object> params) {
Map<String, String> paramsMap = new HashMap<>();
paramsMap.put("apiKey", (String) params.get("apiKey"));
paramsMap.put("method", (String) params.get("method"));
paramsMap.put("signType", (String) params.get("signType"));
paramsMap.put("params", JSONObject.toJSONString(params.get("params")));
return paramsMap;
}
/**
* 查看日志
*
......@@ -207,26 +253,8 @@ public class DmpApiMangeServiceImpl implements DmpApiMangeService {
*/
@Override
public JsonResult checkApiLogInfo(LogInfoListReq req) throws Exception {
JsonResult result = new JsonResult();
String url = gatewayUrl + GatewayApiConstant.checkApiLog;
String resultData = HttpClientUtils.post(url, JSONObject.toJSONString(req));
if (StringUtils.isEmpty(resultData)) {
throw new RuntimeException("查询失败!");
}
logger.info("#################响应结果数据{}" + resultData);
Map jsonObject = JSONObject.parseObject(resultData);
if (jsonObject.containsKey("code")) {
if ("200".equals(jsonObject.get("code").toString())) {
result.setData(jsonObject.get("data"));
return result;
}
}
if (jsonObject.containsKey("msg")) {
logger.info(jsonObject.get("msg").toString());
result.setCode(ResultCode.INTERNAL_SERVER_ERROR);
result.setMessage(jsonObject.get("msg").toString());
}
JsonResult result = GatewayApiConstant.postRequest2GetData(url, JSONObject.toJSONString(req));
return result;
}
......@@ -425,4 +453,35 @@ public class DmpApiMangeServiceImpl implements DmpApiMangeService {
return result;
}
/**
* 授权--组织名称查询
*
* @param orgName
* @author Bellamy
* @since 2021-03-02
*/
@Override
public JsonResult getAuthOrgName(String orgName) throws Exception {
String url = gatewayUrl + GatewayApiConstant.getOrgNameList;
Map params = new HashMap();
params.put("orgName", orgName);
JsonResult result = GatewayApiConstant.getRequest2GetData(url, params);
return result;
}
/**
* API测试--下拉框
*
* @author Bellamy
* @since 2021-03-03
*/
@Override
public JsonResult getApiTestGetApiKey(String apiName) throws Exception {
String url = gatewayUrl + GatewayApiConstant.listGetApiKey;
Map params = new HashMap();
params.put("apiName", apiName);
JsonResult result = GatewayApiConstant.getRequest2GetData(url, params);
return result;
}
}
......@@ -241,10 +241,45 @@ public class DmpApiServiceMangeServiceImpl implements DmpApiServiceMangeService
public JsonResult createProjectFolder(CreateFolderReq req) throws Exception {
String url = gatewayUrl + GatewayApiConstant.createProjectFolder;
req.setCreateUser(SessionUtils.getCurrentUserName());
if (StringUtils.isNotEmpty(req.getId())) {
req.setUpdateUser(SessionUtils.getCurrentUserName());
}
JsonResult result = GatewayApiConstant.postRequest2GetData(url, JSONObject.toJSONString(req));
return result;
}
/**
* 删除文件夹
*
* @param id
* @return
* @author Bellamy
* @since 2021-03-3
*/
@Override
public JsonResult delFolderById(String id) throws Exception {
String url = gatewayUrl + GatewayApiConstant.delFolder;
Map params = new HashMap();
params.put("id", id);
JsonResult result = GatewayApiConstant.getRequest2GetData(url, params);
return result;
}
/**
* 获取function模板列表
*
* @return
* @author Bellamy
* @since 2021-03-3
*/
@Override
public JsonResult getFunctionTemplateList() throws Exception {
String url = gatewayUrl + GatewayApiConstant.functionTemplate;
Map params = new HashMap();
JsonResult result = GatewayApiConstant.getRequest2GetData(url, params);
return result;
}
/**
* 服务开发API列表
*
......@@ -269,31 +304,15 @@ public class DmpApiServiceMangeServiceImpl implements DmpApiServiceMangeService
* @since 2021-02-24
*/
@Override
public JsonResult getFolderTree(String projectId, String orgCode) throws Exception {
JsonResult result = new JsonResult();
public JsonResult getFolderTree(String projectId, String orgFolder) throws Exception {
String url = gatewayUrl + GatewayApiConstant.folderTree;
Map params = new HashMap();
if (StringUtils.isNotEmpty(projectId)) {
params.put("projectId", projectId);
}
params.put("orgCode", orgCode);
String returnData = HttpClientUtils.getJsonForParam(url, params);
if (StringUtils.isEmpty(returnData)) {
throw new RuntimeException("查询失败!");
}
logger.info("#################响应结果{}" + returnData);
Map map = JSONObject.parseObject(returnData);
if (map.containsKey("code")) {
if ("200".equals(map.get("code").toString())) {
return JsonResult.ok(map.get("data"));
}
}
if (map.containsKey("msg")) {
logger.info(map.get("msg").toString());
result.setMessage(map.get("msg").toString());
result.setCode(ResultCode.INTERNAL_SERVER_ERROR);
}
params.put("orgFolder", orgFolder);
JsonResult result = GatewayApiConstant.getRequest2GetData(url, params);
return result;
}
......
package com.jz.dmp.modules.service.impl;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import javax.servlet.http.HttpServletRequest;
import com.alibaba.fastjson.JSONObject;
import com.mysql.jdbc.Blob;
import org.apache.tomcat.jni.Mmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.ThrowsAdvice;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.pagehelper.Page;
......@@ -40,15 +15,7 @@ import com.jz.common.constant.StatuConstant;
import com.jz.common.enums.ModuleLogEnum;
import com.jz.common.page.PageInfoResponse;
import com.jz.common.persistence.BaseService;
import com.jz.common.utils.AzkabanApiUtils2;
import com.jz.common.utils.CodeGeneratorUtils;
import com.jz.common.utils.DateUtils;
import com.jz.common.utils.FileUtils;
import com.jz.common.utils.FlowParseTool;
import com.jz.common.utils.GZIPUtils;
import com.jz.common.utils.JsonMapper;
import com.jz.common.utils.StringUtils;
import com.jz.common.utils.ZipUtils;
import com.jz.common.utils.*;
import com.jz.common.utils.web.XmlUtils;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.azkaban.dao.ExecutionFlowsMapper;
......@@ -62,27 +29,25 @@ import com.jz.dmp.modules.controller.dataOperation.bean.DataDevExamplesListReq;
import com.jz.dmp.modules.controller.dataOperation.bean.DataDevTaskListDto;
import com.jz.dmp.modules.controller.dataOperation.bean.DataDevTaskListReq;
import com.jz.dmp.modules.controller.projconfig.bean.DmpProjectConfigInfoDto;
import com.jz.dmp.modules.dao.DmpDevelopTaskDao;
import com.jz.dmp.modules.dao.DmpNavigationTreeDao;
import com.jz.dmp.modules.dao.DmpProjectDao;
import com.jz.dmp.modules.dao.DmpSyncingDatasourceTypeDao;
import com.jz.dmp.modules.dao.*;
import com.jz.dmp.modules.dao.projconfig.DmpProjectConfigInfoMapper;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
import com.jz.dmp.modules.model.DmpDevelopTask;
import com.jz.dmp.modules.model.DmpDevelopTaskHistory;
import com.jz.dmp.modules.model.DmpModuleOperateLog;
import com.jz.dmp.modules.model.DmpNavigationTree;
import com.jz.dmp.modules.model.DmpProjectConfigInfo;
import com.jz.dmp.modules.model.DmpProjectSystemInfo;
import com.jz.dmp.modules.model.DmpSyncingDatasource;
import com.jz.dmp.modules.model.DmpSyncingDatasourceType;
import com.jz.dmp.modules.service.DmpDevelopTaskHistoryService;
import com.jz.dmp.modules.service.DmpDevelopTaskService;
import com.jz.dmp.modules.service.DmpModuleOperateLogService;
import com.jz.dmp.modules.service.DmpSyncingDatasourceService;
import com.jz.dmp.modules.service.FlowService;
import com.jz.dmp.modules.service.OfflineSynchService;
import com.jz.dmp.modules.service.projconfig.DmpProjectConfigInfoService;
import com.jz.dmp.modules.model.*;
import com.jz.dmp.modules.service.*;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import javax.servlet.http.HttpServletRequest;
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.regex.Pattern;
/**
* 任务开发(DmpDevelopTask)表服务实现类
......@@ -138,6 +103,9 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private DmpRealtimeSyncInfoDao dmpRealtimeSyncInfoDao;
/**
* 添加保存dmp数据(包含校验数据)
*
......@@ -810,6 +778,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
public PageInfoResponse<DataDevTaskListDto> queryDevTaskListPage(DataDevTaskListReq req) throws Exception {
PageInfoResponse<DataDevTaskListDto> pageInfoResponse = new PageInfoResponse<>();
if (StringUtils.isNotBlank(req.getTreeIdOrName())) {
//判断是否为整数 是整数返回true,否则返回false
Pattern pattern = Pattern.compile("^[-\\+]?[\\d]*$");
......@@ -828,6 +797,13 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
pageInfoResponse.setCode(ResultCode.SUCCESS);
pageInfoResponse.setMessage("查询成功");
pageInfoResponse.setData(pageInfo);
String taskName = "";
List<DataDevTaskListDto> listObj = (List<DataDevTaskListDto>) pageInfo;
for (DataDevTaskListDto str : listObj) {
taskName += "," + str.getTaskName();
}
List<Map> flowList = executionFlowsMapper.queryLastStatus(taskName.substring(1).split(","));
return pageInfoResponse;
}
......@@ -985,7 +961,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
String obderByStr = dmpDevelopTaskRequest.takeOrderByStr(DmpDevelopTaskRequest.class);
Page page = null;
if (obderByStr==null) {
if (obderByStr == null) {
page = PageHelper.startPage(dmpDevelopTaskRequest.getPageNum(), dmpDevelopTaskRequest.getPageSize());
} else {
page = PageHelper.startPage(dmpDevelopTaskRequest.getPageNum(), dmpDevelopTaskRequest.getPageSize(), obderByStr);
......@@ -1046,10 +1022,10 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
//通过任务名称,去查询开发实例
PageHelper.startPage(req.getPageNum(), req.getPageSize());
Map saveParams = new HashMap();
if (org.apache.commons.lang3.StringUtils.isNotEmpty(req.getBusinessTime())) { //业务时间范围
String[] cretime = req.getBusinessTime().split("-");
saveParams.put("startTime", cretime[0].trim() + " 00:00:00");
saveParams.put("endTime", cretime[1].trim() + " 23:59:59");
if (StringUtils.isNotBlank(req.getStartTime()) && StringUtils.isNotBlank(req.getEndTime())) { //业务时间范围
//String[] cretime = req.getBusinessTime().split("-");
saveParams.put("startTime", req.getStartTime().trim() + " 00:00:00");
saveParams.put("endTime", req.getEndTime().trim() + " 23:59:59");
} else if (StringUtils.isNotBlank(req.getCreTimeType())) {
if ("01".equals(req.getCreTimeType())) { //昨天
saveParams.put("businessTime", DateUtils.getYesterdayStr());
......@@ -1087,7 +1063,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
}
/**
*根据treeId取得最新版提交的同步脚本文件名(不含后缀)及版本信息
* 根据treeId取得最新版提交的同步脚本文件名(不含后缀)及版本信息
*/
@Override
public String getConfigFileNameNotSuffix4Published(Long treeId) throws Exception {
......@@ -1096,7 +1072,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
return fileName + "@" + log.getRemark();
}
private DmpModuleOperateLog getLastVersion4SubmitSyncInfo(Integer treeId)throws Exception {
private DmpModuleOperateLog getLastVersion4SubmitSyncInfo(Integer treeId) throws Exception {
DmpModuleOperateLog o = null;
try {
o = dmpModuleOperateLogService.getLastOperateLog(treeId, ModuleLogEnum.VERSION_SYNC_XML);
......@@ -1187,7 +1163,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
int revision = 1;
ObjectNode flowHeaderNode = objectMapper.createObjectNode();
flowHeaderNode.put("name", dmpDevelopTask.getName() );
flowHeaderNode.put("name", dmpDevelopTask.getName());
flowHeaderNode.put("description", dmpDevelopTask.getTaskDesc());
flowHeaderNode.put("revision", revision);
dmpDevelopTask.setFlowHeader(flowHeaderNode.toString());
......@@ -1196,7 +1172,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
scriptNode.put("id", "canvas");
scriptNode.put("resourceId", "canvas");
ObjectNode stencilSetNode = objectMapper.createObjectNode();
stencilSetNode.put("namespace","http://b3mn.org/stencilset/bpmn2.0#");
stencilSetNode.put("namespace", "http://b3mn.org/stencilset/bpmn2.0#");
scriptNode.put("stencilset", stencilSetNode);
byte[] data = null;
try {
......@@ -1249,7 +1225,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
}
/**
*任务修改
* 任务修改
*/
@Override
@Transactional(rollbackFor = Exception.class)
......@@ -1286,7 +1262,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
}
/**
*任务流程发布
* 任务流程发布
*/
@Override
public BaseResponse flowSubmit(Long treeId, HttpServletRequest httpRequest)
......@@ -1342,7 +1318,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
}
/**
*根据treeId获取离线任务xmlFileName
* 根据treeId获取离线任务xmlFileName
*/
@Override
public String getExecXmlFileName(Long syncTaskTreeId) throws Exception {
......@@ -1358,7 +1334,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
}
/**
*运行任务
* 运行任务
*/
@Override
public BaseResponse taskAzkabanRun(Long treeId, HttpServletRequest httpRequest) throws Exception {
......@@ -1466,11 +1442,11 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
//上次zip包到azkaban
String localTaskZipAbsolutePath = localTaskZipPath + "/" + localZipTargetFileName;
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl, redisTemplate);
return azkabanApiUtils.loginCreateProjectuploadZipAndExecute("jz_localflow_"+taskAlias+"_" + projectId, "local_"+taskAlias+"_project", localTaskZipAbsolutePath, treeName);
return azkabanApiUtils.loginCreateProjectuploadZipAndExecute("jz_localflow_" + taskAlias + "_" + projectId, "local_" + taskAlias + "_project", localTaskZipAbsolutePath, treeName);
}
/**
*停止任务
* 停止任务
*/
@Override
public BaseBeanResponse<String> taskAzkabanStop(Long treeId, HttpServletRequest httpRequest) throws Exception {
......@@ -1478,7 +1454,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
BaseBeanResponse<String> baseBeanResponse = new BaseBeanResponse<String>();
DmpDevelopTask dmpDevelopTask = dmpDevelopTaskDao.get(treeId);
if (dmpDevelopTask==null) {
if (dmpDevelopTask == null) {
baseBeanResponse.setCode(StatuConstant.CODE_ERROR_PARAMETER);
baseBeanResponse.setMessage("任务不存在");
return baseBeanResponse;
......@@ -1512,7 +1488,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
AzkabanApiUtils2 azkabanApiUtils = new AzkabanApiUtils2(azkabanMonitorUrl, redisTemplate);
String exeIdsStr = azkabanApiUtils.stopFlow("jz_localflow_"+taskAlias+"_" + projectId, treeName);
String exeIdsStr = azkabanApiUtils.stopFlow("jz_localflow_" + taskAlias + "_" + projectId, treeName);
baseBeanResponse.setCode(StatuConstant.SUCCESS_CODE);
baseBeanResponse.setMessage("停止成功");
......@@ -1522,7 +1498,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
}
/**
*SHELL/SQL版本发布
* SHELL/SQL版本发布
*/
@Override
public BaseResponse taskPublish(Long treeId, HttpServletRequest httpRequest) throws Exception {
......@@ -1547,7 +1523,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
}
/**
*软删除任务
* 软删除任务
*/
@Override
@Transactional(rollbackFor = Exception.class)
......@@ -1562,9 +1538,9 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
if (baseResponseAzkaban.getCode().equals(StatuConstant.FAILURE_CODE)) {
throw new RuntimeException("azkaban取消发布任务失败");
}
}else if (taskType.equals(CommConstant.TASK_TYPE_DEVSHELL) || taskType.equals(CommConstant.TASK_TYPE_DEVSQL)) {
} else if (taskType.equals(CommConstant.TASK_TYPE_DEVSHELL) || taskType.equals(CommConstant.TASK_TYPE_DEVSQL)) {
//什么也不做
}else {
} else {
baseResponse.setCode(StatuConstant.CODE_DATA_NOTMEET);
baseResponse.setMessage("任务类型不适合调用该方法");
return baseResponse;
......@@ -1596,11 +1572,43 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
@Override
public JsonResult queryExamplesLogByExecId(String execId) throws Exception {
List<Map> list = executionFlowsMapper.queryExamplesLogByExecId(execId);
if(list.size() > 0 && list != null) {
if (list.size() > 0 && list != null) {
list.forEach(map -> {
//map.put("log", map.get("log"));
});
}
return JsonResult.ok(list);
}
/**
* 运维大屏--获取任务状态
*
* @param projectId
* @return
* @author Bellamy
* @since 2021-02-22
*/
@Override
public JsonResult getTaskStatus(String projectId) throws Exception {
Map<String, Integer> taskStatus = new HashMap<>();
//RUNNING, FAILED,SUCCESS
//实时任务状态
Map<String, Object> realTimeTask = dmpRealtimeSyncInfoDao.queryTaskStatus(projectId);
Integer realTimeRunNum = Integer.valueOf(String.valueOf(realTimeTask.get("running")));
Integer realTimeFailedNum = Integer.valueOf(String.valueOf(realTimeTask.get("failed")));
//业务流程实例状态 :30-正在运行,50-运行成功,60-kill任务,70-运行失败
Map<String, Object> flowStatus = executionFlowsMapper.queryTaskInstanceStatus();
Integer flowRunNum = Integer.valueOf(String.valueOf(flowStatus.get("running")));
Integer flowSuccessNum = Integer.valueOf(String.valueOf(flowStatus.get("success")));
Integer flowFailedNum = Integer.valueOf(String.valueOf(flowStatus.get("failed")));
Integer flowManualNum = Integer.valueOf(String.valueOf(flowStatus.get("manualNum")));
taskStatus.put("running", realTimeRunNum + flowRunNum);
taskStatus.put("success", flowSuccessNum);
taskStatus.put("failed", realTimeFailedNum + flowFailedNum);
taskStatus.put("manual", flowManualNum);
return JsonResult.ok(taskStatus);
}
}
\ No newline at end of file
......@@ -41,29 +41,12 @@ public class DmpOrgMangeServiceImpl implements DmpOrgMangeService {
*/
@Override
public JsonResult queryOrgListPage(OrganizationManageListQueryReq req) throws Exception {
JsonResult result = new JsonResult();
String url = gatewayUrl + GatewayApiConstant.orgListPage;
if (StringUtils.isNotEmpty(req.getOrgName())) {
req.setOrgName(req.getOrgName().trim());
}
String resultData = HttpClientUtils.post(url, JSONObject.toJSONString(req));
if (StringUtils.isEmpty(resultData)) {
throw new RuntimeException("查询失败!");
}
logger.info("#################组织管理列数据{}" + resultData);
Map jsonObject = JSONObject.parseObject(resultData);
if (jsonObject.containsKey("code")) {
if ("200".equals(jsonObject.get("code").toString())) {
return JsonResult.ok(jsonObject.get("data"));
}
}
if (jsonObject.containsKey("msg")) {
logger.info(jsonObject.get("msg").toString());
result.setMessage(jsonObject.get("msg").toString());
result.setCode(ResultCode.INTERNAL_SERVER_ERROR);
}
JsonResult result = GatewayApiConstant.postRequest2GetData(url, JSONObject.toJSONString(req));
return result;
}
......@@ -79,18 +62,8 @@ public class DmpOrgMangeServiceImpl implements DmpOrgMangeService {
Map params = new HashMap();
params.put("id", id);
String url = gatewayUrl + GatewayApiConstant.delOrg;
String returnData = HttpClientUtils.getJsonForParam(url, params);
if (StringUtils.isEmpty(returnData)) {
throw new RuntimeException("删除失败!");
}
logger.info("#################响应结果{}" + returnData);
Map map = JSONObject.parseObject(returnData);
if (map.containsKey("code")) {
if ("200".equals(map.get("code").toString())) {
return JsonResult.ok();
}
}
return JsonResult.error("删除失败!");
JsonResult result = GatewayApiConstant.getRequest2GetData(url, params);
return result;
}
/**
......@@ -102,26 +75,9 @@ public class DmpOrgMangeServiceImpl implements DmpOrgMangeService {
*/
@Override
public JsonResult addOrg(OrganizationManageAddReq req) throws Exception {
JsonResult result = new JsonResult();
String url = gatewayUrl + GatewayApiConstant.addOrg;
req.setCreateUser(SessionUtils.getCurrentUserName());
String returnData = HttpClientUtils.post(url, JSONObject.toJSONString(req));
if (StringUtils.isEmpty(returnData)) {
throw new RuntimeException("新增失败!");
}
logger.info("#################响应结果{}" + returnData);
Map map = JSONObject.parseObject(returnData);
if (map.containsKey("code")) {
if ("200".equals(map.get("code").toString())) {
return JsonResult.ok();
}
}
if (map.containsKey("msg")) {
logger.info(map.get("msg").toString());
result.setMessage(map.get("msg").toString());
result.setCode(ResultCode.INTERNAL_SERVER_ERROR);
}
JsonResult result = GatewayApiConstant.postRequest2GetData(url, JSONObject.toJSONString(req));
return result;
}
......@@ -134,27 +90,10 @@ public class DmpOrgMangeServiceImpl implements DmpOrgMangeService {
*/
@Override
public JsonResult updateOrg(OrganizationManageUpdateReq req) throws Exception {
JsonResult result = new JsonResult();
String url = gatewayUrl + GatewayApiConstant.addOrg;
req.setCreateUser(SessionUtils.getCurrentUserName());
String returnData = HttpClientUtils.post(url, JSONObject.toJSONString(req));
if (StringUtils.isEmpty(returnData)) {
throw new RuntimeException("编辑失败!");
}
logger.info("#################响应结果{}" + returnData);
Map map = JSONObject.parseObject(returnData);
if (map.containsKey("code")) {
if ("200".equals(map.get("code").toString())) {
return JsonResult.ok();
}
}
if (map.containsKey("msg")) {
logger.info(map.get("msg").toString());
result.setMessage(map.get("msg").toString());
result.setCode(ResultCode.INTERNAL_SERVER_ERROR);
}
JsonResult result = GatewayApiConstant.postRequest2GetData(url, JSONObject.toJSONString(req));
return result;
}
......@@ -166,27 +105,10 @@ public class DmpOrgMangeServiceImpl implements DmpOrgMangeService {
*/
@Override
public JsonResult getOrgInfoByOrgId(String id) throws Exception {
JsonResult result = new JsonResult();
String url = gatewayUrl + GatewayApiConstant.orgDetail;
OrganizationManageDetailQueryReq req = new OrganizationManageDetailQueryReq();
req.setId(Long.valueOf(id));
String returnData = HttpClientUtils.post(url, JSONObject.toJSONString(req));
if (StringUtils.isEmpty(returnData)) {
throw new RuntimeException("查询失败!");
}
logger.info("#################响应结果{}" + returnData);
Map map = JSONObject.parseObject(returnData);
if (map.containsKey("code")) {
if ("200".equals(map.get("code").toString())) {
return JsonResult.ok(map.get("data"));
}
}
if (map.containsKey("msg")) {
logger.info(map.get("msg").toString());
result.setMessage(map.get("msg").toString());
result.setCode(ResultCode.INTERNAL_SERVER_ERROR);
}
JsonResult result = GatewayApiConstant.postRequest2GetData(url, JSONObject.toJSONString(req));
return result;
}
}
......@@ -2,6 +2,7 @@ package com.jz.dmp.modules.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.amazonaws.services.dynamodbv2.xspec.M;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.jz.agent.service.DmpDsAgentService;
......@@ -347,7 +348,6 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
Integer srcDataSourceId = sourceDbInfo.getId();
Integer targetDataSourceId = targetDbInfo.getId();
logger.info("###################开始--同步数据源到数据源任务################### ");
Long realtiemId = null; //同步任务id
//源数据源到数据源同步connector信息
//解析黑名单表
......@@ -423,10 +423,21 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
DmpRealtimeTaskHistory taskHistory = new DmpRealtimeTaskHistory();
BeanUtils.copyProperties(saveBody, taskHistory);
if (StringUtils.isEmpty(String.valueOf(params.get("taskId")))) {
Map<String, String> respData = publishTask2Kafka(connectorUrl, jsonStr);
Map<String, String> respData = publishTask2Kafka(connectorUrl, jsonStr, "");
saveBody.setConnectorJobId(respData.get("connectorJobId"));
saveBody.setStatus(respData.get("status"));
dmpRealtimeSyncInfoDao.insert(saveBody);
//异步获取任务状态
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Map map = new HashMap();
map.put("id", saveBody.getId());
getKafkaTaskStatus(connectorUrl + respData.get("connectorJobId") + "/status", map);
}
});
thread.start();
} else {
DmpRealtimeSyncInfo realtimeTask = dmpRealtimeSyncInfoDao.queryById(Integer.valueOf(params.get("taskId").toString()));
if (realtimeTask == null)
......@@ -436,7 +447,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
//编辑时,先删除任务,再发布任务
HttpClientUtils.httpDelete(connectorUrl + "/" + realtimeTask.getConnectorJobId() + "/");
Map<String, String> respData = publishTask2Kafka(connectorUrl, jsonStr);
Map<String, String> respData = publishTask2Kafka(connectorUrl, jsonStr, params.get("taskId").toString());
saveBody.setConnectorJobId(respData.get("connectorJobId"));
saveBody.setStatus(respData.get("status"));
saveBody.setUpdateTime(new Date());
......@@ -449,7 +460,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
version = version.add(new BigDecimal(1.0));
taskHistory.setVersion(String.valueOf(version));
}
logger.info("###################保存实时同步任务--结束 ################");
logger.info("################### save task end ################");
taskHistory.setRealtimeSyncId(saveBody.getId());
dmpRealtimeSyncInfoDao.insertRealtimeHistory(taskHistory);
......@@ -468,7 +479,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
/*
* 发布到kafak 并开始运行 ,请求接口正常则保存数据,否则失败
* */
public Map<String, String> publishTask2Kafka(String connectorUrl, String jsonStr) throws Exception {
public Map<String, String> publishTask2Kafka(String connectorUrl, String jsonStr, String id) throws Exception {
Map<String, String> returnMap = new HashMap();
Map<String, Object> result = RestClient.post(connectorUrl, jsonStr);
logger.info("=======response data {}" + JSONObject.toJSONString(result));
......@@ -476,7 +487,10 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
if (StringUtils.isEmpty(connectorJobId)) {
throw new RuntimeException("提交失败!");
}
String status = getExecuteShellStatus(connectorUrl + "/" + connectorJobId + "/status", new HashMap());
Map params = new HashMap();
params.put("id", id);
params.put("flag", "publish");
String status = getExecuteKafkaStatus(connectorUrl + "/" + connectorJobId + "/status", params);
returnMap.put("status", status);
returnMap.put("connectorJobId", connectorJobId);
return returnMap;
......@@ -990,7 +1004,6 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
String url = connectorUrl + "/" + connectorJobId;
String statusUrl = url;
//resume 恢复,删除 delete,pause 暂停
String param = "";
if ("01".equals(type)) {
url += "/resume";
} else if ("02".equals(type)) {
......@@ -998,10 +1011,14 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
logger.info("######执行表数据id{}" + ids[i]);
//执行 shell
//请求 kafka api
OKHttpUtil.httpPut(url, "");
//获取任务状态
String status = getExecuteShellStatus(statusUrl + "/status", new HashMap<>());
Map params = new HashMap<>();
params.put("id", realTaskId);
params.put("flag", url.substring(url.lastIndexOf("/") + 1));
String status = getExecuteKafkaStatus(statusUrl + "/status", params);
//执行后任务状态 :PAUSED 暂停 , RUNNING 运行中
saveBaby.setStatus(status);
saveBaby.setUptPerson(SessionUtils.getCurrentUserId());
......@@ -1014,23 +1031,62 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
/*
* 获取任务状态
* kafka rest api 获取任务状态
* */
public String getExecuteShellStatus(String statusUrl, Map params) throws Exception {
String respData = HttpClientUtils.getJsonForParam(statusUrl, params);
public String getExecuteKafkaStatus(String statusUrl, Map params) throws Exception {
String respData = HttpClientUtils.getJsonForParam(statusUrl, new HashMap<>());
if (StringUtils.isEmpty(respData)) {
throw new RuntimeException("执行失败!");
throw new RuntimeException("get status failed!");
}
logger.info("#################响应结果{}" + respData);
Map map = JSONObject.parseObject(respData);
if (!map.containsKey("connector")) {
throw new RuntimeException("执行失败!");
}
/*List<Map> tasks = (List<Map>) map.get("tasks");
if (tasks.size() > 0 && null != tasks) {
String tasksStatus = (String) tasks.get(0).get("state");
logger.info("response tasks status{}" + tasksStatus);
return tasksStatus;
}*/
Map connector = (Map) map.get("connector");
if (!connector.containsKey("state")) {
throw new RuntimeException("get status failed!");
}
String status = (String) connector.get("state");
if (StringUtils.isEmpty(status))
throw new RuntimeException("执行失败!");
logger.info("response connector status{}" + status);
if (StringUtils.isNotEmpty((String) params.get("id"))) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
getKafkaTaskStatus(statusUrl, params);
}
});
thread.start();
}
return status;
}
/*
* kafka rest api 获取任务状态
* */
public String getKafkaTaskStatus(String statusUrl, Map params) {
String respData = HttpClientUtils.getJsonForParam(statusUrl, null);
if (StringUtils.isEmpty(respData)) {
throw new RuntimeException("get status failed!");
}
Map map = JSONObject.parseObject(respData);
List<Map> tasks = (List<Map>) map.get("tasks");
if (tasks.size() == 0) {
String s = getKafkaTaskStatus(statusUrl, params);
if(StringUtils.isNotEmpty(s)){
return s;
}
}
String status = (String) tasks.get(0).get("state");
logger.info("response tasks status{}" + status);
DmpRealtimeSyncInfo saveBody = new DmpRealtimeSyncInfo();
saveBody.setStatus(status);
saveBody.setId(Integer.valueOf(params.get("id").toString()));
dmpRealtimeSyncInfoDao.update(saveBody);
return status;
}
}
\ No newline at end of file
# 测试环境配置
server:
port: 7181
port: 7183
#contextPath: /resource
management:
port: 54001
port: 54003
health:
mail:
enabled: false
......
......@@ -182,14 +182,12 @@
execution_flows
where 1=1
<if test="businessTime != null and businessTime != ''"> and from_unixtime(submit_time/1000,'%Y-%m-%d') =#{businessTime} </if>
<if test="taskName != null and taskName != ''">
and flow_id in
<foreach collection="taskName" item="item" open="(" separator="," close=")">
#{item}
</foreach>
</if>
<if test="startTime != null and startTime != ''"> and from_unixtime(submit_time/1000,'%Y-%m-%d %H:%i:%s') >=#{startTime} </if>
<if test="endTime != null and endTime != ''">#{endTime} >= and from_unixtime(submit_time/1000,'%Y-%m-%d %H:%i:%s')</if>
<if test="endTime != null and endTime != ''">and #{endTime} >= from_unixtime(submit_time/1000,'%Y-%m-%d %H:%i:%s')</if>
</select>
<!-- <resultMap type="java.util.HashMap" id="taskExamplesLog">
......@@ -209,5 +207,22 @@
from execution_logs
where 1=1 and exec_id=#{execId}
</select>
<!--统计实例状态数-->
<select id="queryTaskInstanceStatus" resultType="java.util.Map">
select
t.*,
sum(t.running+t.success +t.failed) manualNum
from(
SELECT
sum(case when status='30' then 1 else 0 end) running,
sum(case when status='50' then 1 else 0 end) success,
sum(case when status='60' then 1 else 0 end) killed,
sum(case when status='70' then 1 else 0 end) failed
FROM execution_flows
)t
</select>
<select id="queryLastStatus" resultType="java.util.Map">
</select>
</mapper>
\ No newline at end of file
......@@ -175,9 +175,9 @@
t3.real_name as userName
from
dmp_navigation_tree t1
left join dmp_develop_task t2 on t2.TREE_ID=t1.ID and t2.data_status ='1'
inner join dmp_develop_task t2 on t2.TREE_ID=t1.ID and t2.data_status ='1'
left join dmp_member t3 on t1.create_user_id=t3.user_id
where 1=1 and t1.type='01'
where 1=1 and IS_LEVEL ='1'
and t1.project_id = #{projectId}
<if test="taskId != null and taskId != ''"> and t2.id =#{taskId} </if>
<if test="treeIdOrName != null and treeIdOrName != ''"> and t1.name like concat('%',#{treeIdOrName},'%') </if>
......@@ -218,9 +218,9 @@
(case when t1.type='01' then '离线同步' when t1.type='02' then '实时同步' when t1.type='03' then '数据开发' end) as type
from
dmp_navigation_tree t1
left join dmp_develop_task t2 on t2.TREE_ID=t1.ID and t2.data_status ='1'
inner join dmp_develop_task t2 on t2.TREE_ID=t1.ID and t2.data_status ='1'
left join dmp_member t3 on t1.create_user_id=t3.user_id
where 1=1 and t1.type='01'
where 1=1 and is_level = '1' and t1.data_status ='1'
and t1.project_id = #{projectId}
<if test="treeId != null and treeId != ''"> and t1.id =#{treeId} </if>
<if test="taskName != null and taskName != ''"> and t1.name like concat('%',#{taskName},'%') </if>
......
......@@ -546,4 +546,17 @@
#{item}
</foreach>
</update>
<!--实时任务状态-->
<select id="queryTaskStatus" resultType="java.util.Map" parameterType="string">
select
sum(case when t1.status='FAILED' then 1 else 0 end) failed,
sum(case when t1.status='RUNNING' then 1 else 0 end) running,
sum(case when t1.status='PAUSED' then 1 else 0 end) paused
from
dmp_realtime_sync_info t1
inner join dmp_navigation_tree t2 on t1.tree_id=t2.id
where 1=1 and t1.data_status='1' and t2.data_status='1'
and t1.project_id =#{projectId}
</select>
</mapper>
\ No newline at end of file
<#if apiKey??>
{
"apiKey":"${apiKey!}",
"method":"request",
"signType":"MD5",
"sign":"${sign!}",
"timestamp":"${timestamp!}",
"params":{
"authCode":"",
"isTest":true,
"reqParams":{
"datasourceId": "1",
"query_database": "product",
"query_table": "table1",
"request_fileds": {"flelds1": "xxxxxx", "flelds2":"xxxxxx"},
"response_fields": "field1,field2,field3,field4",
"page_num": 1,
"page_size": 100
}
}
}
</#if>
\ No newline at end of file
{
"projectId":"35",
"treeId":755,
"taskId":"212",
"srcDataSourceId":"239",
"srcDatasourceTypeId":1,
"targetDataSourceId":"202",
"targetDataSourceTypeId":10,
"regularExpression":"21312312",
"tables":[
{
"sourceTableName":"dmp_table",
"targetTableName":"dmp_table",
"fieldInfo":[
{
"desensitizationField":"is_show",
"arithmetic":"HmacSHA256"
},
{
"desensitizationField":"cn_name",
"arithmetic":"HmacSHA256"
}
]
},
{
"sourceTableName":"dmp_table_column",
"targetTableName":"dmp_table_column",
"fieldInfo":[
]
},
{
"sourceTableName":"dmp_table_field_mapping",
"targetTableName":"dmp_table_field_mapping",
"fieldInfo":[
]
},
{
"sourceTableName":"dmp_table_field_schema",
"targetTableName":"dmp_table_field_schema",
"fieldInfo":[
]
}
]
}
/*{
"projectId":31,
"treeId": 1,
"taskId":181,
......@@ -21,22 +66,4 @@
"pkName":"source_database_name,source_table_name,stat_date"
}
]
}
/*{
"projectId":31,
"treeId": 1,
"taskId":181, //任务id编辑时 需要
"srcDataSourceId":205, //来源数据源id
"targetDataSourceId":202, //目标数据源id
"tables":[
{
"sourceTableName":"dmp_data_contrast", //选择的 来源表
"targetTableName":"dmp_data_contrast", //目标表
"desensitizationField":"target_database_name", //脱敏字段
"arithmetic":"HmacSHA256" //算法
}
]
}*/
\ No newline at end of file
......@@ -82,8 +82,6 @@
{
"params": {
//"version": "1.0", //版本
"treeId": 669,
"projectId": "31",
"taskId":"", //任务id
......@@ -103,7 +101,6 @@
"driverMemory":"",//分配任务内存
"executorCore":"1", //单executor的cpu数
"driverCore":"1"//单executor的cpu数
//"fieldMapping":""//字段映射关系
},
"reader": {
"sourceDbId": "",
......@@ -111,14 +108,14 @@
"registerTableName": "dmp_azkaban_exector_server_config",
"sourceHdfsPath": "", //HDFS存储目录
"sourceHdfsFile": "",
"sourceFtpDir": "", //文件所在目录
"sourceFtpDir": "", //Ftp文件所在目录
"fileType": "", //文件类型
"sourceCsvDelimiter": "", //分隔符
"sourceCsvCharset": "", //字符集编码
"sourceCsvHeader": "", //是否含有表头
"null值":"",
"compressedFormat":"", //压缩格式
"sourceFtpFile": "", //文件名
"sourceFtpFile": "", //Ftp文件名
"sourceSkipFtpFile": "", //没有数据文件是否跳过
"sourceCsvQuote": "",
"sourceFtpLoadDate": "", //加载数据日期
......@@ -130,19 +127,13 @@
"column": [
{
"name": "host",
"type": "VARCHAR"
"type": "VARCHAR",
"fieldAlias" : ""
},
{
"name": "port",
"type": "VARCHAR"
},
{
"name": "user_name",
"type": "VARCHAR"
},
{
"name": "pass_word",
"type": "VARCHAR"
"type": "VARCHAR",
"fieldAlias" : ""
}
]
},
......@@ -219,7 +210,9 @@
},
"mappingRelation":[
{
"sourceFieldId":"",
"sourceField":"",
"targetFieldId":"",
"targetField":""
}
]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment