Commit 83685b10 authored by mcb's avatar mcb

commit

parent 16943735
......@@ -285,6 +285,12 @@
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp</groupId>
<artifactId>okhttp</artifactId>
<version>2.7.5</version>
</dependency>
</dependencies>
<build>
<finalName>jz-dmp-service</finalName>
......
......@@ -83,4 +83,10 @@ public class GatewayApiConstant {
//日志详情
public static final String logDetails = "/api/logging/getReqDetail";
//服务开发 api列表
public static final String listServerApplyApi = "/api/interface/listServerApplyApi";
//获取文件夹树
public static final String folderTree = "/api/producer/getFileCatalog";
}
......@@ -48,6 +48,9 @@ public class RestClient {
*/
@SuppressWarnings({ "unchecked" })
public static Map<String, Object> post(String url,String jsonStr){
LOGGER.info("===================post request Start=======================");
LOGGER.info("url:" + url);
LOGGER.info("json:" + jsonStr);
HttpHeaders headers = new HttpHeaders();
headers.set("Content-Type", "application/json;charset=UTF-8");//解决请求乱码问题
Map<String, Object> resutMap = null ;
......@@ -58,6 +61,7 @@ public class RestClient {
e.printStackTrace();
LOGGER.error("rest post 异常",e.getMessage(),e);
}
LOGGER.info("===================post request end=======================");
return resutMap;
}
......
......@@ -10,6 +10,7 @@ import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
......@@ -31,6 +32,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
......@@ -48,79 +50,6 @@ public class HttpClientUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientUtils.class);
/**
* HttpClient连接SSL
*/
public void ssl() {
CloseableHttpClient httpclient = null;
try {
KeyStore trustStore = KeyStore.getInstance(KeyStore
.getDefaultType());
FileInputStream instream = new FileInputStream(new File(
"d:\\tomcat.keystore"));
try {
// 加载keyStore d:\\tomcat.keystore
trustStore.load(instream, "123456".toCharArray());
} catch (CertificateException e) {
e.printStackTrace();
} finally {
try {
instream.close();
} catch (Exception ignore) {
}
}
// 相信自己的CA和所有自签名的证书
SSLContext sslcontext = SSLContexts
.custom()
.loadTrustMaterial(trustStore,
new TrustSelfSignedStrategy()).build();
// 只允许使用TLSv1协议
SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(
sslcontext,
new String[]{"TLSv1"},
null,
SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
httpclient = HttpClients.custom().setSSLSocketFactory(sslsf)
.build();
// 创建http请求(get方式)
HttpGet httpget = new HttpGet(
"https://localhost:8443/myDemo/Ajax/serivceJ.action");
System.out.println("executing request" + httpget.getRequestLine());
CloseableHttpResponse response = httpclient.execute(httpget);
try {
HttpEntity entity = response.getEntity();
System.out.println("----------------------------------------");
System.out.println(response.getStatusLine());
if (entity != null) {
System.out.println("Response content length: "
+ entity.getContentLength());
System.out.println(EntityUtils.toString(entity));
EntityUtils.consume(entity);
}
} finally {
response.close();
}
} catch (ParseException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (KeyManagementException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (KeyStoreException e) {
e.printStackTrace();
} finally {
if (httpclient != null) {
try {
httpclient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* @param url 访问地址
* @param token 请求头中存放token
......@@ -167,60 +96,6 @@ public class HttpClientUtils {
return result;
}
/**
* post方式提交表单(模拟用户登录请求)
*/
public void postForm() {
// 创建默认的httpClient实例.
CloseableHttpClient httpclient = HttpClients.createDefault();
// 创建httppost
HttpPost httppost = new HttpPost(
"http://localhost:8080/myDemo/Ajax/serivceJ.action");
// 创建参数队列
List<NameValuePair> formparams = new ArrayList<NameValuePair>();
formparams.add(new BasicNameValuePair("username", "admin"));
formparams.add(new BasicNameValuePair("password", "123456"));
UrlEncodedFormEntity uefEntity;
try {
uefEntity = new UrlEncodedFormEntity(formparams, "UTF-8");
httppost.setEntity(uefEntity);
System.out.println("executing request " + httppost.getURI());
CloseableHttpResponse response = httpclient.execute(httppost);
try {
if (response.getStatusLine().getStatusCode() == 200) {
HttpEntity httpEntity = response.getEntity();
if (httpEntity != null) {
System.out
.println("--------------------------------------");
System.out.println("Response content: "
+ EntityUtils.toString(httpEntity, "UTF-8"));
System.out
.println("--------------------------------------");
}
} else {
httppost.abort();
}
} finally {
response.close();
}
} catch (ClientProtocolException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e1) {
e1.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭连接,释放资源
if (httpclient != null) {
try {
httpclient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 发送 get请求
*/
......@@ -287,6 +162,7 @@ public class HttpClientUtils {
RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(50000).setConnectTimeout(50000).build();//设置请求和传输超时时间
httpPost.setConfig(requestConfig);
StringEntity se = new StringEntity(json, "UTF-8");
//StringEntity se = new StringEntity("UTF-8");
se.setContentType("application/json");
se.setContentEncoding(new BasicHeader(HTTP.CONTENT_TYPE, "application/json"));
System.out.println("-------------" + JSONObject.toJSONString(se));
......@@ -312,7 +188,7 @@ public class HttpClientUtils {
* @throws IOException
* @throws ClientProtocolException
*/
public static String postFormUrlencoded(String url,String json) {
public static String postFormUrlencoded(String url, String json) {
LOGGER.info("===================POST request start=======================");
LOGGER.info("url:" + url);
LOGGER.info("json:" + json);
......@@ -329,7 +205,7 @@ public class HttpClientUtils {
se.setContentType("application/x-www-form-urlencoded");
se.setContentEncoding(new BasicHeader(HTTP.CONTENT_TYPE, "application/x-www-form-urlencoded"));
System.out.println("-------------" + JSONObject.toJSONString(se));httpPost.setEntity(se);*/
httpPost.setEntity(new StringEntity(json,"UTF-8"));
httpPost.setEntity(new StringEntity(json, "UTF-8"));
httpPost.setHeader("Content-type", "application/x-www-form-urlencoded");
HttpResponse response = httpClient.execute(httpPost);
if (response != null) {
......@@ -387,7 +263,6 @@ public class HttpClientUtils {
return result;
}
/**
* 提交JSON参数
*
......@@ -502,7 +377,6 @@ public class HttpClientUtils {
return result;
}
/**
* 发送 post请求访问本地应用并根据传递参数不同返回不同结果
*
......@@ -537,7 +411,6 @@ public class HttpClientUtils {
return result;
}
/**
* get请求,返回字节数组
*
......@@ -600,7 +473,6 @@ public class HttpClientUtils {
return responseContent;
}
/**
* @param url 访问地址
* @param token 请求头中存放token
......@@ -669,7 +541,7 @@ public class HttpClientUtils {
StringBuilder sb = new StringBuilder();
String fh = "";
if (paramMap != null) {
if (paramMap != null && paramMap.size() > 0) {
if (url.indexOf("?") == -1) {
fh = "?";
}
......@@ -716,7 +588,6 @@ public class HttpClientUtils {
}
public static void main(String[] args) throws Exception {
// 私钥
/*String privateKey = "MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBAJzAfk55exxmvy3+pXsJINZEtmwUp6eBfDSl2YnT5bdJL3nzPnSjmaWsf8x9hR4QyGnvlV/Vo0sk36X7ATcqxfIn0+6W5f8IR4XtVDhxZsD/cK8nVThqFGQagmyNAwxP/wBnAXOy+fpwZrMOgqfosYmVsmImFWbHA87C4mx0bwoJAgMBAAECgYB4tlBOVIT3ITTW0cRT1HrCJxYoc1uMxk2FKbc1ycWceTKjgiu1nQtEp2ufaYYq2hfMZOEudRIUWygT5RFRj5HxLfL6Me3y6dtgyHvOVeMDNGAG+tsn8ObQCQjZ/hVKzFFgHlrHv5i4zX44im2IdvLqnV6cEUneduJfZAQT/XTGUQJBANAZbuTqlDRj/9ObGZEvaPe95FGAPNFEiNmRvLsCsRmruJA5h2ogwx8O4Yll5LylKV6C33Vws4pgAPBHvOzGTx0CQQDA1VYU4ihNyvMknFIAmMT2ojQmQ8ASX64hVFWY9ehf5JaJ+ZD1c1BvbrmubpIo7pPci50BmXnjq6EcxVML/VbdAkAwEH/FjczXYPV4yY0ZNIsZFZoDnQvvBdZZ8khWJWQEWt5RKYh2YcTPip9bHda8H6Wzd6TnOjWt00jENr2TLqadAkEAq0TQD/xOj8mR6xJsQtttFSE78EB8d9VDc5bT7+d5XLJKgoGGnnqtFkvh32uVpYVBDsFx0dneyLfHgSZBfISmgQJADZ71qrfCDvuoYNS4aOW52OL6LMC84Qi2EnZzl5OHkUuOv5jwBoOCRLEn0N999EMP6DBg8P5kg1llTq7bMG11ug==";
......@@ -731,4 +602,188 @@ public class HttpClientUtils {
}
/**
* HttpClient连接SSL
*/
public void ssl() {
CloseableHttpClient httpclient = null;
try {
KeyStore trustStore = KeyStore.getInstance(KeyStore
.getDefaultType());
FileInputStream instream = new FileInputStream(new File(
"d:\\tomcat.keystore"));
try {
// 加载keyStore d:\\tomcat.keystore
trustStore.load(instream, "123456".toCharArray());
} catch (CertificateException e) {
e.printStackTrace();
} finally {
try {
instream.close();
} catch (Exception ignore) {
}
}
// 相信自己的CA和所有自签名的证书
SSLContext sslcontext = SSLContexts
.custom()
.loadTrustMaterial(trustStore,
new TrustSelfSignedStrategy()).build();
// 只允许使用TLSv1协议
SSLConnectionSocketFactory sslsf = new SSLConnectionSocketFactory(
sslcontext,
new String[]{"TLSv1"},
null,
SSLConnectionSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
httpclient = HttpClients.custom().setSSLSocketFactory(sslsf)
.build();
// 创建http请求(get方式)
HttpGet httpget = new HttpGet(
"https://localhost:8443/myDemo/Ajax/serivceJ.action");
System.out.println("executing request" + httpget.getRequestLine());
CloseableHttpResponse response = httpclient.execute(httpget);
try {
HttpEntity entity = response.getEntity();
System.out.println("----------------------------------------");
System.out.println(response.getStatusLine());
if (entity != null) {
System.out.println("Response content length: "
+ entity.getContentLength());
System.out.println(EntityUtils.toString(entity));
EntityUtils.consume(entity);
}
} finally {
response.close();
}
} catch (ParseException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (KeyManagementException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (KeyStoreException e) {
e.printStackTrace();
} finally {
if (httpclient != null) {
try {
httpclient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* post方式提交表单(模拟用户登录请求)
*/
public void postForm() {
// 创建默认的httpClient实例.
CloseableHttpClient httpclient = HttpClients.createDefault();
// 创建httppost
HttpPost httppost = new HttpPost(
"http://localhost:8080/myDemo/Ajax/serivceJ.action");
// 创建参数队列
List<NameValuePair> formparams = new ArrayList<NameValuePair>();
formparams.add(new BasicNameValuePair("username", "admin"));
formparams.add(new BasicNameValuePair("password", "123456"));
UrlEncodedFormEntity uefEntity;
try {
uefEntity = new UrlEncodedFormEntity(formparams, "UTF-8");
httppost.setEntity(uefEntity);
System.out.println("executing request " + httppost.getURI());
CloseableHttpResponse response = httpclient.execute(httppost);
try {
if (response.getStatusLine().getStatusCode() == 200) {
HttpEntity httpEntity = response.getEntity();
if (httpEntity != null) {
System.out
.println("--------------------------------------");
System.out.println("Response content: "
+ EntityUtils.toString(httpEntity, "UTF-8"));
System.out
.println("--------------------------------------");
}
} else {
httppost.abort();
}
} finally {
response.close();
}
} catch (ClientProtocolException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e1) {
e1.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
// 关闭连接,释放资源
if (httpclient != null) {
try {
httpclient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void httpDelete(String url) {
LOGGER.info("===================Delete request start=======================");
LOGGER.info("url:" + url);
// 获得Http客户端(可以理解为:你得先有一个浏览器;注意:实际上HttpClient与浏览器是不一样的)
CloseableHttpClient httpClient = HttpClients.createDefault();
// 创建Delete请求
HttpDelete httpDelete = new HttpDelete(url);
// 响应模型
CloseableHttpResponse response = null;
try {
// 配置信息
RequestConfig requestConfig = RequestConfig.custom()
// 设置连接超时时间(单位毫秒)
.setConnectTimeout(5000)
// 设置请求超时时间(单位毫秒)
.setConnectionRequestTimeout(5000)
// socket读写超时时间(单位毫秒)
.setSocketTimeout(5000)
// 设置是否允许重定向(默认为true)
.setRedirectsEnabled(true).build();
// 将上面的配置信息 运用到这个Delete请求里
httpDelete.setConfig(requestConfig);
// 由客户端执行(发送)Delete请求
response = httpClient.execute(httpDelete);
// 从响应模型中获取响应实体
HttpEntity responseEntity = response.getEntity();
System.out.println("响应状态为:" + response.getStatusLine());
if (responseEntity != null) {
System.out.println("响应内容长度为:" + responseEntity.getContentLength());
//主动设置编码,防止相应出现乱码
System.out.println("响应内容为:" + EntityUtils.toString(responseEntity, StandardCharsets.UTF_8));
}
} catch (ClientProtocolException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
// 释放资源
if (httpClient != null) {
httpClient.close();
}
if (response != null) {
response.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
LOGGER.info("===================Delete request end=======================");
}
}
package com.jz.common.utils.web;
import com.squareup.okhttp.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* @ClassName: OKHttpUtil
* @Description: OKHttpUtil
* @Author Bellamy
* @Date 2021/2/25
* @Version 1.0
*/
public class OKHttpUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientUtils.class);
/**
* get请求
*
* @param url
* @return
*/
public static String httpGet(String url) {
String result = null;
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url(url).build();
try {
Response response = client.newCall(request).execute();
result = response.body().string();
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
/**
* post请求
*
* @param url
* @param data 提交的参数为key=value&key1=value1的形式
*/
public static String httpPost(String url, String data) {
String result = null;
OkHttpClient httpClient = new OkHttpClient();
RequestBody requestBody = RequestBody.create(MediaType.parse("text/html;charset=utf-8"), data);
Request request = new Request.Builder().url(url).post(requestBody).build();
try {
Response response = httpClient.newCall(request).execute();
result = response.body().string();
} catch (IOException e) {
e.printStackTrace();
}
return result;
}
public static String httpPut(String url, String stringJson) {
LOGGER.info("===================put request start=======================");
LOGGER.info("url:" + url);
LOGGER.info("json:" + stringJson);
OkHttpClient client = new OkHttpClient();
MediaType mediaType = MediaType.parse("application/json; charset=utf-8");
RequestBody body = RequestBody.create(mediaType, stringJson);
Request request = new Request.Builder()
.url(url)
.put(body)
.addHeader("Content-Type", "application/json")
//.addHeader("Connection", "keep-alive")
.build();
try {
client.newCall(request).execute();
/*Response response = client.newCall(request).execute();
String str = response.body().string();*/
} catch (IOException e) {
e.printStackTrace();
}
LOGGER.info("===================put request end=======================");
return null;
}
}
package com.jz.dmp.modules.controller.DataIntegration;
import com.alibaba.fastjson.JSONObject;
import com.jz.common.constant.JsonResult;
import com.jz.common.constant.ResultCode;
import com.jz.common.page.PageInfoResponse;
......@@ -75,29 +76,30 @@ public class RealTimeSyncController {
* @author Bellamy
* @since 2021-01-05
*/
@ApiOperation(value = "批量启动运行实时同步任务", notes = "批量启动实时同步任务")
@ApiOperation(value = "批量启动/停止运行实时同步任务", notes = "批量启动实时同步任务")
@GetMapping(value = "/startRealTimeSync")
@ApiImplicitParam(name = "realTaskId", value = "任务id")
public JsonResult startRealTimeSync(@RequestParam String realTaskId) throws Exception {
if (StringUtils.isEmpty(realTaskId)) {
@ApiImplicitParams({@ApiImplicitParam(name = "taskId", value = "任务id", required = true),
@ApiImplicitParam(name = "type", value = "01:运行,02:停止", required = true),
@ApiImplicitParam(name = "projectId", value = "项目id", required = true)})
public JsonResult startRealTimeSync(@RequestParam String taskId, @RequestParam String projectId, @RequestParam String type) throws Exception {
if (StringUtils.isEmpty(taskId)) {
return new JsonResult(ResultCode.PARAMS_ERROR, "任务id不能为空!");
}
String[] ids = realTaskId.split(",");
List<DmpRealtimeSyncInfo> list = dmpRealtimeSyncInfoService.queryListById(ids);
if (list.size() > 0 && list != null) {
for (int i = 0; i < list.size(); i++) {
DmpRealtimeSyncInfo dmpRealtimeSyncInfo = list.get(i);
String srcTopicName = dmpRealtimeSyncInfo.getSrcTopicName();
System.out.println(srcTopicName);
logger.info("############正常执行表数据id{}........" + ids[i]);
String shellPath = "/app/bigdata-app/scripts/trigger_straming.sh";
boolean flag = CmdUtils.callShell(shellPath, srcTopicName);
if(flag){
logger.info("############执行成功{}" + flag);
if (StringUtils.isEmpty(projectId)) {
return new JsonResult(ResultCode.PARAMS_ERROR, "projectId不能为空!");
}
if (StringUtils.isEmpty(type)) {
return new JsonResult(ResultCode.PARAMS_ERROR, "type不能为空!");
}
JsonResult result = new JsonResult();
try {
result = dmpRealtimeSyncInfoService.executeRealtimeTask(taskId, type, projectId);
} catch (Exception e) {
e.printStackTrace();
result.setMessage("执行失败!");
result.setCode(ResultCode.INTERNAL_SERVER_ERROR);
}
return new JsonResult();
return result;
}
/**
......@@ -204,13 +206,13 @@ public class RealTimeSyncController {
}
/**
* 保存实时同步任务
* 保存/编辑实时同步任务
*
* @return
* @author Bellamy
* @since 2021-01-08
*/
@ApiOperation(value = "保存实时同步任务", notes = "保存实时同步任务")
@ApiOperation(value = "保存/编辑实时同步任务", notes = "保存实时同步任务")
@PostMapping(value = "/addTask")
public JsonResult addTask(@RequestBody Map<String, Object> params, HttpServletRequest httpRequest) throws Exception {
logger.info("###################请求参数{}" + params.toString() + "############");
......@@ -224,7 +226,7 @@ public class RealTimeSyncController {
return new JsonResult(ResultCode.PARAMS_ERROR, "目标数据源id不能为空!");
}
if (StringUtils.isEmpty(params.get("treeId").toString())) {
return new JsonResult(ResultCode.PARAMS_ERROR, "业务节点id不能为空!");
return new JsonResult(ResultCode.PARAMS_ERROR, "treeId不能为空!");
}
JsonResult result = new JsonResult();
......@@ -245,7 +247,7 @@ public class RealTimeSyncController {
* @author Bellamy
* @since 2021-01-08
*/
@ApiOperation(value = "编辑实时同步任务", notes = "编辑实时同步任务")
/* @ApiOperation(value = "编辑实时同步任务", notes = "编辑实时同步任务")
@PostMapping(value = "/updateTask")
public JsonResult updateTask(@RequestBody Map<String, Object> params, HttpServletRequest httpRequest) throws Exception {
logger.info("################请求参数{}" + params.toString() + "############");
......@@ -262,20 +264,16 @@ public class RealTimeSyncController {
return new JsonResult(ResultCode.PARAMS_ERROR, "任务id不能为空!");
}
//异步提交
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
JsonResult jsonResult = new JsonResult();
try {
dmpRealtimeSyncInfoService.updateRealTimeTask(params);
jsonResult = dmpRealtimeSyncInfoService.updateRealTimeTask(params);
} catch (Exception e) {
e.printStackTrace();
jsonResult.setMessage(e.getMessage());
jsonResult.setCode(ResultCode.INTERNAL_SERVER_ERROR);
}
}
});
thread.start();
return new JsonResult();
}
return jsonResult;
}*/
/**
* 编辑--数据回显,通过id查询任务相关信息
......
......@@ -183,6 +183,9 @@ public class DataSourceListDto {
@ApiModelProperty(value = "accessKey")
private String accessKey;
@ApiModelProperty(value = "密码")
private String password;
public Long getId() {
return id;
}
......@@ -422,4 +425,12 @@ public class DataSourceListDto {
public void setDatasourceTypeId(String datasourceTypeId) {
this.datasourceTypeId = datasourceTypeId;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
......@@ -6,6 +6,7 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.util.List;
import java.util.Map;
/**
* @ClassName: RealTimeEditDataEchoDto
......@@ -23,41 +24,35 @@ public class RealTimeEditDataEchoDto {
@ApiModelProperty(value = "实时同步任务ID")
private String id;
@ApiModelProperty(value = "项目id")
private String projectId;
@ApiModelProperty(value = "treeId")
private String treeId;
/*
* 来源数据源id
* */
@ApiModelProperty(value = "来源数据源id")
private String srcDatasourceId;
private String srcDataSourceId;
/*
* 来源数据源名称
* */
@ApiModelProperty(value = "来源数据源")
private String srcDatasourceName;
@ApiModelProperty(value = "来源数据源类型id")
private String srcDatasourceTypeId;
/*
* 目标数据源id
* */
@ApiModelProperty(value = "目标数据源id")
private String targetDatasourceId;
private String targetDataSourceId;
/*
* 目标数据源名称
* */
@ApiModelProperty(value = "去向数据源")
private String targetDatasourceName;
@ApiModelProperty(value = "目标数据源类型id")
private String targetDataSourceTypeId;
/*
* 黑名单表
* */
@ApiModelProperty(value = "黑名单表")
private String blacklistTable;
@ApiModelProperty(value = "正则表达式")
private String regularExpression;
/*
* 已选择的表
* */
@ApiModelProperty(value = "已选择的表")
private List<DmpRealtimeSyncSelectTable> selectTable;
private List<Map> tables;
public String getId() {
return id;
......@@ -67,51 +62,67 @@ public class RealTimeEditDataEchoDto {
this.id = id;
}
public String getSrcDatasourceId() {
return srcDatasourceId;
public String getSrcDataSourceId() {
return srcDataSourceId;
}
public void setSrcDataSourceId(String srcDataSourceId) {
this.srcDataSourceId = srcDataSourceId;
}
public String getTargetDataSourceId() {
return targetDataSourceId;
}
public void setTargetDataSourceId(String targetDataSourceId) {
this.targetDataSourceId = targetDataSourceId;
}
public String getProjectId() {
return projectId;
}
public void setSrcDatasourceId(String srcDatasourceId) {
this.srcDatasourceId = srcDatasourceId;
public void setProjectId(String projectId) {
this.projectId = projectId;
}
public String getSrcDatasourceName() {
return srcDatasourceName;
public String getTreeId() {
return treeId;
}
public void setSrcDatasourceName(String srcDatasourceName) {
this.srcDatasourceName = srcDatasourceName;
public void setTreeId(String treeId) {
this.treeId = treeId;
}
public String getTargetDatasourceId() {
return targetDatasourceId;
public String getSrcDatasourceTypeId() {
return srcDatasourceTypeId;
}
public void setTargetDatasourceId(String targetDatasourceId) {
this.targetDatasourceId = targetDatasourceId;
public void setSrcDatasourceTypeId(String srcDatasourceTypeId) {
this.srcDatasourceTypeId = srcDatasourceTypeId;
}
public String getTargetDatasourceName() {
return targetDatasourceName;
public String getTargetDataSourceTypeId() {
return targetDataSourceTypeId;
}
public void setTargetDatasourceName(String targetDatasourceName) {
this.targetDatasourceName = targetDatasourceName;
public void setTargetDataSourceTypeId(String targetDataSourceTypeId) {
this.targetDataSourceTypeId = targetDataSourceTypeId;
}
public String getBlacklistTable() {
return blacklistTable;
public String getRegularExpression() {
return regularExpression;
}
public void setBlacklistTable(String blacklistTable) {
this.blacklistTable = blacklistTable;
public void setRegularExpression(String regularExpression) {
this.regularExpression = regularExpression;
}
public List<DmpRealtimeSyncSelectTable> getSelectTable() {
return selectTable;
public List<Map> getTables() {
return tables;
}
public void setSelectTable(List<DmpRealtimeSyncSelectTable> selectTable) {
this.selectTable = selectTable;
public void setTables(List<Map> tables) {
this.tables = tables;
}
}
......@@ -83,6 +83,12 @@ public class RealTimeSyncListDto {
@ApiModelProperty(value = "上下线状态:Y 上线,N 下线")
private String onlineStatus;
@ApiModelProperty(value = "数据量")
private String dataSize;
@ApiModelProperty(value = "任务描述")
private String taskDesc;
public String getId() {
return id;
}
......@@ -178,4 +184,20 @@ public class RealTimeSyncListDto {
public void setOnlineStatus(String onlineStatus) {
this.onlineStatus = onlineStatus;
}
public String getDataSize() {
return dataSize;
}
public void setDataSize(String dataSize) {
this.dataSize = dataSize;
}
public String getTaskDesc() {
return taskDesc;
}
public void setTaskDesc(String taskDesc) {
this.taskDesc = taskDesc;
}
}
......@@ -58,13 +58,13 @@ public class RealTimeSyncListReq extends BasePageBean {
/*
* 节点id
* */
@ApiModelProperty(value = "节点名称或id")
@ApiModelProperty(value = "任务名称称或id")
private String treeId;
/*
* 节点id
* 任务名称
* */
@ApiModelProperty(value = "节点名称")
@ApiModelProperty(value = "任务名称")
private String treeName;
public String getProjectId() {
......
......@@ -245,6 +245,48 @@ public class DmpApiServiceMangeController {
return jsonResult;
}
/**
* 服务开发API列表
*
* @author Bellamy
* @since 2021-02-24
*/
@ApiOperation(value = "服务开发--API列表", notes = "服务开发API列表")
@PostMapping(value = "/apiListPaging")
public JsonResult apiListPaging(@RequestBody @Validated ApiInterfaceInfoListReq req, HttpServletRequest httpRequest) {
JsonResult jsonResult = new JsonResult();
try {
jsonResult = dmpApiServiceMangeService.apiListPaging(req);
} catch (Exception e) {
jsonResult.setMessage(e.getMessage());
jsonResult.setCode(ResultCode.INTERNAL_SERVER_ERROR);
e.printStackTrace();
}
return jsonResult;
}
/**
* 获取文件夹列表
*
* @return
* @author Bellamy
*/
@ApiOperation(value = "获取文件夹列表", notes = "获取文件夹列表")
@GetMapping(value = "/folderTree")
@ApiImplicitParams({@ApiImplicitParam(name = "projectId", value = "项目id"),
@ApiImplicitParam(name = "orgCode", value = "组织编码")})
public JsonResult getFolderTree(@RequestParam(name = "projectId",required = false) String projectId,@RequestParam(name = "orgCode",required = false) String orgCode) {
JsonResult jsonResult = new JsonResult();
try {
jsonResult = dmpApiServiceMangeService.getFolderTree(projectId,orgCode);
} catch (Exception e) {
jsonResult.setMessage(e.getMessage());
jsonResult.setCode(ResultCode.INTERNAL_SERVER_ERROR);
e.printStackTrace();
}
return jsonResult;
}
/**
* 获取数据源表字段
*
......@@ -252,12 +294,12 @@ public class DmpApiServiceMangeController {
* @author Bellamy
* @since 2021-01-21
*/
/* @ApiOperation(value = "获取数据源表字段", notes = "获取数据源表字段")
/* @ApiOperation(value = "获取数据源表字段111", notes = "获取数据源表字段")
@PostMapping(value = "/getTableColumns")
public JsonResult getTableColumns(@RequestBody @Validated SoureTableColumnsReq req) throws Exception {
public JsonResult getTableColumns() throws Exception {
JsonResult jsonResult = new JsonResult();
try {
jsonResult = offlineSynchService.querySoureTableColumns(req);
jsonResult = dmpApiServiceMangeService.test();
} catch (Exception e) {
jsonResult.setMessage(e.getMessage());
jsonResult.setCode(ResultCode.INTERNAL_SERVER_ERROR);
......
......@@ -22,4 +22,16 @@ public class ApiInterfaceInfoListReq extends BasePageBean implements Serializabl
private String status;
@ApiModelProperty(value = "ApiKey")
private String apiKey;
@ApiModelProperty(value = "api名称")
private String apiName;
@ApiModelProperty(value = "组织名称")
private String orgName;
// 服务开发----------------API列表
@ApiModelProperty(value = "项目id--服务开发")
private Long projectId;
@ApiModelProperty(value = "文件id--服务开发")
private Long fileId;
}
......@@ -56,7 +56,7 @@ public class DmpRealtimeSyncInfo implements Serializable {
* 项目id
*/
@ApiModelProperty(value = "项目id")
private Object projectId;
private String projectId;
@ApiModelProperty(value = "${column.comment}")
private Integer parentId;
......@@ -238,11 +238,11 @@ public class DmpRealtimeSyncInfo implements Serializable {
this.srcTopicName = srcTopicName;
}
public Object getProjectId() {
public String getProjectId() {
return projectId;
}
public void setProjectId(Object projectId) {
public void setProjectId(String projectId) {
this.projectId = projectId;
}
......
......@@ -80,4 +80,22 @@ public interface DmpApiServiceMangeService {
* @since 2021-01-22
*/
JsonResult queryApiNotCalledListPage(ApiInterfaceInfoListReq req) throws Exception;
/**
* 服务开发API列表
*
* @author Bellamy
* @since 2021-02-24
*/
JsonResult apiListPaging(ApiInterfaceInfoListReq req) throws Exception;
/**
* 获取文件夹列表
*
* @return
* @author Bellamy
* @since 2021-02-24
*/
JsonResult getFolderTree(String projectId, String orgCode) throws Exception;
}
\ No newline at end of file
......@@ -153,4 +153,13 @@ public interface DmpRealtimeSyncInfoService {
* @since 2021-02-22
*/
JsonResult conflictCheck(ConflictCheckReq params) throws Exception;
/**
* 启动/停止运行实时同步任务
*
* @return
* @author Bellamy
* @since 2021-01-05
*/
JsonResult executeRealtimeTask(String realTaskId, String type, String projectId) throws Exception;
}
\ No newline at end of file
......@@ -281,4 +281,74 @@ public class DmpApiServiceMangeServiceImpl implements DmpApiServiceMangeService
}
return result;
}
/**
* 服务开发API列表
*
* @param req
* @author Bellamy
* @since 2021-02-24
*/
@Override
public JsonResult apiListPaging(ApiInterfaceInfoListReq req) throws Exception {
JsonResult result = new JsonResult();
String url = gatewayUrl + GatewayApiConstant.listServerApplyApi;
String resultData = HttpClientUtils.post(url, JSONObject.toJSONString(req));
if (StringUtils.isEmpty(resultData)) {
throw new RuntimeException("查询失败!");
}
logger.info("#################响应结果数据{}" + resultData);
Map jsonObject = JSONObject.parseObject(resultData);
if (jsonObject.containsKey("code")) {
if ("200".equals(jsonObject.get("code").toString())) {
return JsonResult.ok(jsonObject.get("data"));
}
}
if (jsonObject.containsKey("message")) {
logger.info(jsonObject.get("message").toString());
result.setMessage(jsonObject.get("message").toString());
result.setCode(ResultCode.INTERNAL_SERVER_ERROR);
}
return result;
}
/**
* 获取文件夹列表
*
* @param projectId
* @param orgCode
* @return
* @author Bellamy
* @since 2021-02-24
*/
@Override
public JsonResult getFolderTree(String projectId, String orgCode) throws Exception {
JsonResult result = new JsonResult();
String url = gatewayUrl + GatewayApiConstant.folderTree;
Map params = new HashMap();
if(StringUtils.isNotEmpty(projectId)){
params.put("projectId", projectId);
}
params.put("orgCode", orgCode);
String returnData = HttpClientUtils.getJsonForParam(url, params);
if (StringUtils.isEmpty(returnData)) {
throw new RuntimeException("查询失败!");
}
logger.info("#################响应结果{}" + returnData);
Map map = JSONObject.parseObject(returnData);
if (map.containsKey("code")) {
if ("200".equals(map.get("code").toString())) {
return JsonResult.ok(map.get("data"));
}
}
if (map.containsKey("message")) {
logger.info(map.get("message").toString());
result.setMessage(map.get("message").toString());
result.setCode(ResultCode.INTERNAL_SERVER_ERROR);
}
return result;
}
}
\ No newline at end of file
......@@ -11,19 +11,24 @@ import com.jz.common.enums.DelFlagEnum;
import com.jz.common.page.PageInfoResponse;
import com.jz.common.persistence.BaseService;
import com.jz.common.utils.JsonMapper;
import com.jz.common.utils.realTime.CmdUtils;
import com.jz.common.utils.realTime.DBUtil;
import com.jz.common.utils.realTime.RestClient;
import com.jz.common.utils.web.HttpClientUtils;
import com.jz.common.utils.web.OKHttpUtil;
import com.jz.common.utils.web.SessionUtils;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.modules.controller.DataIntegration.bean.*;
import com.jz.dmp.modules.dao.*;
import com.jz.dmp.modules.model.*;
import com.jz.dmp.modules.service.DmpRealtimeSyncInfoService;
import com.squareup.okhttp.OkHttpClient;
import freemarker.core.ParseException;
import freemarker.template.MalformedTemplateNameException;
import freemarker.template.Template;
import freemarker.template.TemplateException;
import freemarker.template.TemplateNotFoundException;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -38,6 +43,7 @@ import org.springframework.web.servlet.view.freemarker.FreeMarkerConfig;
import java.io.IOException;
import java.io.StringWriter;
import java.math.BigDecimal;
import java.util.*;
import java.util.regex.Pattern;
......@@ -152,7 +158,8 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
if (StringUtils.isNotEmpty(req.getTargetDatabaseName())) { //目标源数据源名称
req.setTargetDatabaseName(req.getTargetDatabaseName().trim());
}
if (StringUtils.isNotEmpty(req.getTreeId())) {
req.setTreeName(req.getTreeName().trim());//任务名称
/*if (StringUtils.isNotEmpty(req.getTreeId())) {
//判断是否为整数 是整数返回true,否则返回false
Pattern pattern = Pattern.compile("^[-\\+]?[\\d]*$");
if (pattern.matcher(req.getTreeId().trim()).matches()) {
......@@ -161,7 +168,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
req.setTreeName(req.getTreeId().trim());//节点名称
req.setTreeId("");
}
}
}*/
PageHelper.startPage(req.getPageNum(), req.getPageSize());
List<RealTimeSyncListDto> list = dmpRealtimeSyncInfoDao.queryRealTimeSyncListPage(req);
......@@ -319,12 +326,6 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
String kafkaConnectUrl = dmpProjectSystemInfo.getKafkaConnectorUrl(); //kafka 连接信息
String[] arr = kafkaConnectUrl.split(",");
//connect1@http://172.18.104.130:9993/connectors
/* if (StringUtils.isEmpty(connectorUrl))
return JsonResult.error(ResultCode.PARAMS_ERROR, "connectorUrl不能为空!");
if (connectorUrl.contains("@")) {
connectorUrl = connectorUrl.split("@")[1];
}*/
connectorUrl = arr[0];
if (connectorUrl.contains("@")) {
connectorUrl = connectorUrl.split("@")[1];
......@@ -336,7 +337,9 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
//处理已选择的表信息
//submitNoSelectTable(realtimeId, projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params);
return JsonResult.ok();
Map map = new HashMap();
map.put("id", realtimeId);
return JsonResult.ok(map);
}
private Long submitDatasource2DatasourceToConnector(Long projectId, RealTimeSyncDataSourceModel sourceDbInfo, RealTimeSyncDataSourceModel targetDbInfo, DmpProjectSystemInfo dmpProjectSystemInfo, String connectorUrl, Map<String, Object> params) throws Exception {
......@@ -370,8 +373,6 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
dataModelMap.put("datasourceName", sourceDbInfo.getDatasourceName());
dataModelMap.put("dbName", sourceDbInfo.getDbName());
dataModelMap.put("connectorSecurityFlag", (String) params.get("connectorSecurityFlag")); //安全验证开关,是否启用KERBEROS
//String sourceName = (String) params.get("sourceName");
String sourceName = tree.getName();
dataModelMap.put("sourceName", sourceName);
//source kafak topic
......@@ -379,8 +380,13 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
dataModelMap.put("topic", topic);
//connector_job_id 连接名称
String sourceConnectorName = "debezium-connector-" + sourceDbInfo.getDatasourceName() + "_" + sourceName + "-" + sourceDbInfo.getDbName();
//查询同步任务信息表是否存在类型是数据源到数据源,srcDataSourceId,targetDataSourceId一样的信息 如果 存在就不发起请求
//source connector name
dataModelMap.put("name", sourceDbInfo.getDatasourceName() + "_" + sourceName + "-" + sourceDbInfo.getDbName());
/*
* 查询同步任务信息表是否存在类型是数据源到数据源,
* srcDataSourceId,targetDataSourceId一样的信息如果 存在就不发起请求
* */
if (StringUtils.isEmpty(String.valueOf(params.get("taskId")))) {
Map<String, Object> queryParam = new HashMap<>();
queryParam.put("srcDataSourceId", srcDataSourceId);
queryParam.put("sourceConnectorName", sourceConnectorName);
......@@ -388,16 +394,10 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
if (dataMap != null) {
throw new RuntimeException("存在相同的实时任务数据!");
}
dataModelMap.put("name", sourceDbInfo.getDatasourceName() + "_" + sourceName + "-" + sourceDbInfo.getDbName()); //source connector name
String dataSource2DataSourceJsonStr = freemakerJson("source", dataModelMap); //使用freemaker模板生成 kafka connector 请求参数
Map<String, Object> dataSource2DataSourceResult = RestClient.post(connectorUrl, dataSource2DataSourceJsonStr);
String connectorJobId = getConnectorJobId(dataSource2DataSourceResult);
//String connectorJobId = "434343";
//请求接口正常则保存数据,否则失败
if (StringUtils.isEmpty(connectorJobId)) {
throw new RuntimeException("提交失败!");
}
//使用freemaker模板生成 kafka connector 请求参数
String jsonStr = freemakerJson("source", dataModelMap);
DmpRealtimeSyncInfo saveBody = new DmpRealtimeSyncInfo();
saveBody.setSrcDatasourceId(srcDataSourceId);
saveBody.setSrcDatasourceName(sourceDbInfo.getDatasourceName());
......@@ -408,25 +408,49 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
saveBody.setTargetDatabaseType(targetDbInfo.getDataSourceTypeName());
saveBody.setTargetDatabaseName(targetDbInfo.getDbName());
saveBody.setType(1);
saveBody.setStatus("SUBMIT_SUCCESS");
saveBody.setConnectorJsonData(dataSource2DataSourceJsonStr);
saveBody.setProjectId(projectId);
saveBody.setConnectorJsonData(jsonStr);
saveBody.setProjectId(String.valueOf(projectId));
saveBody.setConnectorUrl(connectorUrl);
saveBody.setSrcTopicName(topic);
saveBody.setSourceTypeName(sourceDbInfo.getDataSourceTypeName());
saveBody.setTargetTypeName("Kafka");
saveBody.setConnectorJobId(connectorJobId);
saveBody.setCreateTime(new Date());
saveBody.setCrePerson(SessionUtils.getCurrentUserId());
saveBody.setVersion("1.0");
saveBody.setScriptJson(JSONObject.toJSONString(params));
saveBody.setTreeId(params.get("treeId").toString());
//版本记录
DmpRealtimeTaskHistory taskHistory = new DmpRealtimeTaskHistory();
BeanUtils.copyProperties(saveBody, taskHistory);
if (StringUtils.isEmpty(String.valueOf(params.get("taskId")))) {
Map<String, String> respData = publishTask2Kafka(connectorUrl, jsonStr);
saveBody.setConnectorJobId(respData.get("connectorJobId"));
saveBody.setStatus(respData.get("status"));
dmpRealtimeSyncInfoDao.insert(saveBody);
realtiemId = Long.valueOf(saveBody.getId());
} else {
DmpRealtimeSyncInfo realtimeTask = dmpRealtimeSyncInfoDao.queryById(Integer.valueOf(params.get("taskId").toString()));
if (realtimeTask == null)
throw new RuntimeException("操作数据不存在!");
if (StringUtils.isEmpty(realtimeTask.getConnectorJobId()))
throw new RuntimeException("操作数据不存在!");
//编辑时,先删除任务,再发布任务
HttpClientUtils.httpDelete(connectorUrl + "/" + realtimeTask.getConnectorJobId() + "/");
Map<String, String> respData = publishTask2Kafka(connectorUrl, jsonStr);
saveBody.setConnectorJobId(respData.get("connectorJobId"));
saveBody.setStatus(respData.get("status"));
saveBody.setUpdateTime(new Date());
saveBody.setUptPerson(SessionUtils.getCurrentUserId());
saveBody.setId(Integer.valueOf(params.get("taskId").toString()));
dmpRealtimeSyncInfoDao.update(saveBody);
DmpRealtimeSyncInfo devTask = dmpRealtimeSyncInfoDao.queryById(saveBody.getId());
BigDecimal version = new BigDecimal(devTask.getVersion());
version = version.add(new BigDecimal(1.0));
taskHistory.setVersion(String.valueOf(version));
}
logger.info("###################保存实时同步任务--结束 ################");
DmpRealtimeTaskHistory taskHistory = new DmpRealtimeTaskHistory();
BeanUtils.copyProperties(saveBody, taskHistory);
taskHistory.setRealtimeSyncId(saveBody.getId());
dmpRealtimeSyncInfoDao.insertRealtimeHistory(taskHistory);
......@@ -438,7 +462,24 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
blacklist.put("blacklistTable", blacklistTablesInfo.get("blacklistTables").toString());
dmpRealtimeSyncInfoDao.insertRealtimeBlackList(blacklist);
logger.info("###################保存实时同步黑名单数据--结束 ################");*/
return realtiemId;
return Long.valueOf(saveBody.getId());
}
/*
* 发布到kafak 并开始运行 ,请求接口正常则保存数据,否则失败
* */
public Map<String, String> publishTask2Kafka(String connectorUrl, String jsonStr) throws Exception {
Map<String, String> returnMap = new HashMap();
Map<String, Object> result = RestClient.post(connectorUrl, jsonStr);
logger.info("=======response data {}" + JSONObject.toJSONString(result));
String connectorJobId = getConnectorJobId(result);
if (StringUtils.isEmpty(connectorJobId)) {
throw new RuntimeException("提交失败!");
}
String status = getExecuteShellStatus(connectorUrl + "/" + connectorJobId + "/status", new HashMap());
returnMap.put("status", status);
returnMap.put("connectorJobId", connectorJobId);
return returnMap;
}
/**
......@@ -657,11 +698,9 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
//根据projectId查询项目系统配置信息
DmpProjectSystemInfo dmpProjectSystemInfo = dmpProjectDao.queryProjectSystemInfo(projectId);
params.put("connectorSecurityFlag", dmpProjectSystemInfo.getKerberosIsenable()); //安全验证开关,是否启用KERBEROS
//connect1@http://172.18.104.130:9993/connectors
if (StringUtils.isEmpty(connectorUrl)) {
return JsonResult.error(ResultCode.PARAMS_ERROR, "connectorUrl不能为空!");
}
String kafkaConnectUrl = dmpProjectSystemInfo.getKafkaConnectorUrl(); //kafka 连接信息
String[] arr = kafkaConnectUrl.split(",");
connectorUrl = arr[0];
if (connectorUrl.contains("@")) {
connectorUrl = connectorUrl.split("@")[1];
}
......@@ -669,17 +708,18 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
JsonResult realtimeId = updateDatasource2DatasourceToConnector(projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params);
//编辑 已选择表信息
this.updateNoSelectTable(params);
//this.updateNoSelectTable(params);
return JsonResult.ok();
}
private JsonResult updateDatasource2DatasourceToConnector(Long projectId, RealTimeSyncDataSourceModel sourceDbInfo, RealTimeSyncDataSourceModel targetDbInfo, DmpProjectSystemInfo dmpProjectSystemInfo, String connectorUrl, Map<String, Object> params) throws Exception {
DmpNavigationTree tree = dmpNavigationTreeDao.queryById(Integer.valueOf(params.get("treeId").toString()));
Integer srcDataSourceId = sourceDbInfo.getId(); //来源数据源id
Integer targetDataSourceId = targetDbInfo.getId(); //目标数据源id
String realtiemId = params.get("taskId").toString(); //同步任务id
//解析黑名单表
Map blacklistTablesInfo = getBlackListTableInfo(sourceDbInfo, params);
//Map blacklistTablesInfo = getBlackListTableInfo(sourceDbInfo, params);
//解析已选择表
Map selectlistTablesInfo = getSelectListTableInfo(sourceDbInfo, params);
......@@ -687,8 +727,8 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
Map<String, String> dataModelMap = new HashMap<>();
dataModelMap.put("kafkaBootstrapServers", dmpProjectSystemInfo.getKafkaBootstrapServers());//kafka 地址
dataModelMap.put("registerUrl", dmpProjectSystemInfo.getKafkaSchemaRegisterUrl()); //kafka connector schema 注册地址
dataModelMap.put("blacklistTables", blacklistTablesInfo.get("connectorBlacklistTables").toString()); //设置的黑名单表
dataModelMap.put("blacklistTableCount", blacklistTablesInfo.get("blacklistTableCount").toString()); //黑名单表数量
//dataModelMap.put("blacklistTables", blacklistTablesInfo.get("connectorBlacklistTables").toString()); //设置的黑名单表
//dataModelMap.put("blacklistTableCount", blacklistTablesInfo.get("blacklistTableCount").toString()); //黑名单表数量
//设置的白名单表 在模板里进行判比较黑名单表和白名单表的数量,谁小就用谁
dataModelMap.put("whitelistTablesConut", selectlistTablesInfo.get("whitelistTablesConut").toString()); //已选择表数量
dataModelMap.put("connectorWhitelistTables", selectlistTablesInfo.get("connectorWhitelistTables").toString()); //设置的已选择表
......@@ -701,8 +741,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
dataModelMap.put("datasourceName", sourceDbInfo.getDatasourceName()); //数据源名称
dataModelMap.put("dbName", sourceDbInfo.getDbName()); //数据库名称
dataModelMap.put("connectorSecurityFlag", (String) params.get("connectorSecurityFlag")); //安全验证开关,是否启用KERBEROS
//前端定义的sourceConnectorName前缀
String sourceName = (String) params.get("sourceName");
String sourceName = tree.getName();
dataModelMap.put("sourceName", sourceName);
//source kafak topic
String topic = sourceDbInfo.getDatasourceName() + "_" + sourceName + "." + sourceDbInfo.getDbName() + ".databasehistory";
......@@ -710,9 +749,8 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
dataModelMap.put("name", sourceDbInfo.getDatasourceName() + "_" + sourceName + "-" + sourceDbInfo.getDbName()); //source connector name
String dataSource2DataSourceJsonStr = freemakerJson("source", dataModelMap); //使用freemaker模板生成 kafka connector 请求参数
// Map<String, Object> dataSource2DataSourceResult = RestClient.post(connectorUrl, dataSource2DataSourceJsonStr);
//String connectorJobId = getConnectorJobId(dataSource2DataSourceResult);
String connectorJobId = "dfdfd";
Map<String, Object> dataSource2DataSourceResult = RestClient.post(connectorUrl, dataSource2DataSourceJsonStr);
String connectorJobId = getConnectorJobId(dataSource2DataSourceResult);
//请求接口正常则保存数据,否则失败
if (StringUtils.isNotEmpty(connectorJobId)) {
DmpRealtimeSyncInfo saveBody = new DmpRealtimeSyncInfo();
......@@ -726,7 +764,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
saveBody.setTargetDatabaseName(targetDbInfo.getDbName());
saveBody.setStatus("SUBMIT_SUCCESS");
saveBody.setConnectorJsonData(dataSource2DataSourceJsonStr);
saveBody.setProjectId(projectId);
saveBody.setProjectId(String.valueOf(projectId));
saveBody.setConnectorUrl(connectorUrl);
saveBody.setSrcTopicName(topic);
saveBody.setSourceTypeName(sourceDbInfo.getDataSourceTypeName());
......@@ -736,6 +774,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
saveBody.setUpdateTime(new Date());
saveBody.setId(Integer.valueOf(realtiemId));
saveBody.setUptPerson(SessionUtils.getCurrentUserId());
//saveBody.setScriptJson();
dmpRealtimeSyncInfoDao.update(saveBody);
logger.info("###################修改实时同步任务--结束 ################");
......@@ -746,14 +785,14 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
taskHistory.setCreateTime(new Date());
dmpRealtimeSyncInfoDao.insertRealtimeHistory(taskHistory);
Map blacklist = new HashMap();
/* Map blacklist = new HashMap();
blacklist.put("uptTime", new Date());
blacklist.put("uptPerson", SessionUtils.getCurrentUserId());
blacklist.put("realtimeId", realtiemId);
blacklist.put("datasourceId", srcDataSourceId);
blacklist.put("blacklistTable", blacklistTablesInfo.get("blacklistTables").toString());
dmpRealtimeSyncInfoDao.updateRealtimeBlackList(blacklist);
logger.info("###################修改实时同步黑名单数据--结束 ################");
logger.info("###################修改实时同步黑名单数据--结束 ################");*/
} else {
throw new RuntimeException("提交失败!");
}
......@@ -803,31 +842,22 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
List<Map> list = dmpRealtimeSyncInfoDao.selectRealtimeTaskById(taskId);
if (list.size() > 0 && list != null) {
Map realtimeMap = list.get(0);
if (StringUtils.isNotEmpty((String) realtimeMap.get("blacklistTable"))) {
returnModel.setBlacklistTable((String) realtimeMap.get("blacklistTable"));
}
returnModel.setSrcDatasourceId(realtimeMap.get("srcDatasourceId").toString());
returnModel.setSrcDatasourceName(realtimeMap.get("srcDatasourceName").toString());
returnModel.setTargetDatasourceId(realtimeMap.get("targetDatasourceId").toString());
returnModel.setTargetDatasourceName(realtimeMap.get("targetDatasourceName").toString());
Map map = (Map) JSONObject.parse((String) realtimeMap.get("scriptJson"));
returnModel.setSrcDataSourceId(String.valueOf(realtimeMap.get("srcDatasourceId")));
returnModel.setTargetDataSourceId(String.valueOf(realtimeMap.get("targetDatasourceId")));
returnModel.setTreeId((String) realtimeMap.get("treeId"));
returnModel.setId(realtimeMap.get("id").toString());
List<DmpRealtimeSyncSelectTable> selectList = new ArrayList<>();
for (int i = 0; i < list.size(); i++) {
Map str = list.get(i);
DmpRealtimeSyncSelectTable selectTable = new DmpRealtimeSyncSelectTable();
if (StringUtils.isNotEmpty((String) str.get("desensitizationField"))) {
selectTable.setDesensitizationField((String) str.get("desensitizationField")); //脱敏字段
returnModel.setRegularExpression((String) map.get("regularExpression"));
if (map.containsKey("srcDatasourceTypeId")) {
returnModel.setSrcDatasourceTypeId(String.valueOf(map.get("srcDatasourceTypeId")));
}
if (StringUtils.isNotEmpty((String) str.get("arithmetic"))) {
selectTable.setArithmetic((String) str.get("arithmetic")); //算法
if (map.containsKey("targetDataSourceTypeId")) {
returnModel.setTargetDataSourceTypeId(String.valueOf(map.get("targetDataSourceTypeId")));
}
selectTable.setSrcTableName((String) str.get("srcTableName")); //来源表
selectTable.setTargetTableName((String) str.get("targetTableName")); //目标表
//selectTable.setPkName(str.get("pkName").toString());
selectList.add(selectTable);
returnModel.setSelectTable(selectList);
if (map.containsKey("projectId")) {
returnModel.setProjectId(String.valueOf(map.get("projectId")));
}
returnModel.setTables((List<Map>) map.get("tables"));
}
return new JsonResult(returnModel);
}
......@@ -846,6 +876,9 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
if (realtimeTask == null) {
return JsonResult.error(ResultCode.OPERATION_DATA_NO_EXIST);
}
if (StringUtils.isEmpty(realtimeTask.getConnectorJobId())) {
return JsonResult.error(ResultCode.OPERATION_DATA_NO_EXIST);
}
Map map = new HashMap();
String[] ids = realTaskId.split(",");
......@@ -856,6 +889,14 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
map.put("ids", realtimeTask.getTreeId());
dmpNavigationTreeDao.deleteByTreeId(map);
}
DmpProjectSystemInfo dmpProjectSystemInfo = dmpProjectDao.queryProjectSystemInfo(Long.valueOf(realtimeTask.getProjectId()));
String kafkaConnectUrl = dmpProjectSystemInfo.getKafkaConnectorUrl(); //kafka 连接信息
String[] arr = kafkaConnectUrl.split(",");
String connectorUrl = arr[0];
if (connectorUrl.contains("@")) {
connectorUrl = connectorUrl.split("@")[1];
}
HttpClientUtils.httpDelete(connectorUrl + "/" + realtimeTask.getConnectorJobId() + "/");
return JsonResult.ok();
}
......@@ -904,6 +945,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
List<String> tableList = (List<String>) JSONObject.parse(rst.getMessage());
String[] tablesName = params.getTablesName().split(",");
if (tablesName.length > 0) {
for (String table : tablesName) {
Map map = new HashMap();
map.put("tableName", table);
......@@ -915,6 +957,80 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
returnList.add(map);
}
}
}
return JsonResult.ok(returnList);
}
/**
* 启动/停止运行实时同步任务
*
* @param realTaskId
* @param type
* @return
* @author Bellamy
* @since 2021-01-05
*/
@Override
public JsonResult executeRealtimeTask(String realTaskId, String type, String projectId) throws Exception {
DmpProjectSystemInfo dmpProjectSystemInfo = dmpProjectDao.queryProjectSystemInfo(Long.valueOf(projectId));
String kafkaConnectUrl = dmpProjectSystemInfo.getKafkaConnectorUrl(); //kafka 连接信息
String[] arr = kafkaConnectUrl.split(",");
String connectorUrl = arr[0];
if (connectorUrl.contains("@")) {
connectorUrl = connectorUrl.split("@")[1];
}
String[] ids = realTaskId.split(",");
List<DmpRealtimeSyncInfo> list = queryListById(ids);
if (list.size() > 0 && list != null) {
for (int i = 0; i < list.size(); i++) {
DmpRealtimeSyncInfo saveBaby = new DmpRealtimeSyncInfo();
DmpRealtimeSyncInfo dmpRealtimeSyncInfo = list.get(i);
String connectorJobId = dmpRealtimeSyncInfo.getConnectorJobId();
System.out.println(connectorJobId);
String url = connectorUrl + "/" + connectorJobId;
String statusUrl = url;
//resume 恢复,删除 delete,pause 暂停
String param = "";
if ("01".equals(type)) {
url += "/resume";
} else if ("02".equals(type)) {
url += "/pause";
}
logger.info("######执行表数据id{}" + ids[i]);
//执行 shell
OKHttpUtil.httpPut(url, "");
//获取任务状态
String status = getExecuteShellStatus(statusUrl + "/status", new HashMap<>());
//执行后任务状态 :PAUSED 暂停 , RUNNING 运行中
saveBaby.setStatus(status);
saveBaby.setUptPerson(SessionUtils.getCurrentUserId());
saveBaby.setUpdateTime(new Date());
saveBaby.setId(Integer.valueOf(realTaskId));
dmpRealtimeSyncInfoDao.update(saveBaby);
}
}
return JsonResult.ok();
}
/*
* 获取任务状态
* */
public String getExecuteShellStatus(String statusUrl, Map params) throws Exception {
String respData = HttpClientUtils.getJsonForParam(statusUrl, params);
if (StringUtils.isEmpty(respData)) {
throw new RuntimeException("执行失败!");
}
logger.info("#################响应结果{}" + respData);
Map map = JSONObject.parseObject(respData);
if (!map.containsKey("connector")) {
throw new RuntimeException("执行失败!");
}
Map connector = (Map) map.get("connector");
String status = (String) connector.get("state");
if (StringUtils.isEmpty(status))
throw new RuntimeException("执行失败!");
return status;
}
}
\ No newline at end of file
......@@ -338,6 +338,9 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
@Override
public JsonResult selectDataSourceInfoById(Map map) throws Exception {
DataSourceListDto asd = dmpSyncingDatasourceDao.selectDataSourceInfoById(map);
if(StringUtils.isNotEmpty(asd.getPassword())){
asd.setPassword(new BaseService().decode(asd.getPassword(),publicKey));
}
return new JsonResult(asd);
}
......
......@@ -11,9 +11,9 @@
<result property="targetTableName" column="target_table_name" jdbcType="VARCHAR"/>
<result property="type" column="type" jdbcType="INTEGER"/>
<result property="connectorJobId" column="connector_job_id" jdbcType="VARCHAR"/>
<result property="connectorJsonData" column="connector_json_data" jdbcType="OTHER"/>
<result property="connectorJsonData" column="connector_json_data" jdbcType="BLOB" typeHandler="com.jz.common.persistence.CBTHandler"/>
<result property="srcTopicName" column="src_topic_name" jdbcType="VARCHAR"/>
<result property="projectId" column="project_id" jdbcType="OTHER"/>
<result property="projectId" column="project_id" jdbcType="BLOB" typeHandler="com.jz.common.persistence.CBTHandler"/>
<result property="parentId" column="parent_id" jdbcType="INTEGER"/>
<result property="desensitizationField" column="desensitization_field" jdbcType="VARCHAR"/>
<result property="arithmetic" column="arithmetic" jdbcType="VARCHAR"/>
......@@ -41,7 +41,7 @@
id, tree_id, src_datasource_id, target_datasource_id, src_table_name, target_table_name, type, connector_job_id, connector_json_data
, src_topic_name, project_id, parent_id, desensitization_field, arithmetic, pk_name, source_type_name, target_type_name
, src_database_type, src_database_name, connector_url, target_database_type, target_database_name, src_datasource_name
, target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person
, target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person,version
from dmp_realtime_sync_info
where data_status='1' and id = #{id}
</select>
......@@ -303,6 +303,12 @@
<if test="uptPerson != null and uptPerson != ''">
upt_person = #{uptPerson},
</if>
<if test="scriptJson != null and scriptJson != ''">
script_json = #{scriptJson},
</if>
<if test="treeId != null and treeId != ''">
tree_id = #{treeId},
</if>
</set>
where id = #{id}
</update>
......@@ -319,7 +325,7 @@
t1.id,
t1.tree_id as treeId,
t2.name as treeName,
t1.status,
(case when t1.status='PAUSED' then '空闲' when t1.status='RUNNING' then '运行中' end) status,
date_format(t1.update_time,'%Y-%m-%d %H:%i:%s') as updateTime,
t1.src_datasource_id as srcDatasourceId,
t1.src_datasource_name as srcDatasourceName,
......@@ -327,7 +333,8 @@
t1.target_datasource_id as targetDatasourceId,
t1.target_datasource_name as targetDatasourceName,
t1.target_database_type as targetDatabaseType,
t1.online_status as onlineStatus
t1.online_status as onlineStatus,
t1.task_desc as taskDesc
FROM dmp_realtime_sync_info t1
inner join dmp_navigation_tree t2 on t1.tree_id=t2.id
left join dmp_syncing_datasource t3 ON t1.src_datasource_id = t3.ID
......@@ -512,24 +519,15 @@
<select id="selectRealtimeTaskById" resultType="java.util.Map">
SELECT
t1.id,
t1.tree_id as treeId,
t1.src_datasource_id AS srcDatasourceId,
t1.src_datasource_name AS srcDatasourceName,
t1.src_database_type AS srcDatabaseType,
t1.target_datasource_id AS targetDatasourceId,
t1.target_datasource_name AS targetDatasourceName,
t1.target_database_type AS targetDatabaseType,
t2.blacklist_table AS blacklistTable,
t3.desensitization_field AS desensitizationField,
t3.arithmetic,
t3.pk_name AS pkName,
t3.src_table_name AS srcTableName,
t3.target_table_name AS targetTableName
t1.script_json as scriptJson
FROM
dmp_realtime_sync_info t1
left join dmp_realtime_sync_blacklist_table_info t2 ON t1.id = t2.realtime_id
left join dmp_realtime_sync_select_table t3 on t1.id=t3.realtime_id
inner join dmp_navigation_tree t2 on t1.tree_id=t2.id and t2.data_status='1'
WHERE
1 = 1 and t1.data_status='1' and id = #{taskId}
1 = 1 and t1.data_status='1' and t1.id = #{taskId}
</select>
<!--批量删除 或 批量上下线-->
......
......@@ -267,6 +267,9 @@
<if test="impalaMasterFqdn != null">
impala_master_fqdn = #{impalaMasterFqdn},
</if>
<if test="testConnectStatus != null">
test_connect_status = #{testConnectStatus},
</if>
</set>
where ID = #{id}
</update>
......@@ -374,6 +377,7 @@
a.jdbc_url as jdbcUrl,
a.db_name as dbName,
a.user_name as userName,
a.password,
a.project_id as projectId,
a.protocol,
a.host,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment