Commit 22caccbc authored by mcb's avatar mcb

no message

parent 1420183a
......@@ -143,9 +143,8 @@ public class OfflineSynchController {
* 获取源表和目标表的字段
* @return
*/
@ApiOperation(value = "删除任务", notes = "删除任务")
@GetMapping(value = "/getSoureAndTargetColumns")
@ApiImplicitParam(name = "taskId", value = "任务id")
@ApiOperation(value = "获取源表和目标表的字段", notes = "获取源表和目标表的字段")
@PostMapping(value = "/getSoureAndTargetColumns")
public JsonResult getSoureAndTargetColumns(@RequestBody @Validated SoureAndTargetColumnsReq soureAndTargetColumnsReq) throws Exception {
JsonResult list = offlineSynchService.querySoureAndTargetColumnsByParams(soureAndTargetColumnsReq);
return list;
......
......@@ -23,12 +23,10 @@ public class SoureAndTargetColumnsReq implements Serializable {
* 源数据库ID
* */
@NotNull(message = "源数据库ID不能为空")
@NotEmpty(message = "源数据库ID不能为空")
@ApiModelProperty(value = "源数据库ID")
private Long sourceDbId;
@NotNull(message = "目标数据库ID不能为空")
@NotEmpty(message = "目标数据库ID不能为空")
@ApiModelProperty(value = "目标数据库ID")
private Long targetDbId;
......
......@@ -53,7 +53,7 @@ public interface DmpTableColumnDao{
* @param entities List<DmpTableColumn> 实例对象列表
* @return 影响行数
*/
int insertBatch(@Param("entities") List<DmpTableColumn> entities);
int insertBatch(List<DmpTableColumn> entities);
/**
* 批量新增或按主键更新数据(MyBatis原生foreach方法)
......@@ -79,4 +79,5 @@ public interface DmpTableColumnDao{
*/
int deleteById(Integer id);
int deleteByTableId(Integer tableId);
}
\ No newline at end of file
......@@ -4,6 +4,7 @@ import com.jz.dmp.modules.model.DmpTableFieldSchema;
import org.apache.ibatis.annotations.Param;
import java.util.List;
import java.util.Map;
/**
* 项目表字段类型(DmpTableFieldSchema)表数据库访问层
......@@ -79,4 +80,5 @@ public interface DmpTableFieldSchemaDao {
*/
int deleteById(Integer id);
List<Map<String, Object>> getTableFieldsMapping(Map<String, Object> params) throws Exception;
}
\ No newline at end of file
......@@ -20,6 +20,10 @@ public class DmpAgentDatasourceInfo implements Serializable {
/**
* 数据源类型ID
*/
private String datasourceTypeId;
/**
* 数据源类型
*/
private String datasourceType;
/**
* 数据源名称
......@@ -520,4 +524,12 @@ public class DmpAgentDatasourceInfo implements Serializable {
public void setHdfsSyncingPath(String hdfsSyncingPath) {
this.hdfsSyncingPath = hdfsSyncingPath;
}
public String getDatasourceTypeId() {
return datasourceTypeId;
}
public void setDatasourceTypeId(String datasourceTypeId) {
this.datasourceTypeId = datasourceTypeId;
}
}
......@@ -14,16 +14,12 @@ import com.jz.common.utils.ZipUtils;
import com.jz.dmp.agent.DmpAgentResult;
import com.jz.dmp.modules.controller.DataIntegration.bean.*;
import com.jz.dmp.modules.controller.DataIntegration.bean.flow.FlowExecution;
import com.jz.dmp.modules.dao.DmpDevelopTaskDao;
import com.jz.dmp.modules.dao.DmpProjectDao;
import com.jz.dmp.modules.dao.DmpTableDao;
import com.jz.dmp.modules.dao.OfflineSynchDao;
import com.jz.dmp.modules.model.DmpAgentDatasourceInfo;
import com.jz.dmp.modules.model.DmpProjectSystemInfo;
import com.jz.dmp.modules.model.DmpSyncingDatasource;
import com.jz.dmp.modules.model.DmpTable;
import com.jz.dmp.modules.dao.*;
import com.jz.dmp.modules.model.*;
import com.jz.dmp.modules.service.OfflineSynchService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.format.annotation.DateTimeFormat;
......@@ -45,6 +41,8 @@ import java.util.*;
@Transactional
public class OfflineSynchServiceImpl implements OfflineSynchService {
private static Logger logger = LoggerFactory.getLogger(OfflineSynchServiceImpl.class);
@Value("${spring.public-key}")
private String publicKey;
......@@ -63,6 +61,12 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
@Autowired
private DmpTableDao dmpTableDao;
@Autowired
private DmpTableFieldSchemaDao dmpTableFieldSchemaDao;
@Autowired
private DmpTableColumnDao dmpTableColumnDao;
@Override
public PageInfoResponse<TaskListPageDto> queryTaskListPage(TaskListPageReq taskListPageReq) throws Exception {
PageInfoResponse<TaskListPageDto> pageInfoResponse = new PageInfoResponse<>();
......@@ -219,6 +223,7 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
dmpDevelopTaskDao.deleteNavigationTreeByTreeId(map.get("treeId").toString());
return new JsonResult(ResultCode.SUCCESS);
}
/**
* 任务状态查看,异步调用azkaban服务
*/
......@@ -272,11 +277,20 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
public JsonResult querySoureAndTargetColumnsByParams(SoureAndTargetColumnsReq soureAndTargetColumnsReq) throws Exception {
//通过源数据库id ,查询数据源配置
DmpAgentDatasourceInfo dsInfo = offlineSynchDao.querySourceDbInfoBySourceId(soureAndTargetColumnsReq.getSourceDbId());
//通过目标数据库id ,查询数据配置
DmpAgentDatasourceInfo targetInfo = offlineSynchDao.querySourceDbInfoBySourceId(soureAndTargetColumnsReq.getTargetDbId());
//dsInfo.setHdfsUserName(body.getAccessId()); // 专为HDFS CLIENT提供
dsInfo.setFileType(soureAndTargetColumnsReq.getFileType());
dsInfo.setDelimiter(soureAndTargetColumnsReq.getCsvDelimiter());
dsInfo.setIsHaveHeader(soureAndTargetColumnsReq.getCsvIsHaveHeader());
//解码源数据库密码
if (StringUtils.isNotBlank(dsInfo.getPassword())) {
dsInfo.setPassword(new BaseService().decode(dsInfo.getPassword(), publicKey));
}
//创建jdbc,获取源数据表字段
DmpAgentResult rst = dmpDsAgentServiceImp.getTableColumnList(dsInfo, soureAndTargetColumnsReq.getTargetTableName());
if (!rst.getCode().val().equals("200")) {
......@@ -286,18 +300,94 @@ public class OfflineSynchServiceImpl implements OfflineSynchService {
if (cols == null || cols.size() == 0) {
return new JsonResult(ResultCode.INTERNAL_SERVER_ERROR, "获取不到对应数据源的数据");
}
logger.info("###################################### 数据源的表字段 success ####################################");
DmpTable table = new DmpTable(); //项目表信息
table.setTableMode(dsInfo.getDatasourceType()); //数据源类型ID
table.setTableMode(dsInfo.getDatasourceTypeId()); //数据源类型ID
table.setCnName(soureAndTargetColumnsReq.getTargetTableName()); //中文名
table.setIsShow("0"); //是否隐藏
table.setName(soureAndTargetColumnsReq.getTargetTableName());//表名
table.setTableDesc("数据同步-源表"); //描述
table.setDataStatus("1"); //数据表状态 默认1
dmpTableDao.deleteTableColumnByName(table);
dmpTableDao.deleteTableByName(table);
dmpTableDao.insert(table);
return null;
DmpTableFieldSchema schema = new DmpTableFieldSchema(); //项目表字段类型
schema.setDsId(Integer.valueOf(dsInfo.getDatasourceTypeId()));
List<DmpTableFieldSchema> sourceFieldSchema = dmpTableFieldSchemaDao.queryAll(schema); //源字段类型
schema.setDsId(Integer.valueOf(dsInfo.getDatasourceTypeId()));
List<DmpTableFieldSchema> targetFieldSchema = dmpTableFieldSchemaDao.queryAll(schema); //目标字段类型
if (sourceFieldSchema != null && sourceFieldSchema.size() > 0) {
Map<String, Integer> ftm = new HashMap<String, Integer>();
for (DmpTableFieldSchema s : sourceFieldSchema) { //便利源字段类型
ftm.put(s.getFieldType(), s.getId().intValue());
}
List<DmpTableColumn> columns = new ArrayList<DmpTableColumn>();
for (int i = 0; i < cols.size(); i++) { //便利源字段
Map<String, Object> map = cols.get(i);
String name = (String) map.get("name");
String type = (String) map.get("type");
Integer size = (Integer) map.get("size");
Integer scale = (Integer) map.get("scale");
String comment = (String) map.get("comment");
DmpTableColumn dtc = new DmpTableColumn();
dtc.setField(name); //字段
dtc.setType(ftm.get(type)); //类型
if (size != null) dtc.setSize(size); //大小
if (scale != null) dtc.setScale(scale); //经度
dtc.setTableId(table.getId().intValue()); //tableID
dtc.setComment(comment); //备注说明
dtc.setDataStatus("1"); //数据表状态 默认1
columns.add(dtc);
}
//向dmp_table_column批量新增
dmpTableColumnDao.insertBatch(columns);
}
Map<String, Object> params = new HashMap<String, Object>();
params.put("targetDsId", targetInfo.getDatasourceTypeId()); //数据源类型ID
params.put("sourceDsId", dsInfo.getDatasourceTypeId());
params.put("tableName", soureAndTargetColumnsReq.getTargetTableName());
List<Map<String, Object>> fieldsMappingList = dmpTableFieldSchemaDao.getTableFieldsMapping(params);
List<Map<String, Object>> sourceFieldColumns = new ArrayList<Map<String, Object>>();
List<Map<String, Object>> targetFieldColumns = new ArrayList<Map<String, Object>>();
for (Map<String, Object> m : fieldsMappingList) {
String field = (String) m.get("field");
String sourceType = (String) m.get("sourceType");
String targetType = "STRING";
if (m.get("targetType") != null && StringUtils.isNotBlank((String) m.get("targetType"))) {
targetType = (String) m.get("targetType");
}
Integer size = (Integer) m.get("size");
Integer scale = (Integer) m.get("scale");
Map<String, Object> sourceFieldColumn = new HashMap<String, Object>();//源表字段
sourceFieldColumn.put("name", field); //字段
sourceFieldColumn.put("type", sourceType); //类型
sourceFieldColumn.put("size", size); //大小
sourceFieldColumn.put("scale", scale); //经度
Map<String, Object> targetFieldColumn = new HashMap<String, Object>();//目标表字段
targetFieldColumn.put("name", field);
targetFieldColumn.put("type", targetType);
targetFieldColumn.put("size", size);
targetFieldColumn.put("scale", scale);
sourceFieldColumns.add(sourceFieldColumn);
targetFieldColumns.add(targetFieldColumn);
}
Map<String, Object> bak = new HashMap<String, Object>();
bak.put("SOURCE_FIELD_SCHEMA", sourceFieldSchema);
bak.put("SOURCE_FIELD_COLUMNS", sourceFieldColumns);
bak.put("TARGET_FIELD_SCHEMA", targetFieldSchema);
bak.put("TARGET_FIELD_COLUMNS", targetFieldColumns);
dmpTableDao.deleteById(table.getId());
dmpTableColumnDao.deleteByTableId(table.getId());
return new JsonResult(ResultCode.SUCCESS, bak);
}
}
......@@ -24,7 +24,7 @@
<select id="queryById" resultMap="DmpTableColumnMap">
select
ID, FIELD, TYPE, SIZE, SCALE, NULLABLE, DEFAULT_VALUE, MODE, DATA_STATUS, CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, TABLE_ID, COMMENT
from dmp_web.dmp_table_column
from dmp_table_column
where ID = #{id}
</select>
......@@ -32,7 +32,7 @@
<select id="queryAllByLimit" resultMap="DmpTableColumnMap">
select
ID, FIELD, TYPE, SIZE, SCALE, NULLABLE, DEFAULT_VALUE, MODE, DATA_STATUS, CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, TABLE_ID, COMMENT
from dmp_web.dmp_table_column
from dmp_table_column
limit #{offset}, #{limit}
</select>
......@@ -41,7 +41,7 @@
select
ID, FIELD, TYPE, SIZE, SCALE, NULLABLE, DEFAULT_VALUE, MODE, DATA_STATUS, CREATE_USER_ID, CREATE_TIME,
UPDATE_USER_ID, UPDATE_TIME, TABLE_ID, COMMENT
from dmp_web.dmp_table_column
from dmp_table_column
<where>
<if test="id != null">
and ID = #{id}
......@@ -93,15 +93,15 @@
<!--新增所有列-->
<insert id="insert" keyProperty="id" useGeneratedKeys="true">
insert into dmp_web.dmp_table_column(FIELD, TYPE, SIZE, SCALE, NULLABLE, DEFAULT_VALUE, MODE, DATA_STATUS, CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, TABLE_ID, COMMENT)
insert into dmp_table_column(FIELD, TYPE, SIZE, SCALE, NULLABLE, DEFAULT_VALUE, MODE, DATA_STATUS, CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, TABLE_ID, COMMENT)
values (#{field}, #{type}, #{size}, #{scale}, #{nullable}, #{defaultValue}, #{mode}, #{dataStatus}, #{createUserId}, #{createTime}, #{updateUserId}, #{updateTime}, #{tableId}, #{comment})
</insert>
<insert id="insertBatch" keyProperty="id" useGeneratedKeys="true">
insert into dmp_web.dmp_table_column(FIELD, TYPE, SIZE, SCALE, NULLABLE, DEFAULT_VALUE, MODE, DATA_STATUS,
<insert id="insertBatch" keyProperty="id" useGeneratedKeys="true" parameterType="java.util.List">
insert into dmp_table_column(FIELD, TYPE, SIZE, SCALE, NULLABLE, DEFAULT_VALUE, MODE, DATA_STATUS,
CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, TABLE_ID, COMMENT)
values
<foreach collection="entities" item="entity" separator=",">
<foreach collection="list" item="entity" separator=",">
(#{entity.field}, #{entity.type}, #{entity.size}, #{entity.scale}, #{entity.nullable},
#{entity.defaultValue}, #{entity.mode}, #{entity.dataStatus}, #{entity.createUserId}, #{entity.createTime},
#{entity.updateUserId}, #{entity.updateTime}, #{entity.tableId}, #{entity.comment})
......@@ -109,7 +109,7 @@
</insert>
<insert id="insertOrUpdateBatch" keyProperty="id" useGeneratedKeys="true">
insert into dmp_web.dmp_table_column(FIELD, TYPE, SIZE, SCALE, NULLABLE, DEFAULT_VALUE, MODE, DATA_STATUS,
insert into dmp_table_column(FIELD, TYPE, SIZE, SCALE, NULLABLE, DEFAULT_VALUE, MODE, DATA_STATUS,
CREATE_USER_ID, CREATE_TIME, UPDATE_USER_ID, UPDATE_TIME, TABLE_ID, COMMENT)
values
<foreach collection="entities" item="entity" separator=",">
......@@ -127,7 +127,7 @@
<!--通过主键修改数据-->
<update id="update">
update dmp_web.dmp_table_column
update dmp_table_column
<set>
<if test="field != null and field != ''">
FIELD = #{field},
......@@ -177,7 +177,12 @@
<!--通过主键删除-->
<delete id="deleteById">
delete from dmp_web.dmp_table_column where ID = #{id}
delete from dmp_table_column where ID = #{id}
</delete>
<!--通过tableId删除-->
<delete id="deleteByTableId">
delete from dmp_table_column where table_id = #{tableId}
</delete>
</mapper>
\ No newline at end of file
......@@ -100,4 +100,30 @@
delete from dmp_web.dmp_table_field_schema where ID = #{id}
</delete>
<select id="getTableFieldsMapping" parameterType="java.util.Map" resultType="java.util.Map">
select
b.field,
c.field_type as sourceType,
(case when e.target_type is null then g.field_type else e.target_type end) as targetType
from dmp_table a
left join dmp_table_column b on b.data_status = '1' and a.id = b.table_id
left join dmp_table_field_schema c on b.type = c.id
left join (
select
b1.source_id, b1.target_id,
b2.field_type as source_type,
b3.field_type as target_type,
b3.ds_id as ds_id
from dmp_table_field_mapping b1
left join dmp_table_field_schema b2 on b1.source_id = b2.id
left join dmp_table_field_schema b3 on b1.target_id = b3.id
) e
on c.id = e.source_id <if test="targetDsId != null">and e.ds_id = #{targetDsId}</if>
left join dmp_table_field_schema g on c.type_category = g.type_category and g.is_cat_default = '1'
<if test="targetDsId != null">and g.ds_id = #{targetDsId}</if>
where a.data_status = '1'
<if test="sourceDsId != null">and a.table_mode = #{sourceDsId}</if>
<if test="tableName != null">and a.name = #{tableName}</if>
</select>
</mapper>
\ No newline at end of file
......@@ -96,6 +96,7 @@
<select id="querySourceDbInfoBySourceId" parameterType="map" resultType="dmpAgentDatasourceInfo">
SELECT
t1.id,
t1.datasource_type as datasourceTypeId,
t1.datasource_name,
t1.datasource_desc,
t1.jdbc_url,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment