Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
527af8d5
Commit
527af8d5
authored
Jan 08, 2021
by
mcb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
no message
parent
268a9dc8
Changes
12
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
12 changed files
with
1031 additions
and
5 deletions
+1031
-5
RestClient.java
src/main/java/com/jz/common/utils/realTime/RestClient.java
+165
-0
RealTimeSyncController.java
...es/controller/DataIntegration/RealTimeSyncController.java
+42
-1
DmpProjectDao.java
src/main/java/com/jz/dmp/modules/dao/DmpProjectDao.java
+7
-0
DmpRealtimeSyncHandleCountDao.java
...com/jz/dmp/modules/dao/DmpRealtimeSyncHandleCountDao.java
+92
-0
DmpRealtimeSyncInfoDao.java
...n/java/com/jz/dmp/modules/dao/DmpRealtimeSyncInfoDao.java
+2
-0
DmpRealtimeSyncHandleCount.java
.../com/jz/dmp/modules/model/DmpRealtimeSyncHandleCount.java
+106
-0
DmpRealtimeSyncSubmitResult.java
...com/jz/dmp/modules/model/DmpRealtimeSyncSubmitResult.java
+97
-0
DmpRealtimeSyncInfoService.java
...om/jz/dmp/modules/service/DmpRealtimeSyncInfoService.java
+10
-0
DmpRealtimeSyncInfoServiceImpl.java
.../modules/service/impl/DmpRealtimeSyncInfoServiceImpl.java
+290
-3
DmpSyncingDatasourceServiceImpl.java
...modules/service/impl/DmpSyncingDatasourceServiceImpl.java
+69
-1
DmpRealtimeSyncHandleCountDao.xml
...in/resources/mapper/dmp/DmpRealtimeSyncHandleCountDao.xml
+118
-0
DmpRealtimeSyncInfoMapper.xml
src/main/resources/mapper/dmp/DmpRealtimeSyncInfoMapper.xml
+33
-0
No files found.
src/main/java/com/jz/common/utils/realTime/RestClient.java
0 → 100644
View file @
527af8d5
package
com
.
jz
.
common
.
utils
.
realTime
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.http.*
;
import
org.springframework.http.client.HttpComponentsClientHttpRequestFactory
;
import
org.springframework.util.MimeType
;
import
org.springframework.util.MimeTypeUtils
;
import
org.springframework.web.client.RestTemplate
;
import
java.nio.charset.Charset
;
import
java.util.Map
;
/**
* 使用RestTemplate发送HTTP请求
*
* @return
* @author Bellamy
* @since 2021-01-08
*/
public
class
RestClient
{
public
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
RestClient
.
class
);
/**
*
* @param connetionTimeOut
* @param socketTimeOut
* @return
*/
public
static
RestTemplate
getRestTemplate
()
{
HttpComponentsClientHttpRequestFactory
requestFactory
=
new
HttpComponentsClientHttpRequestFactory
();
requestFactory
.
setConnectTimeout
(
120
*
1000
);
requestFactory
.
setReadTimeout
(
480
*
1000
);
RestTemplate
restTemplate
=
new
RestTemplate
(
requestFactory
);
return
restTemplate
;
}
/**
* 使用RestTemplate发送HTTP POST请求
* @param url 请求地址
* @param jsonStr 请求参数
* @param connectionTimeout 连接超时时间
* @param socketTimeout socket超时时间
* @return Map 返回报文
*/
@SuppressWarnings
({
"unchecked"
})
public
static
Map
<
String
,
Object
>
post
(
String
url
,
String
jsonStr
){
HttpHeaders
headers
=
new
HttpHeaders
();
headers
.
set
(
"Content-Type"
,
"application/json;charset=UTF-8"
);
//解决请求乱码问题
Map
<
String
,
Object
>
resutMap
=
null
;
try
{
RestTemplate
restTemplate
=
getRestTemplate
();
resutMap
=
restTemplate
.
postForObject
(
url
,
new
HttpEntity
<
String
>(
jsonStr
,
headers
),
Map
.
class
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
LOGGER
.
error
(
"rest post 异常"
,
e
.
getMessage
(),
e
);
}
return
resutMap
;
}
/**
* 使用RestTemplate发送HTTP get请求
* @param url
* @return
*/
@SuppressWarnings
(
"unchecked"
)
public
static
Map
<
String
,
Object
>
get
(
String
url
){
RestTemplate
restTemplate
=
getRestTemplate
();
Map
<
String
,
Object
>
map
=
null
;
try
{
map
=
restTemplate
.
getForObject
(
url
,
Map
.
class
);
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"rest get 异常"
,
e
.
getMessage
(),
e
);
}
return
map
;
}
/**
* 使用RestTemplate发送HTTP put请求
* @param url
* @param jsonStr
*/
public
static
Map
<
String
,
Object
>
put
(
String
url
,
String
jsonStr
){
Map
<
String
,
Object
>
map
=
null
;
try
{
map
=
exchange
(
url
,
HttpMethod
.
PUT
,
jsonStr
);
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"rest put 异常"
,
e
.
getMessage
(),
e
);
}
return
map
;
}
/**
* 使用RestTemplate发送HTTP delete 请求
* @param url
* @param jsonStr
*/
public
static
void
delete
(
String
url
)
{
try
{
RestTemplate
restTemplate
=
getRestTemplate
();
restTemplate
.
delete
(
url
);
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"rest delete 异常"
,
e
.
getMessage
(),
e
);
}
}
@SuppressWarnings
({
"unchecked"
,
"rawtypes"
})
public
static
Map
<
String
,
Object
>
exchange
(
String
url
,
HttpMethod
method
,
String
jsonStr
)
{
// 请求头
HttpHeaders
headers
=
new
HttpHeaders
();
MimeType
mimeType
=
MimeTypeUtils
.
parseMimeType
(
"application/json"
);
MediaType
mediaType
=
new
MediaType
(
mimeType
.
getType
(),
mimeType
.
getSubtype
(),
Charset
.
forName
(
"UTF-8"
));
// 请求体
headers
.
setContentType
(
mediaType
);
// 发送请求
HttpEntity
<
String
>
entity
=
new
HttpEntity
<>(
jsonStr
,
headers
);
ResponseEntity
<
Map
>
resultEntity
=
getRestTemplate
().
exchange
(
url
,
method
,
entity
,
Map
.
class
);
return
resultEntity
.
getBody
();
}
public
static
void
main
(
String
[]
args
)
{
String
url
=
"http://10.0.53.177:9883/connectors"
;
post
(
url
,
jsonStr
);
}
static
String
jsonStr
=
"{\r\n"
+
" \"name\": \"debezium-connector-avtiviti-process-center-1\",\r\n"
+
" \"config\": {\r\n"
+
" \"connector.class\": \"io.debezium.connector.mysql.MySqlConnector\",\r\n"
+
" \"database.history.producer.sasl.kerberos.service.name\": \"kafka\",\r\n"
+
" \"tasks.max\": \"3\",\r\n"
+
" \"database.history.kafka.topic\": \"avtiviti.process-center.databasehistory\",\r\n"
+
" \"column.blacklist\": \"\",\r\n"
+
" \"transforms\": \"unwrap\",\r\n"
+
" \"database.history.consumer.security.protocol\": \"SASL_PLAINTEXT\",\r\n"
+
" \"database.history.consumer.sasl.kerberos.service.name\": \"kafka\",\r\n"
+
" \"table.blacklist\": \"\",\r\n"
+
" \"transforms.unwrap.drop.tombstones\": \"true\",\r\n"
+
" \"transforms.unwrap.type\": \"io.debezium.transforms.UnwrapFromEnvelope\",\r\n"
+
" \"value.converter\": \"io.confluent.connect.avro.AvroConverter\",\r\n"
+
" \"database.whitelist\": \"process-center\",\r\n"
+
" \"key.converter\": \"io.confluent.connect.avro.AvroConverter\",\r\n"
+
" \"database.history.producer.sasl.mechanism\": \"GSSAPI\",\r\n"
+
" \"database.user\": \"root\",\r\n"
+
" \"database.server.id\": \"20\",\r\n"
+
" \"database.history.producer.security.protocol\": \"SASL_PLAINTEXT\",\r\n"
+
" \"database.history.kafka.bootstrap.servers\": \"SASL_PLAINTEXT://10.0.108.61:9092,SASL_PLAINTEXT://10.0.108.62:9092,SASL_PLAINTEXT://10.0.108.63:9092\",\r\n"
+
" \"database.server.name\": \"avtiviti\",\r\n"
+
" \"database.port\": \"3306\",\r\n"
+
" \"value.converter.schema.registry.url\": \"http://10.0.108.61:9881\",\r\n"
+
" \"database.hostname\": \"10.0.53.179\",\r\n"
+
" \"database.password\": \"jz@2018\",\r\n"
+
" \"database.history.consumer.sasl.mechanism\": \"GSSAPI\",\r\n"
+
" \"key.converter.schema.registry.url\": \"http://10.0.108.61:9881\"\r\n"
+
" }\r\n"
+
"}"
;
}
src/main/java/com/jz/dmp/modules/controller/DataIntegration/RealTimeSyncController.java
View file @
527af8d5
...
...
@@ -15,6 +15,7 @@ import io.swagger.annotations.Api;
import
io.swagger.annotations.ApiImplicitParam
;
import
io.swagger.annotations.ApiImplicitParams
;
import
io.swagger.annotations.ApiOperation
;
import
lombok.SneakyThrows
;
import
org.apache.commons.lang3.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
...
...
@@ -24,6 +25,7 @@ import org.springframework.web.bind.annotation.*;
import
javax.servlet.http.HttpServletRequest
;
import
java.util.List
;
import
java.util.Map
;
/**
* 实时同步任务
...
...
@@ -92,7 +94,8 @@ public class RealTimeSyncController {
System
.
out
.
println
(
srcTopicName
);
logger
.
info
(
"############正常执行表数据id........"
+
ids
[
i
]);
String
shellPath
=
"/app/bigdata-app/scripts/trigger_straming.sh"
;
CmdUtils
.
callShell
(
shellPath
,
srcTopicName
);
boolean
flag
=
CmdUtils
.
callShell
(
shellPath
,
srcTopicName
);
logger
.
info
(
"############"
+
flag
);
}
}
return
new
JsonResult
();
...
...
@@ -205,4 +208,42 @@ public class RealTimeSyncController {
return
jsonResult
;
}
/**
* 保存实时同步任务
*
* @return
* @author Bellamy
* @since 2021-01-08
*/
@ApiOperation
(
value
=
"保存实时同步任务"
,
notes
=
"保存实时同步任务"
)
@PostMapping
(
value
=
"/addTask"
)
public
JsonResult
addTask
(
@RequestBody
Map
<
String
,
Object
>
params
,
HttpServletRequest
httpRequest
)
throws
Exception
{
logger
.
info
(
"###################请求参数{}"
+
params
.
toString
()
+
"############"
);
if
(
StringUtils
.
isEmpty
(
params
.
get
(
"projectId"
).
toString
()))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"项目id不能为空!"
);
}
if
(
StringUtils
.
isEmpty
(
params
.
get
(
"srcDataSourceId"
).
toString
()))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"来源数据源id不能为空!"
);
}
if
(
StringUtils
.
isEmpty
(
params
.
get
(
"targetDataSourceId"
).
toString
()))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"目标数据源id不能为空!"
);
}
boolean
flag
=
false
;
//异步提交
Thread
thread
=
new
Thread
(
new
Runnable
()
{
@Override
public
void
run
()
{
try
{
dmpRealtimeSyncInfoService
.
addRealTimeTask
(
params
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
});
thread
.
start
();
return
new
JsonResult
();
}
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/dao/DmpProjectDao.java
View file @
527af8d5
...
...
@@ -29,5 +29,12 @@ public interface DmpProjectDao extends CrudDao<DmpProject> {
public
List
<
DmpProjectSystemInfo
>
getProjectSystemInfo
(
Long
projectId
);
/**
* 根据projectId查询项目系统配置信息
*
* @return
* @author Bellamy
* @since 2021-01-08
*/
DmpProjectSystemInfo
queryProjectSystemInfo
(
Long
projectId
);
}
src/main/java/com/jz/dmp/modules/dao/DmpRealtimeSyncHandleCountDao.java
0 → 100644
View file @
527af8d5
package
com
.
jz
.
dmp
.
modules
.
dao
;
import
com.jz.dmp.modules.model.DmpRealtimeSyncHandleCount
;
import
com.jz.dmp.modules.model.DmpRealtimeSyncSubmitResult
;
import
org.apache.ibatis.annotations.Param
;
import
java.util.List
;
/**
* 实时同步任务处理数(DmpRealtimeSyncHandleCount)表数据库访问层
*
* @author Bellamy
* @since 2021-01-08 14:56:03
*/
public
interface
DmpRealtimeSyncHandleCountDao
{
/**
* 通过ID查询单条数据
*
* @param uuid
* @return 实例对象
*/
DmpRealtimeSyncHandleCount
queryById
(
@Param
(
"uuid"
)
String
uuid
);
/**
* 查询指定行数据
*
* @param offset 查询起始位置
* @param limit 查询条数
* @return 对象列表
*/
List
<
DmpRealtimeSyncHandleCount
>
queryAllByLimit
(
@Param
(
"offset"
)
int
offset
,
@Param
(
"limit"
)
int
limit
);
/**
* 通过实体作为筛选条件查询
*
* @param dmpRealtimeSyncHandleCount 实例对象
* @return 对象列表
*/
List
<
DmpRealtimeSyncHandleCount
>
queryAll
(
DmpRealtimeSyncHandleCount
dmpRealtimeSyncHandleCount
);
/**
* 新增数据
*
* @param dmpRealtimeSyncHandleCount 实例对象
* @return 影响行数
*/
int
insert
(
DmpRealtimeSyncHandleCount
dmpRealtimeSyncHandleCount
);
/**
* 批量新增数据(MyBatis原生foreach方法)
*
* @param entities List<DmpRealtimeSyncHandleCount> 实例对象列表
* @return 影响行数
*/
int
insertBatch
(
@Param
(
"entities"
)
List
<
DmpRealtimeSyncHandleCount
>
entities
);
/**
* 批量新增或按主键更新数据(MyBatis原生foreach方法)
*
* @param entities List<DmpRealtimeSyncHandleCount> 实例对象列表
* @return 影响行数
*/
int
insertOrUpdateBatch
(
@Param
(
"entities"
)
List
<
DmpRealtimeSyncHandleCount
>
entities
);
/**
* 修改数据
*
* @param dmpRealtimeSyncHandleCount 实例对象
* @return 影响行数
*/
int
update
(
DmpRealtimeSyncHandleCount
dmpRealtimeSyncHandleCount
);
/**
* 通过主键删除数据
*
* @param 主键
* @return 影响行数
*/
int
deleteById
();
/**
* 保存实时同步任务提交结果
*
* @param dmpRealtimeSyncSubmitResult
* @return 影响行数
* @author Bellamy
* @since 2021-01-07
*/
int
insertRealtimeSyncSubmitResult
(
DmpRealtimeSyncSubmitResult
dmpRealtimeSyncSubmitResult
);
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/dao/DmpRealtimeSyncInfoDao.java
View file @
527af8d5
...
...
@@ -129,4 +129,6 @@ public interface DmpRealtimeSyncInfoDao {
* @since 2021-01-06
*/
Map
queryBlackTableByDataSourceId
(
@Param
(
"srcDatasourceId"
)
String
srcDatasourceId
);
Map
selecltRealtimeSyncInfoByParams
(
Map
<
String
,
Object
>
queryParam
);
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/model/DmpRealtimeSyncHandleCount.java
0 → 100644
View file @
527af8d5
package
com
.
jz
.
dmp
.
modules
.
model
;
import
io.swagger.annotations.ApiModel
;
import
io.swagger.annotations.ApiModelProperty
;
import
java.io.Serializable
;
import
java.util.Date
;
/**
* 实时同步任务处理数(DmpRealtimeSyncHandleCount)实体类
*
* @author Bellamy
* @since 2021-01-08 14:56:02
*/
@ApiModel
(
value
=
"实时同步任务处理数"
,
description
=
"实时同步任务处理数"
)
public
class
DmpRealtimeSyncHandleCount
implements
Serializable
{
private
static
final
long
serialVersionUID
=
-
29396966088167386L
;
@ApiModelProperty
(
value
=
"handleCountId"
)
private
Integer
handleCountId
;
@ApiModelProperty
(
value
=
"uuid"
)
private
String
uuid
;
/**
* 待提交数
*/
@ApiModelProperty
(
value
=
"待提交数"
)
private
Integer
toSubmit
;
/**
* 提交中的数
*/
@ApiModelProperty
(
value
=
"提交中的数"
)
private
Integer
submiting
;
/**
* 提交成功的数
*/
@ApiModelProperty
(
value
=
"提交成功的数"
)
private
Integer
submitSuccess
;
/**
* 提交失败的数
*/
@ApiModelProperty
(
value
=
"提交失败的数"
)
private
Integer
submitFail
;
/**
* 创建时间
*/
@ApiModelProperty
(
value
=
"创建时间"
)
private
Date
creTime
;
public
String
getUuid
()
{
return
uuid
;
}
public
void
setUuid
(
String
uuid
)
{
this
.
uuid
=
uuid
;
}
public
Integer
getToSubmit
()
{
return
toSubmit
;
}
public
void
setToSubmit
(
Integer
toSubmit
)
{
this
.
toSubmit
=
toSubmit
;
}
public
Integer
getSubmiting
()
{
return
submiting
;
}
public
void
setSubmiting
(
Integer
submiting
)
{
this
.
submiting
=
submiting
;
}
public
Integer
getSubmitSuccess
()
{
return
submitSuccess
;
}
public
void
setSubmitSuccess
(
Integer
submitSuccess
)
{
this
.
submitSuccess
=
submitSuccess
;
}
public
Integer
getSubmitFail
()
{
return
submitFail
;
}
public
void
setSubmitFail
(
Integer
submitFail
)
{
this
.
submitFail
=
submitFail
;
}
public
Date
getCreTime
()
{
return
creTime
;
}
public
void
setCreTime
(
Date
creTime
)
{
this
.
creTime
=
creTime
;
}
public
Integer
getHandleCountId
()
{
return
handleCountId
;
}
public
void
setHandleCountId
(
Integer
handleCountId
)
{
this
.
handleCountId
=
handleCountId
;
}
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/model/DmpRealtimeSyncSubmitResult.java
0 → 100644
View file @
527af8d5
package
com
.
jz
.
dmp
.
modules
.
model
;
import
java.io.Serializable
;
import
java.util.Date
;
/**
* 实时同步任务提交结果
*
* @author Bellamy
* @since 2021-01-05 14:18:00
*/
public
class
DmpRealtimeSyncSubmitResult
implements
Serializable
{
private
static
final
long
serialVersionUID
=
2075639970822903705L
;
private
Long
submitResultId
;
private
String
uuid
;
/**
* 提交结果
*/
private
String
submitResult
;
/**
* source:源 connector,tartget:目标connector
*/
private
Integer
type
;
/**
* 创建时间
*/
private
Date
createdTime
;
private
String
typeStr
;
/**
* 处理状态:1 待提交,2:提交中,3:提交成功,4:提交失败
*/
private
Integer
status
;
public
String
getUuid
()
{
return
uuid
;
}
public
void
setUuid
(
String
uuid
)
{
this
.
uuid
=
uuid
;
}
public
String
getSubmitResult
()
{
return
submitResult
;
}
public
void
setSubmitResult
(
String
submitResult
)
{
this
.
submitResult
=
submitResult
;
}
public
Integer
getType
()
{
return
type
;
}
public
void
setType
(
Integer
type
)
{
this
.
type
=
type
;
}
public
Integer
getStatus
()
{
return
status
;
}
public
void
setStatus
(
Integer
status
)
{
this
.
status
=
status
;
}
public
String
getTypeStr
()
{
return
typeStr
;
}
public
void
setTypeStr
(
String
typeStr
)
{
this
.
typeStr
=
typeStr
;
}
public
Long
getSubmitResultId
()
{
return
submitResultId
;
}
public
void
setSubmitResultId
(
Long
submitResultId
)
{
this
.
submitResultId
=
submitResultId
;
}
public
Date
getCreatedTime
()
{
return
createdTime
;
}
public
void
setCreatedTime
(
Date
createdTime
)
{
this
.
createdTime
=
createdTime
;
}
}
src/main/java/com/jz/dmp/modules/service/DmpRealtimeSyncInfoService.java
View file @
527af8d5
...
...
@@ -10,6 +10,7 @@ import com.jz.dmp.modules.model.DmpRealtimeSyncInfo;
import
com.jz.dmp.modules.model.RealTimeSyncModel
;
import
java.util.List
;
import
java.util.Map
;
/**
* 实时同步任务(DmpRealtimeSyncInfo)表服务接口
...
...
@@ -102,4 +103,13 @@ public interface DmpRealtimeSyncInfoService {
JsonResult
selectTargetDatasourceInfo
(
String
projectId
)
throws
Exception
;
JsonResult
queryRealTimeInfoByDataSourceId
(
String
srcDatasourceId
,
String
sourceTableName
)
throws
Exception
;
/**
* 保存实时同步任务
*
* @return
* @author Bellamy
* @since 2021-01-08
*/
boolean
addRealTimeTask
(
Map
<
String
,
Object
>
params
)
throws
Exception
;
}
\ No newline at end of file
src/main/java/com/jz/dmp/modules/service/impl/DmpRealtimeSyncInfoServiceImpl.java
View file @
527af8d5
This diff is collapsed.
Click to expand it.
src/main/java/com/jz/dmp/modules/service/impl/DmpSyncingDatasourceServiceImpl.java
View file @
527af8d5
...
...
@@ -9,6 +9,7 @@ import com.jz.common.constant.JsonResult;
import
com.jz.common.constant.ResultCode
;
import
com.jz.common.enums.TestConnectStatusEnum
;
import
com.jz.common.page.PageInfoResponse
;
import
com.jz.common.persistence.BaseService
;
import
com.jz.common.utils.JsonMapper
;
import
com.jz.dmp.agent.DmpAgentResult
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceListDto
;
...
...
@@ -28,6 +29,7 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.BeanUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.stereotype.Service
;
import
org.springframework.transaction.annotation.Propagation
;
import
org.springframework.transaction.annotation.Transactional
;
...
...
@@ -47,6 +49,9 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
private
static
Logger
logger
=
LoggerFactory
.
getLogger
(
DmpSyncingDatasourceServiceImpl
.
class
);
@Value
(
"${spring.public-key}"
)
private
String
publicKey
;
@Autowired
private
DmpSyncingDatasourceDao
dmpSyncingDatasourceDao
;
...
...
@@ -84,6 +89,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
*
* @param dmpSyncingDatasource 实例对象
* @return 实例对象
* @author Bellamy
*/
@Override
public
DmpSyncingDatasource
insert
(
DmpSyncingDatasource
dmpSyncingDatasource
)
{
...
...
@@ -96,6 +102,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
*
* @param dmpSyncingDatasource 实例对象
* @return 实例对象
* @author Bellamy
*/
@Override
public
DmpSyncingDatasource
update
(
DmpSyncingDatasource
dmpSyncingDatasource
)
{
...
...
@@ -108,6 +115,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
*
* @param id 主键
* @return 是否成功
* @author Bellamy
*/
@Override
public
boolean
deleteById
(
Integer
id
)
{
...
...
@@ -118,6 +126,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
* 数据源列表查询
*
* @param req
* @author Bellamy
*/
@Override
public
PageInfoResponse
<
DataSourceListDto
>
queryDataSourceListPage
(
DataSourceListReq
req
,
HttpServletRequest
httpRequest
)
throws
Exception
{
...
...
@@ -138,6 +147,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
* 批量删除数据源
*
* @return
* @author Bellamy
*/
@Override
@Transactional
(
rollbackFor
=
Exception
.
class
,
propagation
=
Propagation
.
REQUIRED
)
...
...
@@ -157,6 +167,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
* 新增-获取数据源类型
*
* @return
* @author Bellamy
*/
@Override
public
JsonResult
queryGroupDatasourceType
()
throws
Exception
{
...
...
@@ -186,6 +197,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
* 保存数据源
*
* @return
* @author Bellamy
*/
@Transactional
(
rollbackFor
=
Exception
.
class
,
propagation
=
Propagation
.
REQUIRES_NEW
)
@Override
...
...
@@ -198,6 +210,9 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
if
(
len
>
0
)
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"数据源名称已存在"
);
}
//解析要保存的数据源信息
DmpSyncingDatasource
dmpSyncingDatasource
=
getDataSourceInfo
(
saveBody
);
DmpSyncingDatasource
dsd
=
new
DmpSyncingDatasource
();
BeanUtils
.
copyProperties
(
saveBody
,
dsd
);
...
...
@@ -205,15 +220,65 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
dsd
.
setDatasourceType
(
Integer
.
valueOf
(
saveBody
.
getDatasourceType
()));
//数据源类型ID
dsd
.
setProjectId
(
Integer
.
valueOf
(
saveBody
.
getProjectId
()));
dsd
.
setDataStatus
(
"1"
);
dsd
.
setTestConnectStatus
(
TestConnectStatusEnum
.
WCS
.
getValue
());
if
(
StringUtils
.
isEmpty
(
saveBody
.
getTestConnectStatus
()))
{
//默认未测试
dsd
.
setTestConnectStatus
(
TestConnectStatusEnum
.
WCS
.
getValue
());
}
dsd
.
setTestConnectStatus
(
saveBody
.
getTestConnectStatus
());
dsd
.
setDbName
(
dmpSyncingDatasource
.
getDbName
());
dsd
.
setHost
(
dmpSyncingDatasource
.
getHost
());
dsd
.
setPort
(
dmpSyncingDatasource
.
getPort
());
dsd
.
setPassword
(
dmpSyncingDatasource
.
getPassword
());
dmpSyncingDatasourceDao
.
insert
(
dsd
);
return
new
JsonResult
(
dsd
);
}
/**
* 解析要保存的数据源信息
*
* @return
* @author Bellamy
* @since 2021-01-08
*/
public
DmpSyncingDatasource
getDataSourceInfo
(
DmpSyncingDatasourceReq
saveBody
)
{
DmpSyncingDatasource
returnData
=
new
DmpSyncingDatasource
();
if
(
saveBody
!=
null
)
{
String
jdbcUrl
=
saveBody
.
getJdbcUrl
();
if
(!
org
.
springframework
.
util
.
StringUtils
.
isEmpty
(
jdbcUrl
))
{
jdbcUrl
=
jdbcUrl
.
substring
(
jdbcUrl
.
indexOf
(
"//"
)
+
2
);
String
hostPort
=
""
;
String
dbName
=
""
;
if
(
jdbcUrl
.
contains
(
"/"
))
{
hostPort
=
jdbcUrl
.
substring
(
0
,
jdbcUrl
.
indexOf
(
"/"
));
dbName
=
jdbcUrl
.
substring
(
jdbcUrl
.
indexOf
(
"/"
)
+
1
);
}
if
(
dbName
.
contains
(
"?"
))
{
dbName
=
dbName
.
substring
(
0
,
dbName
.
indexOf
(
"?"
));
//数据库名称
}
returnData
.
setDbName
(
dbName
);
String
[]
hostPortArr
=
hostPort
.
split
(
":"
);
if
(
hostPortArr
!=
null
&&
hostPortArr
.
length
>=
2
)
{
String
host
=
hostPortArr
[
0
];
String
port
=
hostPortArr
[
1
];
returnData
.
setHost
(
host
);
//ip
returnData
.
setPort
(
port
);
//端口
}
}
//加密密码
String
password
=
saveBody
.
getPassword
();
password
=
new
BaseService
().
encode
(
password
,
publicKey
);
returnData
.
setPassword
(
password
);
}
return
returnData
;
}
/**
* 测试连通性
*
* @return
* @author Bellamy
*/
@Override
public
JsonResult
testConnection
(
DmpSyncingDatasourceReq
saveBody
)
throws
Exception
{
...
...
@@ -238,6 +303,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
* 编辑数据源
*
* @return
* @author Bellamy
*/
@Override
@Transactional
(
rollbackFor
=
Exception
.
class
,
propagation
=
Propagation
.
REQUIRES_NEW
)
...
...
@@ -274,6 +340,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
* 获取数据源类型输入框属性
*
* @return
* @author Bellamy
*/
@Override
public
JsonResult
<
SyncingDatasourceTypeDto
>
selectDatasourceTypeAttrById
(
String
datasourceTypeId
)
throws
Exception
{
...
...
@@ -311,6 +378,7 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
/*
* 查询数据源 对应的 数据库信息
* @author Bellamy
* */
private
DmpAgentDatasourceInfo
dsInfoDTO
(
DmpSyncingDatasourceReq
body
)
throws
Exception
{
//数据源类型ID去查询
...
...
src/main/resources/mapper/dmp/DmpRealtimeSyncHandleCountDao.xml
0 → 100644
View file @
527af8d5
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper
namespace=
"com.jz.dmp.modules.dao.DmpRealtimeSyncHandleCountDao"
>
<resultMap
type=
"com.jz.dmp.modules.model.DmpRealtimeSyncHandleCount"
id=
"DmpRealtimeSyncHandleCountMap"
>
<result
property=
"handleCountId"
column=
"handle_count_id"
jdbcType=
"INTEGER"
/>
<result
property=
"uuid"
column=
"uuid"
jdbcType=
"VARCHAR"
/>
<result
property=
"toSubmit"
column=
"to_submit"
jdbcType=
"INTEGER"
/>
<result
property=
"submiting"
column=
"submiting"
jdbcType=
"INTEGER"
/>
<result
property=
"submitSuccess"
column=
"submit_success"
jdbcType=
"INTEGER"
/>
<result
property=
"submitFail"
column=
"submit_fail"
jdbcType=
"INTEGER"
/>
<result
property=
"creTime"
column=
"cre_time"
jdbcType=
"TIMESTAMP"
/>
</resultMap>
<!--查询单个-->
<select
id=
"queryById"
resultMap=
"DmpRealtimeSyncHandleCountMap"
>
select
handle_count_id, uuid, to_submit, submiting, submit_success, submit_fail, cre_time
from dmp_realtime_sync_handle_count
where uuid= #{uuid}
</select>
<!--查询指定行数据-->
<select
id=
"queryAllByLimit"
resultMap=
"DmpRealtimeSyncHandleCountMap"
>
select
uuid, to_submit, submiting, submit_success, submit_fail, cre_time
from dmp_realtime_sync_handle_count
limit #{offset}, #{limit}
</select>
<!--通过实体作为筛选条件查询-->
<select
id=
"queryAll"
resultMap=
"DmpRealtimeSyncHandleCountMap"
>
select
uuid, to_submit, submiting, submit_success, submit_fail, cre_time
from dmp_realtime_sync_handle_count
<where>
<if
test=
"uuid != null and uuid != ''"
>
and uuid = #{uuid}
</if>
<if
test=
"toSubmit != null"
>
and to_submit = #{toSubmit}
</if>
<if
test=
"submiting != null"
>
and submiting = #{submiting}
</if>
<if
test=
"submitSuccess != null"
>
and submit_success = #{submitSuccess}
</if>
<if
test=
"submitFail != null"
>
and submit_fail = #{submitFail}
</if>
<if
test=
"creTime != null"
>
and cre_time = #{creTime}
</if>
</where>
</select>
<!--新增所有列-->
<insert
id=
"insert"
keyProperty=
"handleCountId"
useGeneratedKeys=
"true"
>
insert into dmp_realtime_sync_handle_count(uuid, to_submit, submiting, submit_success, submit_fail, cre_time)
values (#{uuid}, #{toSubmit}, #{submiting}, #{submitSuccess}, #{submitFail}, #{creTime})
</insert>
<insert
id=
"insertBatch"
keyProperty=
"handleCountId"
useGeneratedKeys=
"true"
>
insert into dmp_realtime_sync_handle_count(uuid, to_submit, submiting, submit_success, submit_fail,
cre_time)
values
<foreach
collection=
"entities"
item=
"entity"
separator=
","
>
(#{entity.uuid}, #{entity.toSubmit}, #{entity.submiting}, #{entity.submitSuccess}, #{entity.submitFail},
#{entity.creTime})
</foreach>
</insert>
<insert
id=
"insertOrUpdateBatch"
keyProperty=
"handleCountId"
useGeneratedKeys=
"true"
>
insert into dmp_realtime_sync_handle_count(uuid, to_submit, submiting, submit_success, submit_fail,
cre_time)
values
<foreach
collection=
"entities"
item=
"entity"
separator=
","
>
(#{entity.uuid}, #{entity.toSubmit}, #{entity.submiting}, #{entity.submitSuccess}, #{entity.submitFail},
#{entity.creTime})
</foreach>
on duplicate key update
uuid = values(uuid) , to_submit = values(to_submit) , submiting = values(submiting) , submit_success =
values(submit_success) , submit_fail = values(submit_fail) , cre_time = values(cre_time)
</insert>
<!--通过主键修改数据-->
<update
id=
"update"
>
update dmp_realtime_sync_handle_count
<set>
<if
test=
"toSubmit != null"
>
to_submit = #{toSubmit},
</if>
<if
test=
"submiting != null"
>
submiting = #{submiting},
</if>
<if
test=
"submitSuccess != null"
>
submit_success = #{submitSuccess},
</if>
<if
test=
"submitFail != null"
>
submit_fail = #{submitFail},
</if>
</set>
where uuid = #{uuid}
</update>
<!--通过主键删除-->
<delete
id=
"deleteById"
>
delete from dmp_realtime_sync_handle_count where = #{handleCountId}
</delete>
<!--保存实时同步任务提交结果-->
<insert
id=
"insertRealtimeSyncSubmitResult"
useGeneratedKeys=
"true"
keyProperty=
"submitResultId"
parameterType=
"com.jz.dmp.modules.model.DmpRealtimeSyncSubmitResult"
>
INSERT INTO dmp_realtime_sync_submit_result (`uuid`,`submit_result`,`type`,`created_time`,`status`)
VALUES(#{uuid},#{submitResult},#{type},createdTime,#{status})
</insert>
</mapper>
\ No newline at end of file
src/main/resources/mapper/dmp/DmpRealtimeSyncInfoMapper.xml
View file @
527af8d5
...
...
@@ -372,6 +372,8 @@
ds.user_name as userName,
ds.password,
ds.db_name as dbName,
ds.host,
ds.port,
dsdt.datasource_type as datasourceTypeName,
dsdt.driver_class_name as driverName
from dmp_syncing_datasource ds
...
...
@@ -401,4 +403,35 @@
FROM dmp_realtime_sync_blacklist_table_info
WHERE datasource_id = #{srcDatasourceId}
</select>
<select
id=
"selecltRealtimeSyncInfoByParams"
resultType=
"java.util.Map"
parameterType=
"java.util.Map"
>
SELECT
t1.id,
t1.tree_id as treeId,
t2.name as treeName,
t1.status,
date_format(t1.update_time,'%Y-%m-%d %H:%i:%s') as updateTime,
t1.src_datasource_id as srcDatasourceId,
t1.src_datasource_name as srcDatasourceName,
t1.src_database_type as srcDatabaseType,
t1.target_datasource_id as targetDatasourceId,
t1.target_datasource_name as targetDatasourceName,
t1.target_database_type as targetDatabaseType
FROM dmp_realtime_sync_info t1
left join dmp_navigation_tree t2 on t1.tree_id=t2.id
left join dmp_syncing_datasource t3 ON t1.src_datasource_id = t3.ID
left join dmp_syncing_datasource t4 ON t1.target_datasource_id = t4.ID
where 1=1
<if
test=
"projectId !=null"
>
and t1.project_id=#{projectId}
</if>
<if
test=
"taskStatus != null and taskStatus != '' "
>
AND t1.status = #{taskStatus}
</if>
<if
test=
"treeId != null and treeId != '' "
>
AND t1.id = #{treeId}
</if>
<if
test=
"targetDatabaseTypeId != null and targetDatabaseTypeId != '' "
>
AND t4.DATASOURCE_TYPE = #{targetDatabaseTypeId}
</if>
<if
test=
"sourceDatabaseTypeId != null and sourceDatabaseTypeId != ''"
>
AND t3.DATASOURCE_TYPE = #{sourceDatabaseTypeId}
</if>
<if
test=
"sourceDatabaseName != null and sourceDatabaseName != '' "
>
AND t1.src_datasource_name like CONCAT('%', #{sourceDatabaseName}, '%')
</if>
<if
test=
"targetDatabaseName != null and targetDatabaseName != '' "
>
AND t1.target_datasource_name like CONCAT('%', #{targetDatabaseName}, '%')
</if>
<if
test=
"treeName != null and treeName != '' "
>
AND t2.name like CONCAT('%', #{treeName}, '%')
</if>
<if
test=
"sourceConnectorName != null "
>
AND t1.connector_job_id = #{sourceConnectorName}
</if>
<if
test=
"srcDataSourceId != null"
>
AND t1.src_datasource_id = #{srcDataSourceId}
</if>
order by t1.create_time desc
</select>
</mapper>
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment