Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
aa80f0d8
Commit
aa80f0d8
authored
Jan 07, 2021
by
mcb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
610a5c1b
Changes
12
Hide whitespace changes
Inline
Side-by-side
Showing
12 changed files
with
816 additions
and
478 deletions
+816
-478
GlobalExceptionHandler.java
.../java/com/jz/common/exception/GlobalExceptionHandler.java
+10
-10
DBUtil.java
src/main/java/com/jz/common/utils/realTime/DBUtil.java
+365
-354
RealTimeSyncController.java
...es/controller/DataIntegration/RealTimeSyncController.java
+25
-5
DmpRealtimeSyncInfoDao.java
...n/java/com/jz/dmp/modules/dao/DmpRealtimeSyncInfoDao.java
+2
-1
RealTimeSyncColumnModel.java
...ava/com/jz/dmp/modules/model/RealTimeSyncColumnModel.java
+67
-0
RealTimeSyncDataSourceModel.java
...com/jz/dmp/modules/model/RealTimeSyncDataSourceModel.java
+179
-0
RealTimeSyncModel.java
...main/java/com/jz/dmp/modules/model/RealTimeSyncModel.java
+16
-11
RealTimeSyncTableModel.java
...java/com/jz/dmp/modules/model/RealTimeSyncTableModel.java
+40
-28
DmpRealtimeSyncInfoService.java
...om/jz/dmp/modules/service/DmpRealtimeSyncInfoService.java
+12
-2
DmpRealtimeSyncInfoServiceImpl.java
.../modules/service/impl/DmpRealtimeSyncInfoServiceImpl.java
+42
-11
DmpProjectMapper.xml
src/main/resources/mapper/dmp/DmpProjectMapper.xml
+48
-48
DmpRealtimeSyncInfoMapper.xml
src/main/resources/mapper/dmp/DmpRealtimeSyncInfoMapper.xml
+10
-8
No files found.
src/main/java/com/jz/common/exception/GlobalExceptionHandler.java
View file @
aa80f0d8
...
...
@@ -28,9 +28,9 @@ public class GlobalExceptionHandler {
private
static
Logger
logger
=
LoggerFactory
.
getLogger
(
GlobalExceptionHandler
.
class
);
/**
*
403 - Bad Request
*
缺少请求参数
*/
@ResponseStatus
(
HttpStatus
.
BAD_REQUEST
)
@ResponseStatus
(
HttpStatus
.
OK
)
@ExceptionHandler
(
MissingServletRequestParameterException
.
class
)
public
JsonResult
handleMissingServletRequestParameterException
(
MissingServletRequestParameterException
e
)
{
logger
.
error
(
"缺少请求参数"
,
e
);
...
...
@@ -48,9 +48,9 @@ public class GlobalExceptionHandler {
}
/**
*
403 - Bad Request
*
参数验证失败
*/
@ResponseStatus
(
HttpStatus
.
BAD_REQUEST
)
@ResponseStatus
(
HttpStatus
.
OK
)
@ExceptionHandler
(
MethodArgumentNotValidException
.
class
)
public
JsonResult
handleMethodArgumentNotValidException
(
MethodArgumentNotValidException
e
)
{
logger
.
error
(
"参数验证失败"
,
e
);
...
...
@@ -78,11 +78,11 @@ public class GlobalExceptionHandler {
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
message
);
}
/**
* 403 - Bad Request
*/
@ResponseStatus
(
HttpStatus
.
BAD_REQUEST
)
@ResponseStatus
(
HttpStatus
.
OK
)
@ExceptionHandler
(
ValidationException
.
class
)
public
JsonResult
handleValidationException
(
ValidationException
e
)
{
logger
.
error
(
"参数验证失败"
,
e
);
...
...
@@ -95,7 +95,7 @@ public class GlobalExceptionHandler {
@ResponseStatus
(
HttpStatus
.
METHOD_NOT_ALLOWED
)
@ExceptionHandler
(
HttpRequestMethodNotSupportedException
.
class
)
public
JsonResult
handleHttpRequestMethodNotSupportedException
(
HttpRequestMethodNotSupportedException
e
)
{
logger
.
error
(
"不支持当前请求方
�?
"
,
e
);
logger
.
error
(
"不支持当前请求方
式
"
,
e
);
return
new
JsonResult
(
ResultCode
.
NOT_SUPPORTED
,
"request_method_not_supported:"
);
}
...
...
@@ -116,7 +116,7 @@ public class GlobalExceptionHandler {
@ExceptionHandler
(
ServiceException
.
class
)
public
JsonResult
handleServiceException
(
ServiceException
e
)
{
logger
.
error
(
"业务逻辑异常"
,
e
);
return
new
JsonResult
(
ResultCode
.
INTERNAL_SERVER_ERROR
,
"业务逻辑异常
�?
"
);
return
new
JsonResult
(
ResultCode
.
INTERNAL_SERVER_ERROR
,
"业务逻辑异常
!
"
);
}
/**
...
...
@@ -126,7 +126,7 @@ public class GlobalExceptionHandler {
@ExceptionHandler
(
Exception
.
class
)
public
JsonResult
handleException
(
Exception
e
)
{
logger
.
error
(
"通用逻辑异常"
,
e
);
return
new
JsonResult
(
ResultCode
.
INTERNAL_SERVER_ERROR
,
"
参数错误
"
);
return
new
JsonResult
(
ResultCode
.
INTERNAL_SERVER_ERROR
,
"
通用逻辑异常
"
);
}
/**
...
...
@@ -135,7 +135,7 @@ public class GlobalExceptionHandler {
@ResponseStatus
(
HttpStatus
.
INTERNAL_SERVER_ERROR
)
@ExceptionHandler
(
DataIntegrityViolationException
.
class
)
public
JsonResult
handleException
(
DataIntegrityViolationException
e
)
{
logger
.
error
(
"操作数据库出现异
�?
:"
,
e
);
logger
.
error
(
"操作数据库出现异
常
:"
,
e
);
return
new
JsonResult
(
ResultCode
.
NOT_SUPPORTED
,
"操作数据库出现异常:字段重复、有外键关联�?"
);
}
}
src/main/java/com/jz/common/utils/realTime/DBUtil.java
View file @
aa80f0d8
//package com.jz.common.utils.realTime;
//
//import com.jz.dmp.web.ui.modules.dmp.model.task.realtime.RealTimeSyncColumnModel;
//import com.jz.dmp.web.ui.modules.dmp.model.task.realtime.RealTimeSyncDataSourceModel;
//import com.jz.dmp.web.ui.modules.dmp.model.task.realtime.RealTimeSyncTableModel;
//import org.springframework.util.CollectionUtils;
//import org.springframework.util.StringUtils;
//
//import java.sql.*;
//import java.util.ArrayList;
//import java.util.HashMap;
//import java.util.List;
//import java.util.Map;
//
//public class DBUtil {
//
//
//
// /**
// * 创建连接
// * @param driverName 驱动名称
// * @param jdbcUrl 连接字符串
// * @param username 用户名
// * @param password 密码
// * @return 获取的连接
// * @throws ClassNotFoundException
// * @throws SQLException
// */
// public static Connection openConn(RealTimeSyncDataSourceModel realTimeSyncDataSourceModel) throws Exception {
// String driverName = realTimeSyncDataSourceModel.getDriverName();
// String jdbcUrl = realTimeSyncDataSourceModel.getJdbcUrl();
// String username = realTimeSyncDataSourceModel.getUserName();
// String password = realTimeSyncDataSourceModel.getPassword();
// if (!StringUtils.isEmpty(driverName)){
// Class.forName(driverName);
// }
// return DriverManager.getConnection(jdbcUrl, username, password);
// }
//
// // 关闭数据库连接
// public static void closeConn(Connection conn) {
// try {
// if (conn != null) {
// conn.close();
// }
// } catch (Exception e) {
// e.printStackTrace();
// }
// }
//
//
// /**
// * 获取数据库里的元数据信息
// * @param connection 创建的数据库连接
// * @return
// * @throws Exception
// */
// public static DatabaseMetaData getMetaData(Connection connection) throws Exception {
// if (connection != null) {
// return connection.getMetaData();
// }
// return null;
// }
//
//
// /**
// * 获取数据源里的所有表详细信息
// * @param realTimeSyncDataSourceModel 根据数据源Id获取的数据源信息
// * @param toQueryTableName 要查询的表明 如果为空,默认查询所有表
// * @param isContainColumnInfo 是否包含列信息
// * @param tableToTableSyncTasks 当前数据源已经同步过的表到表的任务
// * @param blacklistTables 黑名单表
// * @return
// */
// public static List<RealTimeSyncTableModel> getDataSourceTables(RealTimeSyncDataSourceModel realTimeSyncDataSourceModel, String toQueryTableName, boolean isContainColumnInfo, Map<String, Map<String, String>> tableToTableSyncTasksMap, String blacklistTables, String selectTablesName, Map<String, String> setttingBlacklistTableMap ){
// Connection connection = null;
// try {
// connection = openConn(realTimeSyncDataSourceModel);
// DatabaseMetaData metaData = connection.getMetaData();
// return getDataSourceTables(metaData, toQueryTableName, isContainColumnInfo,tableToTableSyncTasksMap,blacklistTables,selectTablesName,setttingBlacklistTableMap);
// } catch (Exception e) {
// e.printStackTrace();
// }finally {
// closeConn(connection);
// }
//
// return null;
// }
//
// /**
// * 获取数据源里的表信息
// * @param metaData 数据库源数据
// * @param toQueryTableName 要查询的表明 如果为空,默认查询所有表
// * @param isContainColumnInfo 是否包含列信息
// * @param tableToTableSyncTasksMap 当前表已经存在的任务
// * @param blacklistTables 前台设置后 传递到后台的黑名单表 ,如果在黑名单内 不返回当前表的信息
// * @param selectTablesName 选择的要同步的表
// * @return
// * @throws Exception
// */
// public static List<RealTimeSyncTableModel> getDataSourceTables(DatabaseMetaData metaData, String toQueryTableName, boolean isContainColumnInfo, Map<String, Map<String, String>> tableToTableSyncTasksMap, String blacklistTables, String selectTablesName, Map<String, String> setttingBlacklistTableMap ) throws Exception{
//
// List<RealTimeSyncTableModel> dataSourceTables = null;
// if (metaData != null) {
// String tableNamePattern = "%";
// if (!StringUtils.isEmpty(toQueryTableName)) {
// tableNamePattern = toQueryTableName+"%";
// }
// ResultSet tables = metaData.getTables(null, "%", tableNamePattern, new String[] {"TABLE"});
// if (tables.wasNull()) {
// return dataSourceTables;
// }
//
// //选择查询的表
// Map<String, String> selectTableMap = null;
//
// if (!StringUtils.isEmpty(selectTablesName)) {
// selectTableMap = new HashMap<>();
// String [] selectTableArr = selectTablesName.split(",");
// for (String selectTableName : selectTableArr) {
// selectTableMap.put(selectTableName, selectTableName);
// }
// }
//
// //返回信息中不包含的表
// Map<String, String> notIncludeTableMap = null;
// //判断表是否在黑名单表
// if (!StringUtils.isEmpty(blacklistTables)) {
// notIncludeTableMap = new HashMap<>();
// String [] blacklistTablesArr = blacklistTables.split(",");
// for (String blacklistTable : blacklistTablesArr) {
// notIncludeTableMap.put(blacklistTable, blacklistTable);
// }
// }
//
// dataSourceTables = new ArrayList<>();
// RealTimeSyncTableModel realTimeSyncTableModel = null;
// while (tables.next()) {
//
// //表名
// String tableName = tables.getString("TABLE_NAME");
//
// String schema = tables.getString("TABLE_SCHEM");
// //sqlServer 过滤sys表 只要dbo的表
// if ("sys".equals(schema)) {
// continue;
// }
//
// //已经选择的表
// if (!CollectionUtils.isEmpty(selectTableMap) && !selectTableMap.containsKey(tableName)) {
// //当前表不在选择的表内 不继续构造信息
// continue;
// }
//
// //选择的黑名单
// if (!CollectionUtils.isEmpty(notIncludeTableMap) && notIncludeTableMap.containsKey(tableName)) {
// notIncludeTableMap.remove(tableName);
// continue;
// }
//
//
// realTimeSyncTableModel = new RealTimeSyncTableModel();
// realTimeSyncTableModel.setTableName(tableName);
// //判断表的记录在同步任务中是否存在记录,如果已经存在,说明同步过
// String desensitizationField = null ;
// if (!CollectionUtils.isEmpty(tableToTableSyncTasksMap) && tableToTableSyncTasksMap.containsKey(tableName)) {
// realTimeSyncTableModel.setIsSubmited(true);
// //desensitization_field,脱敏字段
// //算法 arithmetic
// Map<String, String> tempMap = tableToTableSyncTasksMap.get(tableName);
// desensitizationField = tempMap.get("desensitization_field");
// String arithmetic = tempMap.get("arithmetic");
// realTimeSyncTableModel.setDesensitizationField(desensitizationField);
// realTimeSyncTableModel.setArithmetic(arithmetic);
// }
//
// //判断表是否在设置的黑名单中 如果有,说明已经在黑名单中 设置成true,返回给前端
// if (!CollectionUtils.isEmpty(setttingBlacklistTableMap) && setttingBlacklistTableMap.containsKey(tableName)) {
// realTimeSyncTableModel.setIsBlacklist(true);
// }
//
// //主键
// String pkName = getPrimaryKeyColumnByTableName(metaData, tableName);
// realTimeSyncTableModel.setPkName(pkName);
//
// //是否包含列信息
// if (isContainColumnInfo) {
// Map<String, String > desensitizationFieldMap = null;
// if (!StringUtils.isEmpty(desensitizationField)) {
// desensitizationFieldMap = new HashMap<>();
// String [] desensitizationFieldArr = desensitizationField.split(",");
// for (String desensitizationFieldName : desensitizationFieldArr) {
// selectTableMap.put(desensitizationFieldName, desensitizationFieldName);
// }
// }
// realTimeSyncTableModel.setColumnInfo(getColumnInfoByTableName(metaData, tableName,desensitizationFieldMap));
// }
// dataSourceTables.add(realTimeSyncTableModel);
// }
//
// }
// return dataSourceTables;
// }
//
// /**
// * 获取当前表里 设置主键的列名
// * @param metaData
// * @param tableName
// * @return
// * @throws Exception
// */
// public static String getPrimaryKeyColumnByTableName(DatabaseMetaData metaData,String tableName) throws Exception {
// if (metaData == null) {
// return null;
// }
//
// ResultSet pkResultSet = metaData.getPrimaryKeys(null, null, tableName);
//
// String primaryKey = "";
// while (pkResultSet.next()) {
// primaryKey += pkResultSet.getString("COLUMN_NAME")+",";
// }
//
// if (StringUtils.isEmpty(primaryKey)){
// //没有设置主键 但是设置了唯一索引
// ResultSet indexResultSet = metaData.getIndexInfo(null, null, "yyy3", true, false);
// while (indexResultSet.next()){
// primaryKey += indexResultSet.getString("COLUMN_NAME")+",";
// }
// }
//
// if (!StringUtils.isEmpty(primaryKey)) {
// primaryKey = primaryKey.substring(0, primaryKey.length() - 1);
// }
//
// return primaryKey;
// }
//
// /**
// * 获取当前表的所有列 以及是字符串的列
// * @param metaData
// * @param tableName
// * @throws Exception
// */
// public static RealTimeSyncColumnModel getColumnInfoByTableName(DatabaseMetaData metaData, String tableName, Map<String, String> desensitizationFieldMap) throws Exception {
// RealTimeSyncColumnModel realTimeSyncColumnModel = null ;
// ResultSet columnResult = metaData.getColumns(null, "%", tableName,"%");
// if (!columnResult.wasNull()) {
// realTimeSyncColumnModel = new RealTimeSyncColumnModel();
// List<String> allColumns = new ArrayList<>();
//
//
// List<Map<String, Object>> strColumns = new ArrayList<>();
// Map<String, Object> tempMap = null ;
//
// while (columnResult.next()) {
// //列名
// String columnName = columnResult.getString("COLUMN_NAME");
// allColumns.add(columnName);
// //列类型
// String columnType = columnResult.getString("TYPE_NAME");
// if (isStringColumn(columnType)) {
// tempMap = new HashMap<>();
// tempMap.put("columnName", columnName);
// if (!CollectionUtils.isEmpty(desensitizationFieldMap) && desensitizationFieldMap.containsKey(columnName)) {
// tempMap.put("isDesensitization", true);
// }
// strColumns.add(tempMap);
// }
// }
// realTimeSyncColumnModel.setAllColumns(null);
// realTimeSyncColumnModel.setStrColumns(strColumns);
// realTimeSyncColumnModel.setTableName(tableName);
// }
// return realTimeSyncColumnModel;
// }
//
// /**
// * 判断列是否是字符串类型的列
// * @param columnType
// * @return
// */
// private static boolean isStringColumn(String columnType) {
// columnType = columnType.toLowerCase();
// if (columnType.equals("varchar")
// || columnType.equals("nvarchar")
// || columnType.equals("char")
// || columnType.equals("tinytext")
// || columnType.equals("text")
// || columnType.equals("mediumtext")
// || columnType.equals("longtext")
// ) {
// return true ;
// }
// return false;
// }
//
//
// public static void main(String[] args) {
//
// String aa = "aa,bb,";
//
// System.err.println(aa.substring(0,aa.length()-1));
//
// String driverName = "com.mysql.jdbc.Driver";
// String jdbcUrl = "jdbc:mysql://10.0.53.179:3306/testunique";
// String username = "root";
// String password = "jz@2018";
//
// RealTimeSyncDataSourceModel realTimeSyncDataSourceModel = new RealTimeSyncDataSourceModel();
// realTimeSyncDataSourceModel.setDriverName(driverName);
// realTimeSyncDataSourceModel.setJdbcUrl(jdbcUrl);
// realTimeSyncDataSourceModel.setUserName(username);
// realTimeSyncDataSourceModel.setPassword(password);
//
// String blacklistTables = "";
// Map<String, Map<String, String>> map = null;
// Connection connection = null ;
// try {
// connection = openConn(realTimeSyncDataSourceModel);
// DatabaseMetaData metaData = getMetaData(connection);
//
//
// ResultSet indexResultSet = metaData.getIndexInfo(null, null, "yyy3", true, false);
//
// String primaryKey = "";
// while (indexResultSet.next()){
// primaryKey += indexResultSet.getString("COLUMN_NAME")+",";
// }
//
// if (!StringUtils.isEmpty(primaryKey)) {
// primaryKey = primaryKey.substring(0, primaryKey.length() - 1);
// }
// System.err.println(primaryKey);
//
// /*ResultSet resultSet = metaData.getColumns(null,"%","yyy3","%");
// while (resultSet.next()){
// System.err.println(resultSet.getString("COLUMN_NAME") +"--"+resultSet.getString("TYPE_NAME"));
// }*/
//
// String key = getPrimaryKeyColumnByTableName(metaData,"yyy3");
//
// System.err.println("key--"+key);
//
// } catch (Exception e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }finally {
// closeConn(connection);
// }
// }
//
//
//}
package
com
.
jz
.
common
.
utils
.
realTime
;
import
com.jz.dmp.modules.model.RealTimeSyncColumnModel
;
import
com.jz.dmp.modules.model.RealTimeSyncDataSourceModel
;
import
com.jz.dmp.modules.model.RealTimeSyncTableModel
;
import
org.springframework.util.CollectionUtils
;
import
org.springframework.util.StringUtils
;
import
java.sql.*
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
public
class
DBUtil
{
/**
* 创建连接
*
* @param driverName 驱动名称
* @param jdbcUrl 连接字符串
* @param username 用户名
* @param password 密码
* @return 获取的连接
* @throws ClassNotFoundException
* @throws SQLException
*/
public
static
Connection
openConn
(
RealTimeSyncDataSourceModel
realTimeSyncDataSourceModel
)
throws
Exception
{
String
driverName
=
realTimeSyncDataSourceModel
.
getDriverName
();
//jdbc驱动名称
String
jdbcUrl
=
realTimeSyncDataSourceModel
.
getJdbcUrl
();
// jdbc 连接
String
username
=
realTimeSyncDataSourceModel
.
getUserName
();
//用户
String
password
=
realTimeSyncDataSourceModel
.
getPassword
();
//密码
if
(!
StringUtils
.
isEmpty
(
driverName
))
{
Class
.
forName
(
driverName
);
}
return
DriverManager
.
getConnection
(
jdbcUrl
,
username
,
password
);
}
// 关闭数据库连接
public
static
void
closeConn
(
Connection
conn
)
{
try
{
if
(
conn
!=
null
)
{
conn
.
close
();
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
/**
* 获取数据库里的元数据信息
*
* @param connection 创建的数据库连接
* @return
* @throws Exception
*/
public
static
DatabaseMetaData
getMetaData
(
Connection
connection
)
throws
Exception
{
if
(
connection
!=
null
)
{
return
connection
.
getMetaData
();
}
return
null
;
}
/**
* 获取数据源里的所有表详细信息
*
* @param realTimeSyncDataSourceModel 根据数据源Id获取的数据源信息
* @param toQueryTableName 要查询的表明 如果为空,默认查询所有表
* @param isContainColumnInfo 是否包含列信息
* @param tableToTableSyncTasksMap 数据源对应的表详细信息
* @param selectTablesName 选中的表名称
* @param blacklistTables 选择的黑名单表
* @param setttingBlacklistTableMap 来源数据源的黑名单表
* @return
*/
public
static
List
<
RealTimeSyncTableModel
>
getDataSourceTables
(
RealTimeSyncDataSourceModel
realTimeSyncDataSourceModel
,
String
toQueryTableName
,
boolean
isContainColumnInfo
,
Map
<
String
,
Map
<
String
,
String
>>
tableToTableSyncTasksMap
,
String
blacklistTables
,
String
selectTablesName
,
Map
<
String
,
String
>
setttingBlacklistTableMap
)
{
Connection
connection
=
null
;
try
{
//创建JDBC连接
connection
=
openConn
(
realTimeSyncDataSourceModel
);
DatabaseMetaData
metaData
=
connection
.
getMetaData
();
//获取数据源里的表信息
return
getDataSourceTables
(
metaData
,
toQueryTableName
,
isContainColumnInfo
,
tableToTableSyncTasksMap
,
blacklistTables
,
selectTablesName
,
setttingBlacklistTableMap
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
finally
{
closeConn
(
connection
);
}
return
null
;
}
/**
* 获取数据源里的表信息
*
* @param metaData 数据库源数据
* @param toQueryTableName 要查询的表明 如果为空,默认查询所有表
* @param isContainColumnInfo 是否包含列信息
* @param tableToTableSyncTasksMap 当前表已经存在的任务
* @param blacklistTables 前台设置后 传递到后台的黑名单表 ,如果在黑名单内 不返回当前表的信息
* @param selectTablesName 选择的要同步的表
* @return
* @throws Exception
*/
public
static
List
<
RealTimeSyncTableModel
>
getDataSourceTables
(
DatabaseMetaData
metaData
,
String
toQueryTableName
,
boolean
isContainColumnInfo
,
Map
<
String
,
Map
<
String
,
String
>>
tableToTableSyncTasksMap
,
String
blacklistTables
,
String
selectTablesName
,
Map
<
String
,
String
>
setttingBlacklistTableMap
)
throws
Exception
{
List
<
RealTimeSyncTableModel
>
dataSourceTables
=
null
;
if
(
metaData
!=
null
)
{
String
tableNamePattern
=
"%"
;
if
(!
StringUtils
.
isEmpty
(
toQueryTableName
))
{
tableNamePattern
=
toQueryTableName
+
"%"
;
}
ResultSet
tables
=
metaData
.
getTables
(
null
,
"%"
,
tableNamePattern
,
new
String
[]{
"TABLE"
});
if
(
tables
.
wasNull
())
{
return
dataSourceTables
;
}
//选择查询的表
Map
<
String
,
String
>
selectTableMap
=
null
;
if
(!
StringUtils
.
isEmpty
(
selectTablesName
))
{
selectTableMap
=
new
HashMap
<>();
String
[]
selectTableArr
=
selectTablesName
.
split
(
","
);
for
(
String
selectTableName
:
selectTableArr
)
{
selectTableMap
.
put
(
selectTableName
,
selectTableName
);
}
}
//返回信息中不包含的表
Map
<
String
,
String
>
notIncludeTableMap
=
null
;
//判断表是否在黑名单表
if
(!
StringUtils
.
isEmpty
(
blacklistTables
))
{
notIncludeTableMap
=
new
HashMap
<>();
String
[]
blacklistTablesArr
=
blacklistTables
.
split
(
","
);
for
(
String
blacklistTable
:
blacklistTablesArr
)
{
notIncludeTableMap
.
put
(
blacklistTable
,
blacklistTable
);
}
}
dataSourceTables
=
new
ArrayList
<>();
RealTimeSyncTableModel
realTimeSyncTableModel
=
null
;
while
(
tables
.
next
())
{
//表名
String
tableName
=
tables
.
getString
(
"TABLE_NAME"
);
String
schema
=
tables
.
getString
(
"TABLE_SCHEM"
);
//sqlServer 过滤sys表 只要dbo的表
if
(
"sys"
.
equals
(
schema
))
{
continue
;
}
//已经选择的表
if
(!
CollectionUtils
.
isEmpty
(
selectTableMap
)
&&
!
selectTableMap
.
containsKey
(
tableName
))
{
//当前表不在选择的表内 不继续构造信息
continue
;
}
//选择的黑名单
if
(!
CollectionUtils
.
isEmpty
(
notIncludeTableMap
)
&&
notIncludeTableMap
.
containsKey
(
tableName
))
{
notIncludeTableMap
.
remove
(
tableName
);
continue
;
}
realTimeSyncTableModel
=
new
RealTimeSyncTableModel
();
realTimeSyncTableModel
.
setTableName
(
tableName
);
//判断表的记录在同步任务中是否存在记录,如果已经存在,说明同步过
String
desensitizationField
=
null
;
if
(!
CollectionUtils
.
isEmpty
(
tableToTableSyncTasksMap
)
&&
tableToTableSyncTasksMap
.
containsKey
(
tableName
))
{
realTimeSyncTableModel
.
setIfSubmited
(
true
);
//desensitization_field,脱敏字段
//算法 arithmetic
Map
<
String
,
String
>
tempMap
=
tableToTableSyncTasksMap
.
get
(
tableName
);
desensitizationField
=
tempMap
.
get
(
"desensitization_field"
);
String
arithmetic
=
tempMap
.
get
(
"arithmetic"
);
realTimeSyncTableModel
.
setDesensitizationField
(
desensitizationField
);
realTimeSyncTableModel
.
setArithmetic
(
arithmetic
);
}
//判断表是否在设置的黑名单中 如果有,说明已经在黑名单中 设置成true,返回给前端
if
(!
CollectionUtils
.
isEmpty
(
setttingBlacklistTableMap
)
&&
setttingBlacklistTableMap
.
containsKey
(
tableName
))
{
realTimeSyncTableModel
.
setIfBlacklist
(
true
);
}
//主键
String
pkName
=
getPrimaryKeyColumnByTableName
(
metaData
,
tableName
);
realTimeSyncTableModel
.
setPkName
(
pkName
);
//是否包含列信息
if
(
isContainColumnInfo
)
{
Map
<
String
,
String
>
desensitizationFieldMap
=
null
;
if
(!
StringUtils
.
isEmpty
(
desensitizationField
))
{
desensitizationFieldMap
=
new
HashMap
<>();
String
[]
desensitizationFieldArr
=
desensitizationField
.
split
(
","
);
for
(
String
desensitizationFieldName
:
desensitizationFieldArr
)
{
selectTableMap
.
put
(
desensitizationFieldName
,
desensitizationFieldName
);
}
}
realTimeSyncTableModel
.
setColumnInfo
(
getColumnInfoByTableName
(
metaData
,
tableName
,
desensitizationFieldMap
));
}
dataSourceTables
.
add
(
realTimeSyncTableModel
);
}
}
return
dataSourceTables
;
}
/**
* 获取当前表里 设置主键的列名
*
* @param metaData
* @param tableName
* @return
* @throws Exception
*/
public
static
String
getPrimaryKeyColumnByTableName
(
DatabaseMetaData
metaData
,
String
tableName
)
throws
Exception
{
if
(
metaData
==
null
)
{
return
null
;
}
ResultSet
pkResultSet
=
metaData
.
getPrimaryKeys
(
null
,
null
,
tableName
);
String
primaryKey
=
""
;
while
(
pkResultSet
.
next
())
{
primaryKey
+=
pkResultSet
.
getString
(
"COLUMN_NAME"
)
+
","
;
}
if
(
StringUtils
.
isEmpty
(
primaryKey
))
{
//没有设置主键 但是设置了唯一索引
ResultSet
indexResultSet
=
metaData
.
getIndexInfo
(
null
,
null
,
"yyy3"
,
true
,
false
);
while
(
indexResultSet
.
next
())
{
primaryKey
+=
indexResultSet
.
getString
(
"COLUMN_NAME"
)
+
","
;
}
}
if
(!
StringUtils
.
isEmpty
(
primaryKey
))
{
primaryKey
=
primaryKey
.
substring
(
0
,
primaryKey
.
length
()
-
1
);
}
return
primaryKey
;
}
/**
* 获取当前表的所有列 以及是字符串的列
*
* @param metaData
* @param tableName
* @throws Exception
*/
public
static
RealTimeSyncColumnModel
getColumnInfoByTableName
(
DatabaseMetaData
metaData
,
String
tableName
,
Map
<
String
,
String
>
desensitizationFieldMap
)
throws
Exception
{
RealTimeSyncColumnModel
realTimeSyncColumnModel
=
null
;
ResultSet
columnResult
=
metaData
.
getColumns
(
null
,
"%"
,
tableName
,
"%"
);
if
(!
columnResult
.
wasNull
())
{
realTimeSyncColumnModel
=
new
RealTimeSyncColumnModel
();
List
<
String
>
allColumns
=
new
ArrayList
<>();
List
<
Map
<
String
,
Object
>>
strColumns
=
new
ArrayList
<>();
Map
<
String
,
Object
>
tempMap
=
null
;
while
(
columnResult
.
next
())
{
//列名
String
columnName
=
columnResult
.
getString
(
"COLUMN_NAME"
);
allColumns
.
add
(
columnName
);
//列类型
String
columnType
=
columnResult
.
getString
(
"TYPE_NAME"
);
if
(
isStringColumn
(
columnType
))
{
tempMap
=
new
HashMap
<>();
tempMap
.
put
(
"columnName"
,
columnName
);
if
(!
CollectionUtils
.
isEmpty
(
desensitizationFieldMap
)
&&
desensitizationFieldMap
.
containsKey
(
columnName
))
{
tempMap
.
put
(
"isDesensitization"
,
true
);
}
strColumns
.
add
(
tempMap
);
}
}
realTimeSyncColumnModel
.
setAllColumns
(
null
);
realTimeSyncColumnModel
.
setStrColumns
(
strColumns
);
realTimeSyncColumnModel
.
setTableName
(
tableName
);
}
return
realTimeSyncColumnModel
;
}
/**
* 判断列是否是字符串类型的列
*
* @param columnType
* @return
*/
private
static
boolean
isStringColumn
(
String
columnType
)
{
columnType
=
columnType
.
toLowerCase
();
if
(
columnType
.
equals
(
"varchar"
)
||
columnType
.
equals
(
"nvarchar"
)
||
columnType
.
equals
(
"char"
)
||
columnType
.
equals
(
"tinytext"
)
||
columnType
.
equals
(
"text"
)
||
columnType
.
equals
(
"mediumtext"
)
||
columnType
.
equals
(
"longtext"
)
)
{
return
true
;
}
return
false
;
}
public
static
void
main
(
String
[]
args
)
{
String
aa
=
"aa,bb,"
;
System
.
err
.
println
(
aa
.
substring
(
0
,
aa
.
length
()
-
1
));
String
driverName
=
"com.mysql.jdbc.Driver"
;
String
jdbcUrl
=
"jdbc:mysql://10.0.53.179:3306/testunique"
;
String
username
=
"root"
;
String
password
=
"jz@2018"
;
RealTimeSyncDataSourceModel
realTimeSyncDataSourceModel
=
new
RealTimeSyncDataSourceModel
();
realTimeSyncDataSourceModel
.
setDriverName
(
driverName
);
realTimeSyncDataSourceModel
.
setJdbcUrl
(
jdbcUrl
);
realTimeSyncDataSourceModel
.
setUserName
(
username
);
realTimeSyncDataSourceModel
.
setPassword
(
password
);
String
blacklistTables
=
""
;
Map
<
String
,
Map
<
String
,
String
>>
map
=
null
;
Connection
connection
=
null
;
try
{
connection
=
openConn
(
realTimeSyncDataSourceModel
);
DatabaseMetaData
metaData
=
getMetaData
(
connection
);
ResultSet
indexResultSet
=
metaData
.
getIndexInfo
(
null
,
null
,
"yyy3"
,
true
,
false
);
String
primaryKey
=
""
;
while
(
indexResultSet
.
next
())
{
primaryKey
+=
indexResultSet
.
getString
(
"COLUMN_NAME"
)
+
","
;
}
if
(!
StringUtils
.
isEmpty
(
primaryKey
))
{
primaryKey
=
primaryKey
.
substring
(
0
,
primaryKey
.
length
()
-
1
);
}
System
.
err
.
println
(
primaryKey
);
/*ResultSet resultSet = metaData.getColumns(null,"%","yyy3","%");
while (resultSet.next()){
System.err.println(resultSet.getString("COLUMN_NAME") +"--"+resultSet.getString("TYPE_NAME"));
}*/
String
key
=
getPrimaryKeyColumnByTableName
(
metaData
,
"yyy3"
);
System
.
err
.
println
(
"key--"
+
key
);
}
catch
(
Exception
e
)
{
// TODO Auto-generated catch block
e
.
printStackTrace
();
}
finally
{
closeConn
(
connection
);
}
}
}
src/main/java/com/jz/dmp/modules/controller/DataIntegration/RealTimeSyncController.java
View file @
aa80f0d8
...
...
@@ -9,6 +9,7 @@ import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto;
import
com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeTableInfoReq
;
import
com.jz.dmp.modules.model.DmpRealtimeSyncInfo
;
import
com.jz.dmp.modules.model.RealTimeSyncModel
;
import
com.jz.dmp.modules.service.DmpRealtimeSyncInfoService
;
import
io.swagger.annotations.Api
;
import
io.swagger.annotations.ApiImplicitParam
;
...
...
@@ -18,6 +19,7 @@ import org.apache.commons.lang3.StringUtils;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.validation.annotation.Validated
;
import
org.springframework.web.bind.annotation.*
;
import
javax.servlet.http.HttpServletRequest
;
...
...
@@ -50,7 +52,7 @@ public class RealTimeSyncController {
*/
@ApiOperation
(
value
=
"实时同步任务列表分页查询"
,
notes
=
"实时同步任务列表分页查询"
)
@PostMapping
(
value
=
"/realTimeSyncListPage"
)
public
PageInfoResponse
<
RealTimeSyncListDto
>
getDataSourceListPage
(
@RequestBody
RealTimeSyncListReq
req
,
HttpServletRequest
httpRequest
)
throws
Exception
{
public
PageInfoResponse
<
RealTimeSyncListDto
>
getDataSourceListPage
(
@RequestBody
@Validated
RealTimeSyncListReq
req
,
HttpServletRequest
httpRequest
)
throws
Exception
{
PageInfoResponse
<
RealTimeSyncListDto
>
pageInfo
=
new
PageInfoResponse
<
RealTimeSyncListDto
>();
if
(
StringUtils
.
isEmpty
(
req
.
getProjectId
()))
{
pageInfo
.
setMessage
(
"项目id不能为空!"
);
...
...
@@ -141,22 +143,40 @@ public class RealTimeSyncController {
}
/**
*
根据数据源id获取表详细
信息
*
新增--选择源表
信息
*
* @return
* @author Bellamy
* @since 2021-01-06
*/
@ApiOperation
(
value
=
"新增--
获取表信息"
,
notes
=
"新增--获取
表信息"
)
@ApiOperation
(
value
=
"新增--
选择源表信息"
,
notes
=
"新增--选择源
表信息"
)
@PostMapping
(
value
=
"/getSourceDbTableList"
)
public
JsonResult
getTableInfo
(
@RequestBody
RealTimeTableInfoReq
req
,
HttpServletRequest
httpRequest
)
throws
Exception
{
public
JsonResult
<
RealTimeSyncModel
>
getTableInfo
(
@RequestBody
@Validated
RealTimeTableInfoReq
req
,
HttpServletRequest
httpRequest
)
throws
Exception
{
if
(
StringUtils
.
isEmpty
(
req
.
getProjectId
()))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"项目id不能为空!"
);
}
if
(
StringUtils
.
isEmpty
(
req
.
getSrcDatasourceId
()))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"来源数据源id不能为空!"
);
}
JsonResult
jsonResult
=
dmpRealtimeSyncInfoService
.
queryTableInfoByParams
(
req
);
JsonResult
<
RealTimeSyncModel
>
jsonResult
=
dmpRealtimeSyncInfoService
.
queryTableInfoByParams
(
req
);
return
jsonResult
;
}
/**
* 新增--选择目标信息
*
* @return
* @author Bellamy
* @since 2021-01-07
*/
@ApiOperation
(
value
=
"新增--选择目标信息"
,
notes
=
"新增--选择目标信息"
)
@GetMapping
(
value
=
"/targetDatasourceInfo"
)
@ApiImplicitParam
(
name
=
"projectId"
,
value
=
"项目id"
,
required
=
true
)
public
JsonResult
getTargetDatasourceInfo
(
@RequestParam
String
projectId
)
throws
Exception
{
if
(
StringUtils
.
isEmpty
(
projectId
))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"项目id不能为空!"
);
}
JsonResult
jsonResult
=
dmpRealtimeSyncInfoService
.
selectTargetDatasourceInfo
(
projectId
);
return
jsonResult
;
}
...
...
src/main/java/com/jz/dmp/modules/dao/DmpRealtimeSyncInfoDao.java
View file @
aa80f0d8
...
...
@@ -5,6 +5,7 @@ import com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceNameListDto;
import
com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq
;
import
com.jz.dmp.modules.model.DmpRealtimeSyncInfo
;
import
com.jz.dmp.modules.model.RealTimeSyncDataSourceModel
;
import
org.apache.ibatis.annotations.Param
;
import
java.util.List
;
...
...
@@ -109,7 +110,7 @@ public interface DmpRealtimeSyncInfoDao {
* @author Bellamy
* @since 2021-01-06
*/
Map
querygSourceDbInfoById
(
@Param
(
"srcDataSourceId"
)
String
srcDataSourceId
)
throws
Exception
;
RealTimeSyncDataSourceModel
querygSourceDbInfoById
(
@Param
(
"srcDataSourceId"
)
String
srcDataSourceId
)
throws
Exception
;
List
<
Map
>
queryRealTimeInfoByDataSourceId
(
@Param
(
"srcDatasourceId"
)
String
srcDatasourceId
,
@Param
(
"targetDatasourceId"
)
String
targetDatasourceId
);
...
...
src/main/java/com/jz/dmp/modules/model/RealTimeSyncColumnModel.java
0 → 100644
View file @
aa80f0d8
package
com
.
jz
.
dmp
.
modules
.
model
;
import
io.swagger.annotations.ApiModel
;
import
io.swagger.annotations.ApiModelProperty
;
import
java.io.Serializable
;
import
java.util.List
;
import
java.util.Map
;
/**
* 实时同步表信息 列级别的信息
*/
@ApiModel
(
value
=
"实时同步表下列级别的信息"
,
description
=
"实时同步表下列级别的信息"
)
public
class
RealTimeSyncColumnModel
implements
Serializable
{
private
static
final
long
serialVersionUID
=
7171169593984549127L
;
/**
* 表名
*/
@ApiModelProperty
(
value
=
"表名"
)
private
String
tableName
;
/**
* 当前表的所有列
*/
@ApiModelProperty
(
value
=
"当前表的所有列"
)
private
List
<
String
>
allColumns
;
/**
* 当前表是字符串类型的列
*/
@ApiModelProperty
(
value
=
"当前表是字符串类型的列"
)
private
List
<
Map
<
String
,
Object
>>
strColumns
;
public
String
getTableName
()
{
return
tableName
;
}
public
void
setTableName
(
String
tableName
)
{
this
.
tableName
=
tableName
;
}
public
List
<
String
>
getAllColumns
()
{
return
allColumns
;
}
public
void
setAllColumns
(
List
<
String
>
allColumns
)
{
this
.
allColumns
=
allColumns
;
}
public
List
<
Map
<
String
,
Object
>>
getStrColumns
()
{
return
strColumns
;
}
public
void
setStrColumns
(
List
<
Map
<
String
,
Object
>>
strColumns
)
{
this
.
strColumns
=
strColumns
;
}
}
src/main/java/com/jz/dmp/modules/model/RealTimeSyncDataSourceModel.java
0 → 100644
View file @
aa80f0d8
package
com
.
jz
.
dmp
.
modules
.
model
;
import
java.util.Date
;
/**
* 实时同步表信息 列级别的信息
*/
public
class
RealTimeSyncDataSourceModel
{
/**
* 数据源id
*/
private
Integer
id
;
/**
* 数据源名称
*/
private
String
datasourceName
;
/**
* 数据源描述
*/
private
String
datasourceDesc
;
/**
* 数据源类型
*/
private
Integer
datasourceType
;
/**
* jdbc 连接字符串
*/
private
String
jdbcUrl
;
/**
* 数据库名称
*/
private
String
dbName
;
/**
* 数据库用户名
*/
private
String
userName
;
/**
* 数据库密码
*/
private
String
password
;
/**
* 数据库主机地址
*/
private
String
host
;
/**
* 数据库端口
*/
private
String
port
;
/**
* 项目id
*/
private
Integer
projectId
;
/**
* jdbc驱动名称
*/
private
String
driverName
;
/**
* 数据源类型名称
*/
private
String
dataSourceTypeName
;
public
Integer
getDatasourceType
()
{
return
datasourceType
;
}
public
void
setDatasourceType
(
Integer
datasourceType
)
{
this
.
datasourceType
=
datasourceType
;
}
public
String
getJdbcUrl
()
{
return
jdbcUrl
;
}
public
void
setJdbcUrl
(
String
jdbcUrl
)
{
this
.
jdbcUrl
=
jdbcUrl
;
}
public
String
getDbName
()
{
return
dbName
;
}
public
void
setDbName
(
String
dbName
)
{
this
.
dbName
=
dbName
;
}
public
String
getUserName
()
{
return
userName
;
}
public
void
setUserName
(
String
userName
)
{
this
.
userName
=
userName
;
}
public
String
getPassword
()
{
return
password
;
}
public
void
setPassword
(
String
password
)
{
this
.
password
=
password
;
}
public
String
getHost
()
{
return
host
;
}
public
void
setHost
(
String
host
)
{
this
.
host
=
host
;
}
public
String
getPort
()
{
return
port
;
}
public
void
setPort
(
String
port
)
{
this
.
port
=
port
;
}
public
Integer
getProjectId
()
{
return
projectId
;
}
public
void
setProjectId
(
Integer
projectId
)
{
this
.
projectId
=
projectId
;
}
public
String
getDriverName
()
{
return
driverName
;
}
public
void
setDriverName
(
String
driverName
)
{
this
.
driverName
=
driverName
;
}
public
String
getDataSourceTypeName
()
{
return
dataSourceTypeName
;
}
public
void
setDataSourceTypeName
(
String
dataSourceTypeName
)
{
this
.
dataSourceTypeName
=
dataSourceTypeName
;
}
public
Integer
getId
()
{
return
id
;
}
public
void
setId
(
Integer
id
)
{
this
.
id
=
id
;
}
public
String
getDatasourceName
()
{
return
datasourceName
;
}
public
void
setDatasourceName
(
String
datasourceName
)
{
this
.
datasourceName
=
datasourceName
;
}
public
String
getDatasourceDesc
()
{
return
datasourceDesc
;
}
public
void
setDatasourceDesc
(
String
datasourceDesc
)
{
this
.
datasourceDesc
=
datasourceDesc
;
}
}
src/main/java/com/jz/dmp/modules/model/RealTimeSyncModel.java
View file @
aa80f0d8
package
com
.
jz
.
dmp
.
modules
.
model
;
import
io.swagger.annotations.ApiModel
;
import
io.swagger.annotations.ApiModelProperty
;
import
java.util.List
;
@ApiModel
(
value
=
"新增--新增选择源表返回参数"
,
description
=
"新增--新增选择源表返回参数"
)
public
class
RealTimeSyncModel
{
/**
*
数据源I
d
*
来源数据源i
d
*/
private
Integer
dataSourceId
;
@ApiModelProperty
(
value
=
"来源数据源id"
)
private
Integer
srcdatasourceId
;
/**
* 数据源名称
*
来源
数据源名称
*/
@ApiModelProperty
(
value
=
"来源数据源名称"
)
private
String
dataSourceName
;
/**
* 数据库名称
*/
@ApiModelProperty
(
value
=
"数据库名称"
)
private
String
dbName
;
/**
* 数据源下的表信息
*
来源
数据源下的表信息
*/
@ApiModelProperty
(
value
=
"来源数据源下的表信息"
)
private
List
<
RealTimeSyncTableModel
>
tables
;
public
Integer
getDataSourceId
()
{
return
dataSourceId
;
public
Integer
getSrcdatasourceId
()
{
return
srcdatasourceId
;
}
public
void
setDataSourceId
(
Integer
dataSourceId
)
{
this
.
dataSourceId
=
dataSourceId
;
public
void
setSrcdatasourceId
(
Integer
srcdatasourceId
)
{
this
.
srcdatasourceId
=
srcdatasourceId
;
}
public
String
getDataSourceName
()
{
return
dataSourceName
;
}
...
...
src/main/java/com/jz/dmp/modules/model/RealTimeSyncTableModel.java
View file @
aa80f0d8
package
com
.
jz
.
dmp
.
modules
.
model
;
import
io.swagger.annotations.ApiModel
;
import
io.swagger.annotations.ApiModelProperty
;
import
java.io.Serializable
;
/**
* 实时同步表信息 表级别的信息
*/
public
class
RealTimeSyncTableModel
{
@ApiModel
(
value
=
"实时同步表级别的信息"
,
description
=
"实时同步表级别的信息"
)
public
class
RealTimeSyncTableModel
implements
Serializable
{
private
static
final
long
serialVersionUID
=
-
5882129191251102234L
;
/**
* 表名
*/
@ApiModelProperty
(
value
=
"表名"
)
private
String
tableName
;
/**
* 主键名称
*/
@ApiModelProperty
(
value
=
"主键名称"
)
private
String
pkName
;
/**
* 是否提交同步过同步
*/
private
boolean
i
s
Submited
;
@ApiModelProperty
(
value
=
"是否提交同步过同步"
)
private
boolean
i
f
Submited
;
/**
* 是否是黑名单
*/
private
boolean
isBlacklist
;
@ApiModelProperty
(
value
=
"是否是黑名单"
)
private
boolean
ifBlacklist
;
/**
* 脱敏字段
*/
@ApiModelProperty
(
value
=
"脱敏字段"
)
private
String
desensitizationField
;
/**
* 脱敏算法
*/
@ApiModelProperty
(
value
=
"脱敏算法"
)
private
String
arithmetic
;
/**
* 表下的所有列信息
*/
//private RealTimeSyncColumnModel columnInfo;
@ApiModelProperty
(
value
=
"表下的所有列信息"
)
private
RealTimeSyncColumnModel
columnInfo
;
public
String
getTableName
()
{
...
...
@@ -58,44 +71,43 @@ public class RealTimeSyncTableModel {
this
.
pkName
=
pkName
;
}
/*public RealTimeSyncColumnModel getColumnInfo
() {
return
columnInfo
;
public
String
getDesensitizationField
()
{
return
desensitizationField
;
}
public void setColumnInfo(RealTimeSyncColumnModel columnInfo) {
this.columnInfo = columnInfo;
}
*/
public
boolean
isSubmited
()
{
return
isSubmited
;
public
void
setDesensitizationField
(
String
desensitizationField
)
{
this
.
desensitizationField
=
desensitizationField
;
}
public
void
setIsSubmited
(
boolean
isSubmited
)
{
this
.
isSubmited
=
isSubmited
;
public
String
getArithmetic
(
)
{
return
arithmetic
;
}
public
boolean
isBlacklist
(
)
{
return
isBlacklist
;
public
void
setArithmetic
(
String
arithmetic
)
{
this
.
arithmetic
=
arithmetic
;
}
public
void
setIsBlacklist
(
boolean
isBlacklist
)
{
this
.
isBlacklist
=
isBlacklist
;
public
RealTimeSyncColumnModel
getColumnInfo
(
)
{
return
columnInfo
;
}
public
String
getDesensitizationField
(
)
{
return
desensitizationField
;
public
void
setColumnInfo
(
RealTimeSyncColumnModel
columnInfo
)
{
this
.
columnInfo
=
columnInfo
;
}
public
void
setDesensitizationField
(
String
desensitizationField
)
{
this
.
desensitizationField
=
desensitizationFiel
d
;
public
boolean
isIfSubmited
(
)
{
return
ifSubmite
d
;
}
public
String
getArithmetic
(
)
{
return
arithmetic
;
public
void
setIfSubmited
(
boolean
ifSubmited
)
{
this
.
ifSubmited
=
ifSubmited
;
}
public
void
setArithmetic
(
String
arithmetic
)
{
this
.
arithmetic
=
arithmetic
;
public
boolean
isIfBlacklist
(
)
{
return
ifBlacklist
;
}
public
void
setIfBlacklist
(
boolean
ifBlacklist
)
{
this
.
ifBlacklist
=
ifBlacklist
;
}
}
src/main/java/com/jz/dmp/modules/service/DmpRealtimeSyncInfoService.java
View file @
aa80f0d8
...
...
@@ -7,6 +7,7 @@ import com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto;
import
com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeTableInfoReq
;
import
com.jz.dmp.modules.model.DmpRealtimeSyncInfo
;
import
com.jz.dmp.modules.model.RealTimeSyncModel
;
import
java.util.List
;
...
...
@@ -83,11 +84,20 @@ public interface DmpRealtimeSyncInfoService {
JsonResult
<
List
<
DataSourceNameListDto
>>
queryDatasourceNameList
(
String
projectId
,
String
type
)
throws
Exception
;
/**
*
根据数据源id获取表详细
信息
*
新增--选择源表
信息
*
* @return
* @author Bellamy
* @since 2021-01-06
*/
JsonResult
queryTableInfoByParams
(
RealTimeTableInfoReq
req
)
throws
Exception
;
JsonResult
<
RealTimeSyncModel
>
queryTableInfoByParams
(
RealTimeTableInfoReq
req
)
throws
Exception
;
/**
* 新增--选择目标信息
*
* @return
* @author Bellamy
* @since 2021-01-07
*/
JsonResult
selectTargetDatasourceInfo
(
String
projectId
)
throws
Exception
;
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/impl/DmpRealtimeSyncInfoServiceImpl.java
View file @
aa80f0d8
package
com
.
jz
.
dmp
.
modules
.
service
.
impl
;
import
com.github.pagehelper.PageHelper
;
import
com.github.pagehelper.PageInfo
;
import
com.jz.common.constant.JsonResult
;
import
com.jz.common.constant.ResultCode
;
import
com.jz.common.page.PageInfoResponse
;
import
com.jz.common.persistence.BaseService
;
import
com.jz.common.utils.realTime.DBUtil
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceNameListDto
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListDto
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeSyncListReq
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.RealTimeTableInfoReq
;
import
com.jz.dmp.modules.dao.DmpProjectDao
;
import
com.jz.dmp.modules.dao.DmpRealtimeSyncInfoDao
;
import
com.jz.dmp.modules.model.
DmpRealtimeSyncInfo
;
import
com.jz.dmp.modules.model.
*
;
import
com.jz.dmp.modules.service.DmpRealtimeSyncInfoService
;
import
org.apache.commons.lang3.StringUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.stereotype.Service
;
import
org.springframework.transaction.annotation.Propagation
;
import
org.springframework.transaction.annotation.Transactional
;
...
...
@@ -35,9 +38,15 @@ import java.util.regex.Pattern;
@Transactional
public
class
DmpRealtimeSyncInfoServiceImpl
implements
DmpRealtimeSyncInfoService
{
@Value
(
"${spring.public-key}"
)
private
String
publicKey
;
@Autowired
private
DmpRealtimeSyncInfoDao
dmpRealtimeSyncInfoDao
;
@Autowired
private
DmpProjectDao
dmpProjectDao
;
/**
* 通过ID查询单条数据
*
...
...
@@ -159,19 +168,19 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
/**
*
根据数据源id获取表详细
信息
*
新增--选择源表
信息
*
* @return
* @author Bellamy
* @since 2021-01-06
*/
@Override
public
JsonResult
queryTableInfoByParams
(
RealTimeTableInfoReq
req
)
throws
Exception
{
public
JsonResult
<
RealTimeSyncModel
>
queryTableInfoByParams
(
RealTimeTableInfoReq
req
)
throws
Exception
{
Map
<
String
,
Map
<
String
,
String
>>
paramsMap
=
null
;
//根据数据源id获取数据信息
Map
sourceDbInfo
=
dmpRealtimeSyncInfoDao
.
querygSourceDbInfoById
(
req
.
getSrcDatasourceId
());
//数据源对应的表详细信息
//根据
来源
数据源id获取数据信息
RealTimeSyncDataSourceModel
sourceDbInfo
=
dmpRealtimeSyncInfoDao
.
querygSourceDbInfoById
(
req
.
getSrcDatasourceId
());
sourceDbInfo
.
setPassword
(
new
BaseService
().
decode
(
sourceDbInfo
.
getPassword
(),
publicKey
));
//数据源对应的表详细信息
判断表的记录在同步任务中是否存在记录,如果已经存在,说明同步过
List
<
Map
>
list
=
dmpRealtimeSyncInfoDao
.
queryRealTimeInfoByDataSourceId
(
req
.
getSrcDatasourceId
(),
req
.
getTargetDatasourceId
());
if
(
list
.
size
()
>
0
&&
list
!=
null
)
{
paramsMap
=
new
HashMap
<>();
...
...
@@ -181,7 +190,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
}
Map
<
String
,
String
>
blackMapSetting
=
null
;
//查询源数据源的黑名单表
//查询
来
源数据源的黑名单表
Map
blackTableMap
=
dmpRealtimeSyncInfoDao
.
queryBlackTableByDataSourceId
(
req
.
getSrcDatasourceId
());
if
(
blackTableMap
.
size
()
>
0
&&
blackTableMap
!=
null
)
{
String
blacklistTable
=
blackTableMap
.
get
(
"blacklistTable"
).
toString
();
...
...
@@ -195,7 +204,29 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
}
}
}
//DBUtil.getDataSourceTables(sourceDbInfo,req.getToQueryTableName(),true,paramsMap,req.getBlacklistTables(),req.getSelectTablesName(),blackMapSetting);
return
null
;
//来源数据源下的表信息
List
<
RealTimeSyncTableModel
>
dataSourceTables
=
DBUtil
.
getDataSourceTables
(
sourceDbInfo
,
req
.
getToQueryTableName
(),
true
,
paramsMap
,
req
.
getBlacklistTables
(),
req
.
getSelectTablesName
(),
blackMapSetting
);
RealTimeSyncModel
realTimeSyncModel
=
new
RealTimeSyncModel
();
realTimeSyncModel
.
setSrcdatasourceId
(
Integer
.
valueOf
(
req
.
getSrcDatasourceId
()));
realTimeSyncModel
.
setDataSourceName
(
sourceDbInfo
.
getDatasourceName
());
realTimeSyncModel
.
setDbName
(
sourceDbInfo
.
getDbName
());
realTimeSyncModel
.
setTables
(
dataSourceTables
);
//来源数据源下的表信息
return
new
JsonResult
(
realTimeSyncModel
);
}
/**
* 新增--选择目标信息
*
* @return
* @author Bellamy
* @since 2021-01-07
*/
@Override
public
JsonResult
selectTargetDatasourceInfo
(
String
projectId
)
throws
Exception
{
DmpProjectSystemInfo
dmpProjectSystemInfo
=
dmpProjectDao
.
queryProjectSystemInfo
(
Long
.
valueOf
(
projectId
));
String
kafkaConnectUrl
=
dmpProjectSystemInfo
.
getKafkaConnectorUrl
();
//kafka 连接信息
String
[]
arr
=
kafkaConnectUrl
.
split
(
","
);
return
new
JsonResult
(
arr
);
}
}
\ No newline at end of file
src/main/resources/mapper/dmp/DmpProjectMapper.xml
View file @
aa80f0d8
...
...
@@ -459,54 +459,54 @@
<select
id=
"queryProjectSystemInfo"
parameterType=
"java.lang.Long"
resultType=
"com.jz.dmp.modules.model.DmpProjectSystemInfo"
>
SELECT
ID,
PROJECT_ID,
KERBEROS_ISENABLE,
KERBEROS_KRB5_CONF,
KERBEROS_JAAS_CONF,
KERBEROS_KEYTAB_CONF,
KERBEROS_KEYTAB_USER,
KERBEROS_FQDN,
KERBEROS_SPARK_JAAS_CONF,
KERBEROS_JAAS_CLIENT_NAME,
SHELL_CMD_SERVER,
SHELL_CMD_USER,
SHELL_CMD_PASSWORD,
SHELL_CMD_SUBMIT_SYCNING,
SHELL_CMD_QUERY_STATUS,
SHELL_CMD_STOP_SYCNING,
SHELL_CMD_CAT_LOG,
SHELL_FTP_DOWNLOAD_DIR,
HDFS_HTTP_PATH,
HDFS_USER_NAME,
HDFS_SYNCING_PATH,
IMPALA_JDBC_URL,
IMPALA_SHELL_URL,
AZKABAN_MONITOR_URL,
AZKABAN_EXECTOR_SHELL_EXEC,
AZKABAN_EXECTOR_SQL_EXEC,
AZKABAN_EXECTOR_XML_EXEC,
AZKABAN_EXECTOR_SQL_PATH,
AZKABAN_EXECTOR_SHELL_PATH,
AZKABAN_EXECTOR_SHELL_EXPORT_DATA,
AZKABAN_LOCAL_TASK_FILE_PATH,
KAFKA_CONNECTOR_URL,
KAFKA_BOOTSTRAP_SERVERS,
KAFKA_MONITOR_URL,
KAFKA_OFFSET_UPDATE_SHELL,
KAFKA_SCHEMA_REGISTER_URL,
KAFKA_INFLUX_URL,
KAFKA_INFLUX_USER_NAME,
KAFKA_INFLUX_PASSWORD,
SHELL_SFTP_PORT,
SPARK_HIVE_METASTORE_URIS,
SPARK_YARN_QUEUE,
SPARK_DEFAULT_EXECUTOR_MEMORY,
SPARK_DEFAULT_EXECUTOR_CORES,
SPARK_DEFAULT_TOTAL_EXECUTOR_CORES,
SPARK_STATISTICS_SOURCE_DATA_SWITCH,
ATLAS_MONITOR_URL,
OPENAPI_INTERFACE_URL
ID,
PROJECT_ID,
KERBEROS_ISENABLE,
KERBEROS_KRB5_CONF,
KERBEROS_JAAS_CONF,
KERBEROS_KEYTAB_CONF,
KERBEROS_KEYTAB_USER,
KERBEROS_FQDN,
KERBEROS_SPARK_JAAS_CONF,
KERBEROS_JAAS_CLIENT_NAME,
SHELL_CMD_SERVER,
SHELL_CMD_USER,
SHELL_CMD_PASSWORD,
SHELL_CMD_SUBMIT_SYCNING,
SHELL_CMD_QUERY_STATUS,
SHELL_CMD_STOP_SYCNING,
SHELL_CMD_CAT_LOG,
SHELL_FTP_DOWNLOAD_DIR,
HDFS_HTTP_PATH,
HDFS_USER_NAME,
HDFS_SYNCING_PATH,
IMPALA_JDBC_URL,
IMPALA_SHELL_URL,
AZKABAN_MONITOR_URL,
AZKABAN_EXECTOR_SHELL_EXEC,
AZKABAN_EXECTOR_SQL_EXEC,
AZKABAN_EXECTOR_XML_EXEC,
AZKABAN_EXECTOR_SQL_PATH,
AZKABAN_EXECTOR_SHELL_PATH,
AZKABAN_EXECTOR_SHELL_EXPORT_DATA,
AZKABAN_LOCAL_TASK_FILE_PATH,
KAFKA_CONNECTOR_URL,
KAFKA_BOOTSTRAP_SERVERS,
KAFKA_MONITOR_URL,
KAFKA_OFFSET_UPDATE_SHELL,
KAFKA_SCHEMA_REGISTER_URL,
KAFKA_INFLUX_URL,
KAFKA_INFLUX_USER_NAME,
KAFKA_INFLUX_PASSWORD,
SHELL_SFTP_PORT,
SPARK_HIVE_METASTORE_URIS,
SPARK_YARN_QUEUE,
SPARK_DEFAULT_EXECUTOR_MEMORY,
SPARK_DEFAULT_EXECUTOR_CORES,
SPARK_DEFAULT_TOTAL_EXECUTOR_CORES,
SPARK_STATISTICS_SOURCE_DATA_SWITCH,
ATLAS_MONITOR_URL,
OPENAPI_INTERFACE_URL
FROM dmp_project_system_info
WHERE data_status = '1' and PROJECT_ID = #{projectId}
</select>
...
...
src/main/resources/mapper/dmp/DmpRealtimeSyncInfoMapper.xml
View file @
aa80f0d8
...
...
@@ -364,22 +364,23 @@
</select>
<!--根据数据源id获取数据信息-->
<select
id=
"querygSourceDbInfoById"
parameterType=
"map"
resultType=
"
map
"
>
<select
id=
"querygSourceDbInfoById"
parameterType=
"map"
resultType=
"
com.jz.dmp.modules.model.RealTimeSyncDataSourceModel
"
>
select
ds.id as id,
ds.datasource_name as datasourceName,
ds.jdbc_url as jdbcUrl,
ds.user_name as userName,
ds.password,
ds.db_name as dbName,
dsdt.datasource as datasourceTypeName,
dsdt.driver_class_name as driverClassName,
dsdt.datasource,
dsdt.datasource_type as datasourceTypeName,
dsdt.driver_class_name as driverName
from dmp_syncing_datasource ds
inner join dmp_syncing_datasource_type dsdt on ds.datasource_type = dsdt.id
where 1=1 and ds.data_status = '1' dsd.ID = #{srcDataSourceId}
where 1=1 and ds.data_status = '1'
and ds.ID = #{srcDataSourceId}
</select>
<select
id=
"query
ListById11
"
resultType=
"java.util.Map"
>
<select
id=
"query
RealTimeInfoByDataSourceId
"
resultType=
"java.util.Map"
>
select
src_table_name as srcTableName,
connector_job_id as connectorJobId,
...
...
@@ -387,8 +388,9 @@
arithmetic
from dmp_realtime_sync_info
where 1=1 and type =2
<if
test=
"targetDataSourceId != null"
>
and target_datasource_id = #{targetDataSourceId}
</if>
<if
test=
"srcTableName != null"
>
and src_table_name = #{srcTableName}
</if>
and src_datasource_id = #{srcDatasourceId}
<if
test=
"targetDatasourceId != null"
>
and target_datasource_id = #{targetDatasourceId}
</if>
<!--<if test="srcTableName != null"> and src_table_name = #{srcTableName} </if>-->
</select>
<!-- 查询源数据源的黑名单表 -->
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment