Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
aa39ad69
Commit
aa39ad69
authored
Feb 19, 2021
by
mcb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
离线任务数据预览
parent
5bf4ed26
Changes
7
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
158 additions
and
10 deletions
+158
-10
DmpMysqlAgentClientService.java
...m/jz/agent/client/service/DmpMysqlAgentClientService.java
+1
-1
DmpDsAgentService.java
src/main/java/com/jz/agent/service/DmpDsAgentService.java
+2
-1
OfflineSynchController.java
...es/controller/DataIntegration/OfflineSynchController.java
+21
-0
OfflineDataPreview.java
...s/controller/DataIntegration/bean/OfflineDataPreview.java
+31
-0
OfflineSynchService.java
.../java/com/jz/dmp/modules/service/OfflineSynchService.java
+8
-0
OfflineSynchServiceImpl.java
.../jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
+83
-2
lxTaskJson.json
src/main/resources/templates/lxTaskJson.json
+12
-6
No files found.
src/main/java/com/jz/agent/client/service/DmpMysqlAgentClientService.java
View file @
aa39ad69
...
...
@@ -148,7 +148,7 @@ public class DmpMysqlAgentClientService implements DmpDsAgentClient {
try
{
conn
=
JDBCUtils
.
getConnections
(
ds
.
getDriverClassName
(),
ds
.
getJdbcUrl
(),
ds
.
getUserName
(),
ds
.
getPassword
());
String
sql
=
"select * from "
+
tableName
+
" limit 5"
;
String
sql2
=
"select * from information_schema.columns where table_name = '"
+
tableName
+
"'"
;
String
sql2
=
"select * from information_schema.columns where table_name = '"
+
tableName
+
"'
and table_schema = '"
+
ds
.
getDbName
()
+
"'
"
;
ps
=
conn
.
prepareStatement
(
sql2
);
rs
=
ps
.
executeQuery
();
while
(
rs
.
next
())
{
...
...
src/main/java/com/jz/agent/service/DmpDsAgentService.java
View file @
aa39ad69
...
...
@@ -15,6 +15,7 @@ public interface DmpDsAgentService {
//获取数据源表字段
public
DmpAgentResult
getTableColumnList
(
DmpAgentDatasourceInfo
ds
,
String
tableName
);
//获取表数据预览
public
DmpAgentResult
previewData
(
DmpAgentDatasourceInfo
ds
,
String
tableName
);
public
DmpAgentResult
submitSycingXmlConfig
(
DmpAgentDatasourceInfo
ds
,
String
xml
);
...
...
src/main/java/com/jz/dmp/modules/controller/DataIntegration/OfflineSynchController.java
View file @
aa39ad69
...
...
@@ -297,4 +297,25 @@ public class OfflineSynchController {
return
list
;
}
/**
* 数据预览
*
* @return
* @author Bellamy
* @since 2021-02-19
*/
@ApiOperation
(
value
=
"数据预览"
,
notes
=
"数据预览"
)
@PostMapping
(
value
=
"/dataPreview"
)
public
JsonResult
getDataPreview
(
@RequestBody
@Validated
OfflineDataPreview
req
)
throws
Exception
{
JsonResult
jsonResult
=
new
JsonResult
();
try
{
jsonResult
=
offlineSynchService
.
getDataPreview
(
req
);
}
catch
(
Exception
e
)
{
jsonResult
.
setMessage
(
e
.
getMessage
());
jsonResult
.
setCode
(
ResultCode
.
INTERNAL_SERVER_ERROR
);
e
.
printStackTrace
();
}
return
jsonResult
;
}
}
src/main/java/com/jz/dmp/modules/controller/DataIntegration/bean/OfflineDataPreview.java
0 → 100644
View file @
aa39ad69
package
com
.
jz
.
dmp
.
modules
.
controller
.
DataIntegration
.
bean
;
import
io.swagger.annotations.ApiModel
;
import
io.swagger.annotations.ApiModelProperty
;
import
java.io.Serializable
;
import
java.util.List
;
/**
* @ClassName: OfflineDataPreview
* @Description: 数据预览
* @Author Bellamy
* @Date 2021/2/19
* @Version 1.0
*/
@ApiModel
(
value
=
"数据预览-请求对象"
,
description
=
"数据预览-请求对象"
)
public
class
OfflineDataPreview
implements
Serializable
{
private
static
final
long
serialVersionUID
=
-
4133188450538217747L
;
@ApiModelProperty
(
value
=
"目标表名称"
)
private
List
<
SynchTableColumnsReq
>
params
;
public
List
<
SynchTableColumnsReq
>
getParams
()
{
return
params
;
}
public
void
setParams
(
List
<
SynchTableColumnsReq
>
params
)
{
this
.
params
=
params
;
}
}
src/main/java/com/jz/dmp/modules/service/OfflineSynchService.java
View file @
aa39ad69
...
...
@@ -122,4 +122,12 @@ public interface OfflineSynchService {
*/
JsonResult
addNewSynchTask
(
NewSynchTaskReq
newSynchTaskReq
)
throws
Exception
;
/**
* 数据预览
*
* @return
* @author Bellamy
* @since 2021-02-19
*/
JsonResult
getDataPreview
(
OfflineDataPreview
req
)
throws
Exception
;
}
src/main/java/com/jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
View file @
aa39ad69
package
com
.
jz
.
dmp
.
modules
.
service
.
impl
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.github.pagehelper.PageHelper
;
import
com.github.pagehelper.PageInfo
;
import
com.jz.agent.service.DmpDsAgentService
;
...
...
@@ -19,7 +20,6 @@ import com.jz.common.utils.web.XmlUtils;
import
com.jz.dmp.agent.DmpAgentResult
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.*
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowExecution
;
import
com.jz.dmp.modules.controller.bean.DmpNavigationTreeDto
;
import
com.jz.dmp.modules.dao.*
;
import
com.jz.dmp.modules.model.*
;
import
com.jz.dmp.modules.service.DmpDevelopTaskService
;
...
...
@@ -36,7 +36,6 @@ import org.springframework.data.redis.core.RedisTemplate;
import
org.springframework.stereotype.Service
;
import
org.springframework.transaction.annotation.Propagation
;
import
org.springframework.transaction.annotation.Transactional
;
import
springfox.documentation.spring.web.json.Json
;
import
javax.servlet.http.HttpServletRequest
;
import
java.io.File
;
...
...
@@ -861,4 +860,86 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
return
JsonResult
.
error
();
}
/**
* 数据预览
*
* @param req
* @return
* @author Bellamy
* @since 2021-02-19
*/
@Override
public
JsonResult
getDataPreview
(
OfflineDataPreview
req
)
throws
Exception
{
List
<
Map
<
String
,
List
<
Object
>>>
data1
=
new
ArrayList
<>();
List
<
Object
>
listHeard
=
new
ArrayList
<>();
Map
returnData
=
new
HashMap
();
if
(
req
.
getParams
()
==
null
||
req
.
getParams
().
size
()
==
0
)
{
throw
new
RuntimeException
(
"请求参数不能为空!"
);
}
for
(
SynchTableColumnsReq
str
:
req
.
getParams
())
{
//通过源数据库id ,查询数据源配置
DmpAgentDatasourceInfo
dsInfos
=
offlineSynchDao
.
querySourceDbInfoBySourceId
(
str
.
getSourceDbId
());
DmpAgentDatasourceInfo
dsInfo
=
new
DmpAgentDatasourceInfo
();
BeanUtils
.
copyProperties
(
dsInfos
,
dsInfo
);
if
(
dsInfo
==
null
)
{
throw
new
RuntimeException
(
"数据源配置信息不存在!"
);
}
//解码源数据库密码
if
(
StringUtils
.
isNotBlank
(
dsInfo
.
getPassword
()))
{
dsInfo
.
setPassword
(
new
BaseService
().
decode
(
dsInfo
.
getPassword
(),
publicKey
));
}
//创建jdbc,获取数据源数据预览
DmpAgentResult
rst
=
dmpDsAgentServiceImp
.
previewData
(
dsInfo
,
str
.
getTargetTableName
());
if
(!
rst
.
getCode
().
val
().
equals
(
"200"
))
{
return
new
JsonResult
(
rst
.
getCode
(),
rst
.
getMessage
());
}
else
{
//成功,包含表字段和行数据
Map
<
String
,
List
<
Object
>>
returnList
=
(
Map
<
String
,
List
<
Object
>>)
JSONObject
.
parse
(
rst
.
getMessage
());
if
(
returnList
!=
null
&&
returnList
.
size
()
>
0
)
{
List
<
Object
>
header
=
returnList
.
get
(
"header"
);
//表头 字段
listHeard
.
addAll
(
header
);
List
<
List
<
Object
>>
columnData
=
Collections
.
singletonList
(
returnList
.
get
(
"result"
));
//行数据
for
(
int
i
=
0
;
i
<
columnData
.
size
();
i
++)
{
List
<
Object
>
rowData
=
columnData
.
get
(
i
);
Map
<
String
,
List
<
Object
>>
map1
=
new
HashMap
<>();
map1
.
put
(
"value"
,
rowData
);
data1
.
add
(
map1
);
}
}
}
}
if
(
data1
.
size
()
>
0
&&
data1
!=
null
)
{
for
(
int
i
=
0
;
i
<
data1
.
size
();
i
++)
{
Map
mapa
=
data1
.
get
(
i
);
List
<
List
<
Object
>>
lista
=
(
List
<
List
<
Object
>>)
mapa
.
get
(
"value"
);
List
<
List
<
Object
>>
reData
=
new
ArrayList
<>();
int
c
=
++
i
;
for
(
int
a
=
0
;
a
<
lista
.
size
();
a
++)
{
List
<
Object
>
obj1
=
lista
.
get
(
a
);
if
(
c
<
data1
.
size
())
{
Map
mapb
=
data1
.
get
(
c
);
List
<
List
<
Object
>>
listb
=
(
List
<
List
<
Object
>>)
mapb
.
get
(
"value"
);
for
(
int
b
=
0
;
b
<
listb
.
size
();
b
++)
{
List
<
Object
>
obj2
=
listb
.
get
(
b
);
if
(
a
==
b
)
{
obj1
.
addAll
(
obj2
);
break
;
}
}
}
reData
.
add
(
obj1
);
}
mapa
.
put
(
"value"
,
reData
);
}
}
returnData
.
put
(
"header"
,
listHeard
);
returnData
.
put
(
"rowDatas"
,
data1
.
get
(
0
));
return
JsonResult
.
ok
(
returnData
);
}
}
src/main/resources/templates/lxTaskJson.json
View file @
aa39ad69
...
...
@@ -81,7 +81,7 @@
{
"params"
:
{
"version"
:
"1.0"
,
//版本
//
"version"
:
"1.0"
,
//版本
"treeId"
:
669
,
"projectId"
:
"31"
,
"taskId"
:
""
,
//任务id
...
...
@@ -91,10 +91,10 @@
//
"extract"
:
"incremental"
,
//增量/全量
//
"extractExpression"
:
"where 1=1"
,
//增量表达式
,数据过滤
//
"targetInsertMergeOverwrite"
:
"insert"
,
//插入合并重写
"ftColumn"
:
"分桶字段"
,
"ftCount"
:
"分桶个数"
,
"separateMax"
:
"分桶字段最大值"
,
"separateMin"
:
"分桶字段最小值"
,
//
"ftColumn"
:
"分桶字段"
,
//
"ftCount"
:
"分桶个数"
,
//
"separateMax"
:
"分桶字段最大值"
,
//
"separateMin"
:
"分桶字段最小值"
,
"errorLimitRecord"
:
"错误记录数超过"
,
"maxConcurrency"
:
"最大并发数"
,
"executorMemory"
:
"1"
,
//分配任务内存
...
...
@@ -212,7 +212,13 @@
]
}
]
},
"mappingRelation"
:[
{
"source"
:
""
,
"target"
:
""
}
]
},
"taskRules"
:
[
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment