Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jz-dmp-service
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
姚本章
jz-dmp-service
Commits
b07a4416
Commit
b07a4416
authored
Feb 02, 2021
by
sml
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'dmp_dev' of
http://gitlab.ioubuy.cn/yaobenzhang/jz-dmp-service.git
into dmp_dev
parents
2fe650ef
778d7deb
Changes
12
Hide whitespace changes
Inline
Side-by-side
Showing
12 changed files
with
218 additions
and
181 deletions
+218
-181
DelFlagEnum.java
src/main/java/com/jz/common/enums/DelFlagEnum.java
+12
-12
AzkabanApiUtils2.java
src/main/java/com/jz/common/utils/AzkabanApiUtils2.java
+13
-18
FlowParseTool.java
src/main/java/com/jz/common/utils/FlowParseTool.java
+4
-12
OfflineSynchController.java
...es/controller/DataIntegration/OfflineSynchController.java
+8
-2
DmpDevelopTaskDao.java
src/main/java/com/jz/dmp/modules/dao/DmpDevelopTaskDao.java
+27
-17
DmpDevelopTaskHistory.java
.../java/com/jz/dmp/modules/model/DmpDevelopTaskHistory.java
+10
-0
DmpWorkFlowSubmitDetails.java
...va/com/jz/dmp/modules/model/DmpWorkFlowSubmitDetails.java
+2
-11
DmpDevelopTaskServiceImpl.java
...z/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
+3
-2
FlowServiceImpl.java
...java/com/jz/dmp/modules/service/impl/FlowServiceImpl.java
+1
-7
OfflineSynchServiceImpl.java
.../jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
+125
-96
DmpDevelopTaskMapper.xml
src/main/resources/mapper/dmp/DmpDevelopTaskMapper.xml
+8
-0
lxTaskJson.json
src/main/resources/templates/lxTaskJson.json
+5
-4
No files found.
src/main/java/com/jz/common/enums/DelFlagEnum.java
View file @
b07a4416
...
...
@@ -28,6 +28,18 @@ public enum DelFlagEnum {
this
.
value
=
value
;
}
public
static
DelFlagEnum
get
(
String
code
)
{
if
(
code
==
null
)
{
return
null
;
}
for
(
DelFlagEnum
status
:
values
())
{
if
(
status
.
getCode
().
equalsIgnoreCase
(
code
))
{
return
status
;
}
}
return
null
;
}
public
String
getCode
()
{
return
code
;
}
...
...
@@ -43,16 +55,4 @@ public enum DelFlagEnum {
public
void
setValue
(
String
value
)
{
this
.
value
=
value
;
}
public
static
DelFlagEnum
get
(
String
code
)
{
if
(
code
==
null
)
{
return
null
;
}
for
(
DelFlagEnum
status
:
values
())
{
if
(
status
.
getCode
().
equalsIgnoreCase
(
code
))
{
return
status
;
}
}
return
null
;
}
}
src/main/java/com/jz/common/utils/AzkabanApiUtils2.java
View file @
b07a4416
...
...
@@ -9,9 +9,9 @@ import java.util.Map;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.core.io.FileSystemResource
;
import
org.springframework.data.redis.core.RedisTemplate
;
import
org.springframework.data.redis.serializer.StringRedisSerializer
;
import
org.springframework.http.HttpEntity
;
import
org.springframework.http.HttpHeaders
;
import
org.springframework.http.client.SimpleClientHttpRequestFactory
;
...
...
@@ -22,7 +22,6 @@ import org.springframework.web.client.RestTemplate;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.alibaba.fastjson.support.spring.FastJsonRedisSerializer
;
import
com.google.gson.Gson
;
import
com.jz.common.utils.web.HttpClientUtils
;
import
com.jz.common.utils.web.SessionUtils
;
...
...
@@ -44,21 +43,19 @@ public class AzkabanApiUtils2 {
private
String
userName
;
private
String
password
;
private
RedisTemplate
redisTemplate
;
private
static
final
StringRedisSerializer
stringRedisSerializer
=
new
StringRedisSerializer
();
private
static
final
FastJsonRedisSerializer
fastJsonRedisSerializer
=
new
FastJsonRedisSerializer
(
SSOUserInfo
.
class
);
@Autowired
RedisTemplate
<
String
,
SSOUserInfo
>
redisTemplate
;
public
AzkabanApiUtils2
(
String
azkabanServerUrl
,
String
userName
,
String
password
,
RedisTemplate
<
String
,
SSOUserInfo
>
redisTemplate
)
{
this
(
azkabanServerUrl
,
redisTemplate
);
public
AzkabanApiUtils2
(
String
azkabanServerUrl
,
String
userName
,
String
password
)
{
this
(
azkabanServerUrl
);
this
.
userName
=
userName
;
this
.
password
=
password
;
}
public
AzkabanApiUtils2
(
String
azkabanServerUrl
,
RedisTemplate
<
String
,
SSOUserInfo
>
redisTemplate
)
{
public
AzkabanApiUtils2
(
String
azkabanServerUrl
)
{
this
.
azkabanServerUrl
=
azkabanServerUrl
;
this
.
userName
=
"admin"
;
this
.
password
=
"admin"
;
this
.
redisTemplate
=
redisTemplate
;
}
/**
...
...
@@ -104,18 +101,16 @@ public class AzkabanApiUtils2 {
LOGGER.error(azkabanServerUrl+"-----"+linkedMultiValueMap+" sessionId 为空");
throw new RuntimeException("登陆失败");
}*/
String
sessionId
=
SessionUtils
.
getSession
().
getId
();
String
sessionId
=
SessionUtils
.
getSession
().
getId
();
//"dcfc608c-c58a-45b7-adc7-9902b652496e";
//String sessionId = "f0d06f4a-874c-4dfc-8959-101b6add6bf5";
//通过redis方式登录Azkaban
String
redisKey
=
"spring:sessions:sessions:"
+
sessionId
;
SSOUserInfo
ssoUserInfo
=
(
SSOUserInfo
)
redisTemplate
.
opsForValue
().
get
(
redisKey
);
if
(
ssoUserInfo
==
null
)
{
redisTemplate
.
setKeySerializer
(
stringRedisSerializer
);
redisTemplate
.
setValueSerializer
(
fastJsonRedisSerializer
);
redisTemplate
.
opsForValue
().
set
(
redisKey
,
getSSOuserInfo
());
SSOUserInfo
ssoUserInfo
=
redisTemplate
.
opsForValue
().
get
(
redisKey
);
if
(
ssoUserInfo
==
null
)
{
redisTemplate
.
opsForValue
().
set
(
redisKey
,
getSSOuserInfo
());
}
System
.
err
.
println
(
"----sessionId="
+
sessionId
);
return
sessionId
;
//SessionUtils.getSession().getId();
}
...
...
@@ -657,7 +652,7 @@ public class AzkabanApiUtils2 {
}
public
static
void
main
(
String
[]
args
)
throws
Exception
{
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
"http://119.23.32.151:8083"
,
null
);
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
"http://119.23.32.151:8083"
);
boolean
dw_test
=
azkabanApiUtils
.
checkFlowExists
(
"dw_test"
,
"123"
);
System
.
err
.
println
(
dw_test
);
}
...
...
src/main/java/com/jz/common/utils/FlowParseTool.java
View file @
b07a4416
...
...
@@ -10,7 +10,6 @@ import java.util.Map;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.time.DateFormatUtils
;
import
org.springframework.data.redis.core.RedisTemplate
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
...
...
@@ -24,7 +23,6 @@ import com.jz.dmp.modules.model.DmpNavigationTree;
import
com.jz.dmp.modules.model.DmpProject
;
import
com.jz.dmp.modules.model.DmpProjectSystemInfo
;
import
com.jz.dmp.modules.model.DmpWorkFlowSubmitDetails
;
import
com.jz.dmp.modules.model.SSOUserInfo
;
import
com.jz.dmp.modules.service.DmpDevelopTaskService
;
import
com.jz.dmp.modules.service.DmpNavigationTreeService
;
import
com.jz.dmp.modules.service.DmpWorkFlowSubmitDetailsService
;
...
...
@@ -61,8 +59,6 @@ public class FlowParseTool {
private
DmpWorkFlowSubmitDetailsService
dmpWorkFlowSubmitDetailsService
;
private
RedisTemplate
<
String
,
SSOUserInfo
>
redisTemplate
;
/**
* 流程属性
*/
...
...
@@ -110,14 +106,12 @@ public class FlowParseTool {
DmpProjectSystemInfo
publishedToProjectSystemInfo
,
DmpDevelopTaskService
dmpDevelopTaskService
,
DmpNavigationTreeService
dmpNavigationTreeService
,
DmpWorkFlowSubmitDetailsService
dmpWorkFlowSubmitDetailsService
,
RedisTemplate
<
String
,
SSOUserInfo
>
redisTemplate
)
{
DmpWorkFlowSubmitDetailsService
dmpWorkFlowSubmitDetailsService
)
{
this
(
flowPro
,
dmpWorkFlowSubmitDetailsService
);
this
.
publishedToProject
=
publishedToProject
;
//this.publishedToProjectSystemInfo = publishedToProjectSystemInfo;
this
.
dmpDevelopTaskService
=
dmpDevelopTaskService
;
this
.
dmpNavigationTreeService
=
dmpNavigationTreeService
;
this
.
redisTemplate
=
redisTemplate
;
}
/**
...
...
@@ -132,14 +126,12 @@ public class FlowParseTool {
DmpProjectConfigInfoDto
dmpProjectConfigInfoDto
,
DmpDevelopTaskService
dmpDevelopTaskService
,
DmpNavigationTreeService
dmpNavigationTreeService
,
DmpWorkFlowSubmitDetailsService
dmpWorkFlowSubmitDetailsService
,
RedisTemplate
<
String
,
SSOUserInfo
>
redisTemplate
)
{
DmpWorkFlowSubmitDetailsService
dmpWorkFlowSubmitDetailsService
)
{
this
(
flowPro
,
dmpWorkFlowSubmitDetailsService
);
this
.
publishedToProject
=
publishedToProject
;
this
.
dmpProjectConfigInfoDto
=
dmpProjectConfigInfoDto
;
this
.
dmpDevelopTaskService
=
dmpDevelopTaskService
;
this
.
dmpNavigationTreeService
=
dmpNavigationTreeService
;
this
.
redisTemplate
=
redisTemplate
;
}
private
void
parse
()
{
...
...
@@ -524,7 +516,7 @@ public class FlowParseTool {
//上次zip包到azkaban
String
localTaskZipAbsolutePath
=
localTaskZipPath
+
"/"
+
localZipTargetFileName
;
String
azkabanApiUrl
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
().
getAzkabanMonitorUrl
();
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanApiUrl
,
redisTemplate
);
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanApiUrl
);
return
azkabanApiUtils
.
loginCreateProjectuploadZipAndSchedule
(
"jz_workflow_new_"
+
publishedToProjectId
,
publishedToProject
.
getProjectDesc
(),
localTaskZipAbsolutePath
,
flowPro
);
}
...
...
@@ -661,7 +653,7 @@ public class FlowParseTool {
String
subProcessFlowName
=
flowNode
.
getScript
();
//检查子流程是否存在 todo
String
azkabanApiUrl
=
dmpProjectConfigInfoDto
.
getDmpPublicConfigInfoDto
().
getAzkabanMonitorUrl
();
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanApiUrl
,
redisTemplate
);
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanApiUrl
);
boolean
flowExists
=
azkabanApiUtils
.
checkFlowExists
(
"jz_workflow_"
+
flowPro
.
getPublishedToProjectId
(),
subProcessFlowName
);
if
(!
flowExists
)
{
throw
new
RuntimeException
(
"节点:"
+
flowNode
.
getNodeName
()
+
"设置的子流程:"
+
subProcessFlowName
+
"不存在,请先发布"
+
subProcessFlowName
);
...
...
src/main/java/com/jz/dmp/modules/controller/DataIntegration/OfflineSynchController.java
View file @
b07a4416
...
...
@@ -5,7 +5,6 @@ import com.jz.common.constant.ResultCode;
import
com.jz.common.page.BasePageBean
;
import
com.jz.common.page.PageInfoResponse
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.*
;
import
com.jz.dmp.modules.controller.dataService.bean.SoureTableColumnsReq
;
import
com.jz.dmp.modules.service.DmpSyncingDatasourceTypeService
;
import
com.jz.dmp.modules.service.OfflineSynchService
;
import
io.swagger.annotations.Api
;
...
...
@@ -244,7 +243,14 @@ public class OfflineSynchController {
@ApiOperation
(
value
=
"保存离线任务数据"
,
notes
=
"保存离线任务数据"
)
@PostMapping
(
value
=
"/addSyncTask"
)
public
JsonResult
addSyncTask
(
@RequestBody
@Validated
SyncDmpTaskAddReq
syncDmpTaskAddReq
)
throws
Exception
{
JsonResult
list
=
offlineSynchService
.
addSyncTask
(
syncDmpTaskAddReq
);
JsonResult
list
=
new
JsonResult
();
try
{
list
=
offlineSynchService
.
addSyncTask
(
syncDmpTaskAddReq
);
}
catch
(
Exception
e
)
{
list
.
setCode
(
ResultCode
.
INTERNAL_SERVER_ERROR
);
list
.
setMessage
(
e
.
getMessage
());
e
.
printStackTrace
();
}
return
list
;
}
...
...
src/main/java/com/jz/dmp/modules/dao/DmpDevelopTaskDao.java
View file @
b07a4416
...
...
@@ -44,22 +44,32 @@ public interface DmpDevelopTaskDao {
List
<
DataDevTaskListDto
>
queryTaskTreeInfo
(
Map
params
)
throws
Exception
;
/**条件查询任务开发
* @param param
* @return
* @throws Exception
*/
public
List
<
DmpDevelopTask
>
findList
(
Map
<
String
,
Object
>
param
)
throws
Exception
;
/**
* @Title: get
* @Description: TODO(主键获取对象)
* @param @param id
* @param @return
* @param @throws Exception 参数
* @return DmpDevelopTask 返回类型
* @throws
*/
public
DmpDevelopTask
get
(
Long
id
)
throws
Exception
;
/**
* 条件查询任务开发
*
* @param param
* @return
* @throws Exception
*/
public
List
<
DmpDevelopTask
>
findList
(
Map
<
String
,
Object
>
param
)
throws
Exception
;
/**
* @param @param id
* @param @return
* @param @throws Exception 参数
* @return DmpDevelopTask 返回类型
* @throws
* @Title: get
* @Description: TODO(主键获取对象)
*/
public
DmpDevelopTask
get
(
Long
id
)
throws
Exception
;
/**
* 根据主键查询
*
* @return
* @author Bellamy
* @since 2021-02-01
*/
DmpDevelopTask
selectTaskById
(
@Param
(
"taskId"
)
String
taskId
)
throws
Exception
;
}
src/main/java/com/jz/dmp/modules/model/DmpDevelopTaskHistory.java
View file @
b07a4416
...
...
@@ -176,6 +176,8 @@ public class DmpDevelopTaskHistory implements Serializable{
@JsonFormat
(
pattern
=
"yyyy-MM-dd HH:mm:ss"
)
private
Date
updateTime
;
private
byte
[]
data
;
public
Integer
getId
()
{
return
id
;
}
...
...
@@ -375,4 +377,12 @@ public class DmpDevelopTaskHistory implements Serializable{
public
void
setUpdateTime
(
Date
updateTime
)
{
this
.
updateTime
=
updateTime
;
}
public
byte
[]
getData
()
{
return
data
;
}
public
void
setData
(
byte
[]
data
)
{
this
.
data
=
data
;
}
}
src/main/java/com/jz/dmp/modules/model/DmpWorkFlowSubmitDetails.java
View file @
b07a4416
...
...
@@ -10,8 +10,7 @@ import java.util.Date;
public
class
DmpWorkFlowSubmitDetails
implements
Serializable
{
private
static
final
long
serialVersionUID
=
1L
;
private
Long
id
;
private
Long
scheduleProjectId
;
private
String
scheduleFlowName
;
private
String
nodeName
;
...
...
@@ -24,16 +23,8 @@ public class DmpWorkFlowSubmitDetails implements Serializable {
private
Date
createdTime
;
private
String
createTimeStr
;
public
Long
getId
()
{
return
id
;
}
public
void
setId
(
Long
id
)
{
this
.
id
=
id
;
}
public
Long
getScheduleProjectId
()
{
public
Long
getScheduleProjectId
()
{
return
scheduleProjectId
;
}
...
...
src/main/java/com/jz/dmp/modules/service/impl/DmpDevelopTaskServiceImpl.java
View file @
b07a4416
...
...
@@ -129,7 +129,8 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
System
.
out
.
println
(
xmlContent
);
}
catch
(
Exception
e
)
{
logger
.
error
(
"封装离线同步xml内容出错:"
+
e
.
getMessage
(),
e
);
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
);
throw
new
RuntimeException
(
e
.
getMessage
());
//return JsonResult.error(ResultCode.INTERNAL_SERVER_ERROR,e.getMessage());
}
return
submitSyncXml
(
task
.
getProjectId
(),
xmlContent
);
}
...
...
@@ -250,7 +251,7 @@ public class DmpDevelopTaskServiceImpl extends BaseService implements DmpDevelop
DmpSyncingDatasource
tDS
=
this
.
getDmpSyncingDatasource
(
_projectId_
,
_targetDbConnection
);
//目标
if
(
null
==
sDS
||
null
==
tDS
||
null
==
sDS
.
getDatasourceType
()
||
null
==
tDS
.
getDatasourceType
())
throw
new
Exception
(
"同步数据来源或数据去向信息有误"
);
throw
new
Runtime
Exception
(
"同步数据来源或数据去向信息有误"
);
//根据数据源类型ID 获取数据源类型
DmpSyncingDatasourceType
sDST
=
dmpSyncingDatasourceTypeDao
.
queryById
(
sDS
.
getDatasourceType
());
//源
DmpSyncingDatasourceType
tDST
=
dmpSyncingDatasourceTypeDao
.
queryById
(
tDS
.
getDatasourceType
());
//目标
...
...
src/main/java/com/jz/dmp/modules/service/impl/FlowServiceImpl.java
View file @
b07a4416
...
...
@@ -4,7 +4,6 @@ import java.util.Date;
import
java.util.List
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.data.redis.core.RedisTemplate
;
import
org.springframework.stereotype.Service
;
import
org.springframework.transaction.annotation.Transactional
;
...
...
@@ -22,7 +21,6 @@ import com.jz.dmp.modules.dao.projconfig.DmpProjectConfigInfoMapper;
import
com.jz.dmp.modules.model.DmpProject
;
import
com.jz.dmp.modules.model.DmpProjectSystemInfo
;
import
com.jz.dmp.modules.model.DmpWorkFlowSubmitDetails
;
import
com.jz.dmp.modules.model.SSOUserInfo
;
import
com.jz.dmp.modules.service.DmpDevelopTaskService
;
import
com.jz.dmp.modules.service.DmpNavigationTreeService
;
import
com.jz.dmp.modules.service.DmpProjectService
;
...
...
@@ -59,9 +57,6 @@ public class FlowServiceImpl implements FlowService {
@Autowired
private
DmpProjectConfigInfoService
dmpProjectConfigInfoService
;
@Autowired
private
RedisTemplate
redisTemplate
;
/**
*工作流发布
...
...
@@ -94,8 +89,7 @@ public class FlowServiceImpl implements FlowService {
dmpProjectConfigInfoDto
,
dmpDevelopTaskService
,
dmpNavigationTreeService
,
dmpWorkFlowSubmitDetailsService
,
redisTemplate
dmpWorkFlowSubmitDetailsService
);
//保存发布信息
...
...
src/main/java/com/jz/dmp/modules/service/impl/OfflineSynchServiceImpl.java
View file @
b07a4416
package
com
.
jz
.
dmp
.
modules
.
service
.
impl
;
import
java.io.File
;
import
java.io.UnsupportedEncodingException
;
import
java.text.SimpleDateFormat
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
javax.servlet.http.HttpServletRequest
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.BeanUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.data.redis.core.RedisTemplate
;
import
org.springframework.stereotype.Service
;
import
org.springframework.transaction.annotation.Propagation
;
import
org.springframework.transaction.annotation.Transactional
;
import
com.alibaba.fastjson.JSON
;
import
com.github.pagehelper.PageHelper
;
import
com.github.pagehelper.PageInfo
;
...
...
@@ -38,39 +17,30 @@ import com.jz.common.utils.ZipUtils;
import
com.jz.common.utils.web.SessionUtils
;
import
com.jz.common.utils.web.XmlUtils
;
import
com.jz.dmp.agent.DmpAgentResult
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.CheckJyRlueStatusDto
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.CheckJyRlueStatusReq
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.CheckTaskStatusPageDto
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.CheckTaskStatusPageReq
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.DvRuleTDto
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.NewSynchTaskReq
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.SourceDbNameListDto
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.SoureAndTargetColumnsReq
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.SyncDmpTaskAddReq
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.SynchTableColumnsReq
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.TaskListPageDto
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.TaskListPageReq
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.*
;
import
com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowExecution
;
import
com.jz.dmp.modules.dao.DmpDevelopTaskDao
;
import
com.jz.dmp.modules.dao.DmpNavigationTreeDao
;
import
com.jz.dmp.modules.dao.DmpProjectDao
;
import
com.jz.dmp.modules.dao.DmpTableColumnDao
;
import
com.jz.dmp.modules.dao.DmpTableDao
;
import
com.jz.dmp.modules.dao.DmpTableFieldSchemaDao
;
import
com.jz.dmp.modules.dao.DvRuleTDao
;
import
com.jz.dmp.modules.dao.OfflineSynchDao
;
import
com.jz.dmp.modules.model.DmpAgentDatasourceInfo
;
import
com.jz.dmp.modules.model.DmpDevelopTask
;
import
com.jz.dmp.modules.model.DmpNavigationTree
;
import
com.jz.dmp.modules.model.DmpProjectSystemInfo
;
import
com.jz.dmp.modules.model.DmpTable
;
import
com.jz.dmp.modules.model.DmpTableColumn
;
import
com.jz.dmp.modules.model.DmpTableFieldSchema
;
import
com.jz.dmp.modules.model.DvRuleT
;
import
com.jz.dmp.modules.model.DvTaskRuleT
;
import
com.jz.dmp.modules.dao.*
;
import
com.jz.dmp.modules.model.*
;
import
com.jz.dmp.modules.service.DmpDevelopTaskService
;
import
com.jz.dmp.modules.service.DvTaskRuleTService
;
import
com.jz.dmp.modules.service.OfflineSynchService
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.BeanUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.stereotype.Service
;
import
org.springframework.transaction.annotation.Propagation
;
import
org.springframework.transaction.annotation.Transactional
;
import
javax.servlet.http.HttpServletRequest
;
import
java.io.File
;
import
java.io.UnsupportedEncodingException
;
import
java.math.BigDecimal
;
import
java.text.SimpleDateFormat
;
import
java.util.*
;
/**
* @Description:离线同步服务层
...
...
@@ -120,9 +90,10 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
@Autowired
private
DvTaskRuleTService
dvTaskRuleTService
;
@Autowired
private
RedisTemplate
redisTemplate
;
private
DmpDevelopTaskHistoryMapper
dmpDevelopTaskHistoryMapper
;
/**
* 离线同步任务列表分页查询
...
...
@@ -266,7 +237,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
//上传到azkaban todo
//上次zip包到azkaban
String
localTaskZipAbsolutePath
=
localTaskZipPath
+
"/"
+
localZipTargetFileName
;
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanMonitorUrl
,
redisTemplate
);
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanMonitorUrl
);
return
azkabanApiUtils
.
loginCreateProjectuploadZipAndExecute
(
"jz_localflow_"
+
projectId
,
"local_sync_project"
,
localTaskZipAbsolutePath
,
treeName
);
}
...
...
@@ -312,7 +283,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
//调用azkaban服务
String
azkabanApiUrl
=
publishToProjectSystemInfo
.
getAzkabanMonitorUrl
();
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanApiUrl
,
redisTemplate
);
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanApiUrl
);
list
=
azkabanApiUtils
.
getSyncingFlowExecution
(
projectId
,
treeName
,
checkTaskStatusPageReq
.
getPageNum
(),
checkTaskStatusPageReq
.
getPageSize
());
SimpleDateFormat
dtf
=
new
SimpleDateFormat
(
"yyyy-MM-dd HH:mm:ss"
);
...
...
@@ -502,15 +473,16 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
@Override
@Transactional
(
rollbackFor
=
Exception
.
class
,
propagation
=
Propagation
.
REQUIRES_NEW
)
public
JsonResult
addSyncTask
(
SyncDmpTaskAddReq
syncDmpTaskAddReq
)
throws
Exception
{
JsonResult
jsonResult
=
new
JsonResult
();
List
<
Map
<
String
,
Object
>>
result
=
new
ArrayList
<>();
Map
<
String
,
Object
>
reqParam
=
syncDmpTaskAddReq
.
getParams
();
if
(
reqParam
.
size
()
>
0
&&
reqParam
!=
null
)
{
JsonResult
jsonResult
=
addSyncing
(
reqParam
);
jsonResult
=
addSyncing
(
reqParam
);
DmpDevelopTask
data
=
(
DmpDevelopTask
)
jsonResult
.
getData
();
//保存规则信息
saveRuleInfo
(
result
,
reqParam
,
jsonResult
,
String
.
valueOf
(
data
.
getId
()));
//
saveRuleInfo(result, reqParam, jsonResult, String.valueOf(data.getId()));
}
return
new
JsonResult
(
ResultCode
.
SUCCESS
,
result
)
;
return
jsonResult
;
}
/*
...
...
@@ -582,11 +554,11 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
dmpDevelopTaskDao
.
update
(
task
);
//更新任务
logger
.
info
(
"################################## 更新任务数据结束 ############################################"
);
DmpNavigationTree
dmpNavigationTree
=
new
DmpNavigationTree
();
/*
DmpNavigationTree dmpNavigationTree = new DmpNavigationTree();
dmpNavigationTree.setName(taskName);
dmpNavigationTree.setId(treeId);
dmpNavigationTreeDao.update(dmpNavigationTree); //更新节点 树
logger
.
info
(
"################################## 更新节点 树 结束 ############################################"
);
logger.info("################################## 更新节点 树 结束 ############################################");
*/
//更新规则信息
List
<
DvTaskRuleT
>
list
=
new
ArrayList
<>();
//查询TaskRuleID 集合
...
...
@@ -611,30 +583,28 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
* @return
*/
public
JsonResult
addSyncing
(
Map
<
String
,
Object
>
body
)
throws
Exception
{
if
(
StringUtils
.
isEmpty
(
body
.
get
(
"treeId"
).
toString
()))
{
return
JsonResult
.
error
(
ResultCode
.
PARAMS_ERROR
,
"treeId不能为空!"
);
}
if
(
StringUtils
.
isEmpty
(
body
.
get
(
"projectId"
).
toString
()))
{
return
JsonResult
.
error
(
ResultCode
.
PARAMS_ERROR
,
"projectId不能为空!"
);
}
if
(
StringUtils
.
isBlank
(
body
.
get
(
"taskName"
).
toString
()))
{
return
JsonResult
.
error
(
ResultCode
.
PARAMS_ERROR
,
"任务名称不能为空"
);
}
Integer
projectId
=
Integer
.
valueOf
(
body
.
get
(
"projectId"
).
toString
());
Integer
parentId
=
Integer
.
valueOf
(
body
.
get
(
"parentId"
).
toString
());
//父节点ID
String
taskName
=
(
String
)
body
.
get
(
"taskName"
);
//任务名称 业务节点名称 一对一
String
taskId
=
body
.
get
(
"taskId"
).
toString
();
//任务ID
Integer
treeId
=
Integer
.
valueOf
(
body
.
get
(
"treeId"
).
toString
());
//树节点ID
//Integer taskId = null; //任务ID
String
taskName
=
body
.
get
(
"taskName"
).
toString
();
//任务名称 业务节点名称 一对一
//Integer parentId = Integer.valueOf(body.get("parentId").toString()); //父节点ID
if
(
StringUtils
.
isBlank
(
taskName
))
{
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"任务名称不能为空"
);
}
Map
<
String
,
Object
>
scriptMap
=
(
Map
<
String
,
Object
>)
body
.
get
(
"scripts"
);
//任务json数据
Object
content
=
scriptMap
.
get
(
"content"
);
String
xmlTdb
=
XmlUtils
.
getPropertyValue
(
content
,
"target_db_connection"
);
if
(
null
!=
content
&&
xmlTdb
.
length
()
==
0
)
{
// 包含content但未取出值条件才成立
return
new
JsonResult
(
ResultCode
.
PARAMS_ERROR
,
"脚本内容中缺失目标数据源(target_db_connection)"
);
}
/*DmpNavigationTree tree = new DmpNavigationTree();
tree.setName(taskName); //树节点名称
tree.setProjectId(projectId);
tree.setCategory("2"); //树类别
tree.setIsLevel("1"); //是否叶子节点 1 是
int cnt = dmpNavigationTreeDao.countTreeByName(tree);
if (cnt > 0) {
throw new RuntimeException("当前项目已存在同名的任务");
}*/
return
JsonResult
.
error
(
ResultCode
.
PARAMS_ERROR
,
"脚本内容中缺失目标数据源(target_db_connection)"
);
}
//保存目标库类型
Map
<
String
,
Object
>
writer
=
(
Map
<
String
,
Object
>)
scriptMap
.
get
(
"writer"
);
// 目标数据
...
...
@@ -653,11 +623,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
DmpDevelopTask
task
=
new
DmpDevelopTask
();
task
.
setProjectId
(
projectId
);
//task.setParentId(parentId);
task
.
setTaskType
(
"2"
);
//任务类型
task
.
setDatasourceId
(
dataSourceId
);
//数据源ID
task
.
setType
(
"3"
);
//task.setTaskName(taskName);
task
.
setTaskDesc
(
"Syncing Task"
);
//任务描述
task
.
setIsSubmit
(
"0"
);
//是否已提交
task
.
setTreeId
(
treeId
);
...
...
@@ -670,17 +638,71 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
e
.
printStackTrace
();
}
task
.
setData
(
data
);
//json 数据
task
.
setScript
(
body
.
toString
());
task
.
setTargetDbName
(
targetDb
);
task
.
setTargetTableName
(
targetTable
);
task
.
setSourceTableName
(
sourceTableName
);
task
.
setSourceDbName
(
sourceDbName
);
task
.
setCreateUserId
(
SessionUtils
.
getCurrentUserId
());
task
.
setCreateTime
(
new
Date
());
dmpDevelopTaskDao
.
insert
(
task
);
//保存任务数据
logger
.
info
(
"################################## 新增任务数据结束 ############################################"
);
if
(
StringUtils
.
isEmpty
(
taskId
))
{
task
.
setVersion
(
"1.0"
);
task
.
setCreateUserId
(
SessionUtils
.
getCurrentUserId
());
task
.
setCreateTime
(
new
Date
());
dmpDevelopTaskDao
.
insert
(
task
);
//新增任务数据
this
.
saveTaskHistory
(
task
);
//保存任务历史版本
logger
.
info
(
"################################## 新增离线任务数据结束 ############################################"
);
//更新规则信息
List
<
DvTaskRuleT
>
list
=
new
ArrayList
<>();
List
<
Map
>
taskRules
=
(
List
<
Map
>)
body
.
get
(
"taskRules"
);
//保存dmp数据校验规则信息
settRuleInfo
(
taskId
,
taskRules
,
list
);
}
else
{
DmpDevelopTask
devTask
=
dmpDevelopTaskDao
.
selectTaskById
(
taskId
);
BigDecimal
version
=
new
BigDecimal
(
devTask
.
getVersion
());
version
=
version
.
add
(
new
BigDecimal
(
1.0
));
task
.
setVersion
(
String
.
valueOf
(
version
));
task
.
setUpdateTime
(
new
Date
());
task
.
setUpdateUserId
(
SessionUtils
.
getCurrentUserId
());
task
.
setId
(
Integer
.
valueOf
(
taskId
));
dmpDevelopTaskDao
.
update
(
task
);
this
.
saveTaskHistory
(
task
);
//保存任务历史版本
logger
.
info
(
"################################## 编辑离线任务数据结束 ############################################"
);
}
//保存时提交XML
dmpDevelopTaskService
.
submitSyncing
(
task
);
return
new
JsonResult
(
ResultCode
.
SUCCESS
,
task
);
return
dmpDevelopTaskService
.
submitSyncing
(
task
);
}
/**
* 保存任务历史版本
*
* @param task
* @return
* @Author Bellamy
* @Date 2021/02/01
*/
public
JsonResult
saveTaskHistory
(
DmpDevelopTask
task
)
throws
Exception
{
if
(
task
.
getId
()
==
null
)
{
throw
new
RuntimeException
(
"保存失败!"
);
}
DmpDevelopTaskHistory
taskHistory
=
new
DmpDevelopTaskHistory
();
taskHistory
.
setTaskId
(
task
.
getId
());
taskHistory
.
setVersion
(
task
.
getVersion
());
//taskHistory.setData(task.getData());
taskHistory
.
setScript
(
task
.
getScript
());
taskHistory
.
setTaskDesc
(
task
.
getTaskDesc
());
taskHistory
.
setTreeId
(
task
.
getTreeId
());
taskHistory
.
setTaskType
(
task
.
getTaskType
());
taskHistory
.
setDatasourceId
(
task
.
getDatasourceId
());
taskHistory
.
setCreateTime
(
task
.
getCreateTime
());
if
(
StringUtils
.
isNotEmpty
(
task
.
getCreateUserId
()))
{
taskHistory
.
setCreateUserId
(
Integer
.
valueOf
(
task
.
getCreateUserId
()));
}
taskHistory
.
setUpdateTime
(
task
.
getUpdateTime
());
if
(
StringUtils
.
isNotEmpty
(
task
.
getUpdateUserId
()))
{
taskHistory
.
setUpdateUserId
(
Integer
.
valueOf
(
task
.
getUpdateUserId
()));
}
taskHistory
.
setDataStatus
(
DelFlagEnum
.
NO
.
getValue
());
dmpDevelopTaskHistoryMapper
.
insert
(
taskHistory
);
return
JsonResult
.
ok
();
}
/**
...
...
@@ -717,17 +739,21 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
* @throws Exception
*/
private
void
settRuleInfo
(
String
taskId
,
List
<
Map
>
rules
,
List
<
DvTaskRuleT
>
list
)
throws
Exception
{
for
(
Map
rule
:
rules
)
{
DvTaskRuleT
taskRuleT
=
new
DvTaskRuleT
();
Integer
ruleId
=
Integer
.
valueOf
(
rule
.
get
(
"ruleId"
).
toString
());
String
ruleValue
=
(
String
)
rule
.
get
(
"ruleValue"
);
taskRuleT
.
setTaskId
(
taskId
);
//任务ID
taskRuleT
.
setRuleId
(
ruleId
.
longValue
());
taskRuleT
.
setRuleValue
(
ruleValue
);
list
.
add
(
taskRuleT
);
}
//批量新增任务与规则关系表
dvTaskRuleTService
.
saveRule
(
list
);
if
(
rules
.
size
()
>
0
&&
rules
!=
null
)
{
for
(
Map
rule
:
rules
)
{
if
(
StringUtils
.
isNotEmpty
(
rule
.
get
(
"ruleId"
).
toString
()))
{
DvTaskRuleT
taskRuleT
=
new
DvTaskRuleT
();
String
ruleValue
=
(
String
)
rule
.
get
(
"ruleValue"
);
taskRuleT
.
setTaskId
(
taskId
);
//任务ID
taskRuleT
.
setRuleId
(
Long
.
valueOf
(
rule
.
get
(
"ruleId"
).
toString
()));
taskRuleT
.
setRuleValue
(
ruleValue
);
list
.
add
(
taskRuleT
);
}
}
//批量新增任务与规则关系表
if
(
list
.
size
()
>
0
&&
list
!=
null
)
dvTaskRuleTService
.
saveRule
(
list
);
}
}
/**
...
...
@@ -768,6 +794,9 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
for
(
int
i
=
0
;
i
<
returnList
.
size
();
i
++)
{
Map
map
=
returnList
.
get
(
i
);
map
.
put
(
"id"
,
++
len
);
map
.
put
(
"fieldAlias"
,
map
.
get
(
"name"
));
//字段别名
map
.
put
(
"isPk"
,
0
);
map
.
put
(
"isPt"
,
0
);
returnData
.
add
(
map
);
}
}
...
...
@@ -792,7 +821,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
}
String
azkabanMonitorUrl
=
map
.
get
(
"azkabanMonitorUrl"
).
toString
();
Long
projectId
=
Long
.
valueOf
(
map
.
get
(
"projectId"
).
toString
());
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanMonitorUrl
,
redisTemplate
);
AzkabanApiUtils2
azkabanApiUtils
=
new
AzkabanApiUtils2
(
azkabanMonitorUrl
);
String
execId
=
azkabanApiUtils
.
stopFlow
(
"jz_localflow_"
+
projectId
,
map
.
get
(
"treeName"
).
toString
());
return
JsonResult
.
ok
();
}
...
...
src/main/resources/mapper/dmp/DmpDevelopTaskMapper.xml
View file @
b07a4416
...
...
@@ -221,4 +221,12 @@
SELECT
<include
refid=
"FIND_ALL_COLUMN"
/>
FROM dmp_develop_task WHERE tree_id = #{id}
</select>
<select
id=
"selectTaskById"
resultType=
"com.jz.dmp.modules.model.DmpDevelopTask"
>
select
<include
refid=
"FIND_ALL_COLUMN"
/>
from dmp_develop_task
where 1=1 and data_status='1'
and id=#{taskId}
</select>
</mapper>
\ No newline at end of file
src/main/resources/templates/lxTaskJson.json
View file @
b07a4416
...
...
@@ -81,7 +81,7 @@
{
"params"
:
{
"version"
:
"1.0"
,
"version"
:
"1.0"
,
//版本
"treeId"
:
669
,
//
"parentId"
:
"509"
,
"mode"
:
"0"
,
...
...
@@ -97,8 +97,8 @@
"ftCount"
:
"分桶个数"
,
"separateMax"
:
"分桶字段最大值"
,
"separateMin"
:
"分桶字段最小值"
,
"primaryKey"
:
"主键"
,
"partition"
:
"分区"
,
//
"primaryKey"
:
"主键"
,
//
"partition"
:
"分区"
,
"postImportStatement"
:
"导入后语句"
,
"preImportStatement"
:
"导入前语句"
,
"errorLimitRecord"
:
"错误记录数超过"
,
...
...
@@ -106,7 +106,8 @@
//
"syncRate"
:
"同步速率"
,
"executorMemory"
:
"1"
,
//分配任务内存
"executorCores"
:
"1"
,
//单executor的cpu数
"totalExecutorCores"
:
"1"
//总executor的cpu数
"totalExecutorCores"
:
"1"
,
//总executor的cpu数
"fieldMapping"
:
""
//字段映射关系
},
"reader"
:
{
"dbConnection"
:
"mysql_dmp_demo_test"
,
//来源名称
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment