Commit af35974e authored by mcb

commit

parent 8d96b5b9
package com.jz.common.constant;

/**
 * @ClassName: DatasouceTypeConstant
 * @Description: Common datasource type constants
 * @author Bellamy
 * @date 2021-02-26
 */
public class DatasouceTypeConstant {
    public static final int MySQL = 1;
    public static final int SqlServer = 2;
    public static final int PostgreSQL = 3;
    public static final int Oracle = 4;
    public static final int DM = 5;
    public static final int DB2 = 6;
    public static final int Hive = 7;
    public static final int Impala = 8;
    public static final int Kudu = 9;
    public static final int HDFS = 10;
    public static final int SFTP = 11;
    public static final int Elasticsearch = 12;
    public static final int Informix = 21;
}
package com.jz.common.enums;

/**
 * Datasource type enumeration
 *
 * @author Bellamy
 * @since 2020-11-30 14:30:23
 */
public enum DatasourceTypeEnum {

    MySQL("mysql", 1),
    SqlServer("SqlServer", 2),
    PostgreSQL("PostgreSQL", 3),
    Oracle("Oracle", 4),
    DM("DM", 5),
    DB2("DB2", 6),
    Hive("Hive", 7),
    Impala("Impala", 8),
    Kudu("Kudu", 9),
    HDFS("HDFS", 10),
    SFTP("SFTP", 11),
    Elasticsearch("Elasticsearch", 12),
    Informix("Informix", 21),
    ;

    private String code;
    private int value;

    private DatasourceTypeEnum(String code, int value) {
        this.code = code;
        this.value = value;
    }

    /**
     * Case-insensitive lookup by code; returns null when no match is found.
     */
    public static DatasourceTypeEnum get(String code) {
        if (code == null) {
            return null;
        }
        for (DatasourceTypeEnum status : values()) {
            if (status.getCode().equalsIgnoreCase(code)) {
                return status;
            }
        }
        return null;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public int getValue() {
        return value;
    }

    public void setValue(int value) {
        this.value = value;
    }
}
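For reference (not part of the commit), a minimal usage sketch showing how the enum lookup and the numeric constants line up; the surrounding code is hypothetical:

// Hypothetical usage: resolve a datasource type by its code (case-insensitive)
// and compare its numeric value against the DatasouceTypeConstant definitions.
DatasourceTypeEnum type = DatasourceTypeEnum.get("kudu");
if (type != null && type.getValue() == DatasouceTypeConstant.Kudu) {
    // Kudu-specific sink handling would go here
}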
@@ -113,4 +113,15 @@ public class CommonUtils {
        return "jz_dmp_"+ id;
    }
    /**
     * @Title: realtime task topic
     * @Description: TODO(build the realtime task topic name)
     * @param id task id
     * @return String the generated topic name
     */
    public static String getRealTimeTopicName(String id) {
        return "jz_dmp_realtime_"+ id;
    }
}
@@ -169,7 +169,6 @@ public class DmpRealtimeSyncInfo implements Serializable {
    @ApiModelProperty(value = "task name")
    private String name;

    public Integer getId() {
        return id;
    }
......
@@ -80,6 +80,8 @@ public class RealTimeSyncDataSourceModel {
     * */
    private String endpoint;

    private Integer datasourceTypeId;

    public Integer getDatasourceType() {
        return datasourceType;
    }
@@ -207,4 +209,12 @@ public class RealTimeSyncDataSourceModel {
    public void setEndpoint(String endpoint) {
        this.endpoint = endpoint;
    }

    public Integer getDatasourceTypeId() {
        return datasourceTypeId;
    }

    public void setDatasourceTypeId(Integer datasourceTypeId) {
        this.datasourceTypeId = datasourceTypeId;
    }
}
@@ -6,11 +6,14 @@ import com.amazonaws.services.dynamodbv2.xspec.M;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.jz.agent.service.DmpDsAgentService;
import com.jz.common.constant.DatasouceTypeConstant;
import com.jz.common.constant.JsonResult;
import com.jz.common.constant.ResultCode;
import com.jz.common.enums.DatasourceTypeEnum;
import com.jz.common.enums.DelFlagEnum;
import com.jz.common.page.PageInfoResponse;
import com.jz.common.persistence.BaseService;
import com.jz.common.utils.CommonUtils;
import com.jz.common.utils.JsonMapper;
import com.jz.common.utils.realTime.CmdUtils;
import com.jz.common.utils.realTime.DBUtil;
@@ -335,6 +338,8 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
        Long realtimeId = submitDatasource2DatasourceToConnector(projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params);
        if (realtimeId == null) {
            throw new RuntimeException("保存失败!");
        } else {
            submitSink2tableConnector(realtimeId, projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params);
        }
        // process the selected table info
        //submitNoSelectTable(realtimeId, projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params);
@@ -343,6 +348,118 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
        return JsonResult.ok(map);
    }
    private void submitSink2tableConnector(Long realtimeId, Long projectId, RealTimeSyncDataSourceModel sourceDbInfo, RealTimeSyncDataSourceModel targetDbInfo, DmpProjectSystemInfo dmpProjectSystemInfo, String connectorUrl, Map<String, Object> params) throws Exception {
        DmpNavigationTree tree = dmpNavigationTreeDao.queryById(Integer.valueOf(params.get("treeId").toString()));
        String sourceName = tree.getName();
        Integer srcDataSourceId = sourceDbInfo.getId();
        Integer targetDataSourceId = targetDbInfo.getId();
        String taskId = String.valueOf(params.get("taskId"));
        String kafkaSchemaRegisterUrl = dmpProjectSystemInfo.getKafkaSchemaRegisterUrl();
        String endpoint = targetDbInfo.getEndpoint();
        String jdbcUrl = targetDbInfo.getJdbcUrl();
        String dbType = "";
        Map<String, String> sinkModel = new HashMap<>();
        String topicName = CommonUtils.getRealTimeTopicName(taskId);
        //sinkModel.put("sourceName",topicName);
        sinkModel.put("shareKey", "OdqhcqdzCHGrQupYIsLKkAb9UOvm332r");
        if (DatasourceTypeEnum.Kudu.getValue() == targetDbInfo.getDatasourceTypeId()) {
            dbType = DatasourceTypeEnum.Kudu.getCode().toLowerCase();
            sinkModel.put("kuduMaster", endpoint);
            String impalaUrl = jdbcUrl;
            if (StringUtils.isNotEmpty(impalaUrl)) {
                impalaUrl = impalaUrl.replace("jdbc:impala://", "");
                if (impalaUrl.contains("/")) {
                    impalaUrl = impalaUrl.substring(0, impalaUrl.indexOf("/"));
                }
                sinkModel.put("impalaUrl", impalaUrl);
            }
            sinkModel.put("impalaFqdn", targetDbInfo.getAccessId());
            sinkModel.put("registryUrl", kafkaSchemaRegisterUrl);
        } else if (DatasourceTypeEnum.HDFS.getValue() == targetDbInfo.getDatasourceTypeId()) {
            dbType = DatasourceTypeEnum.HDFS.getCode().toLowerCase();
            sinkModel.put("jdbcUrl", jdbcUrl);
            sinkModel.put("databaseName", targetDbInfo.getDbName());
            sinkModel.put("hiveMetastoreUris", endpoint);
            sinkModel.put("hdfsNamenodePrincipal", targetDbInfo.getAccessId());
            sinkModel.put("keytabLocation", targetDbInfo.getAccessKey());
        }
        sinkModel.put("connectorSecurityFlag", (String) params.get("connectorSecurityFlag")); // security switch: whether KERBEROS is enabled
        List<Map<String, Object>> tables = (List<Map<String, Object>>) params.get("tables");
        if (tables != null && !tables.isEmpty()) {
            for (Map<String, Object> str : tables) {
                Map<String, String> paramJson = new HashMap<>();
                String sourceTableName = String.valueOf(str.get("sourceTableName"));
                String targetTableName = String.valueOf(str.get("targetTableName"));
                String targetDbNameAndTableName = targetDbInfo.getDbName() + "." + targetTableName;
                paramJson.put("targetDbNameAndTableName", targetDbNameAndTableName);
                String desensitizationField = "";
                String arithmetic = "";
                List<Map<String, String>> fieldInfo = (List<Map<String, String>>) str.get("fieldInfo"); // desensitization fields and algorithm
                for (Map<String, String> field : fieldInfo) {
                    desensitizationField += "," + field.get("desensitizationField");
                    arithmetic = field.get("arithmetic");
                }
                paramJson.put("desensitizationField", desensitizationField.isEmpty() ? "" : desensitizationField.substring(1));
                paramJson.put("arithmetic", arithmetic);
                paramJson.putAll(sinkModel);
                String topic = sourceDbInfo.getDatasourceName() + "_" + topicName + "." + sourceDbInfo.getDbName() + "." + targetTableName;
                paramJson.put("topic", topic);
                paramJson.put("name", targetDbInfo.getDatasourceName() + "@" + topic);
                String jsonStr = freemakerJson(dbType + "_sink", paramJson);
                logger.info("=======send sink to kafka {}", jsonStr);
                DmpRealtimeSyncInfo saveBody = new DmpRealtimeSyncInfo();
                saveBody.setSrcDatasourceId(srcDataSourceId);
                saveBody.setSrcDatasourceName(sourceDbInfo.getDatasourceName());
                saveBody.setSrcDatabaseType(sourceDbInfo.getDataSourceTypeName());
                saveBody.setSrcDatabaseName(sourceDbInfo.getDbName());
                saveBody.setTargetDatasourceId(targetDataSourceId);
                saveBody.setTargetDatasourceName(targetDbInfo.getDatasourceName());
                saveBody.setTargetDatabaseType(targetDbInfo.getDataSourceTypeName());
                saveBody.setTargetDatabaseName(targetDbInfo.getDbName());
                saveBody.setType(1);
                saveBody.setConnectorJsonData(jsonStr);
                saveBody.setProjectId(String.valueOf(projectId));
                saveBody.setConnectorUrl(connectorUrl);
                saveBody.setSrcTopicName(topic);
                saveBody.setSourceTypeName(sourceDbInfo.getDataSourceTypeName());
                saveBody.setTargetTypeName("Kafka");
                saveBody.setCreateTime(new Date());
                saveBody.setCrePerson(SessionUtils.getCurrentUserId());
                saveBody.setVersion("1.0");
                saveBody.setScriptJson(JSONObject.toJSONString(params));
                saveBody.setTreeId(params.get("treeId").toString());
                saveBody.setStoreType(dbType);
                saveBody.setParentId(Integer.valueOf(realtimeId.toString()));
                if (StringUtils.isNotEmpty(String.valueOf(params.get("taskId")))) {
                    DmpRealtimeSyncInfo param = new DmpRealtimeSyncInfo();
                    param.setParentId(Integer.valueOf(realtimeId.toString()));
                    // when editing, delete the existing sink tasks first, then publish again
                    List<DmpRealtimeSyncInfo> sinkTaskList = dmpRealtimeSyncInfoDao.queryAll(param);
                    if (sinkTaskList != null && !sinkTaskList.isEmpty()) {
                        Map maps = new HashMap();
                        maps.put("dataStatus", "0");
                        String ids = "";
                        for (DmpRealtimeSyncInfo item : sinkTaskList) {
                            ids += "," + item.getId();
                            HttpClientUtils.httpDelete(connectorUrl + "/" + item.getConnectorJobId() + "/");
                        }
                        maps.put("ids", ids.substring(1).split(","));
                        dmpRealtimeSyncInfoDao.deleteByrealTaskId(maps);
                    }
                    // create the new sink
                    Map<String, String> respData = publishTask2Kafka(connectorUrl, jsonStr, "");
                    saveBody.setConnectorJobId(respData.get("connectorJobId"));
                    saveBody.setStatus(respData.get("status"));
                    dmpRealtimeSyncInfoDao.insert(saveBody);
                }
            }
        }
    }
    private Long submitDatasource2DatasourceToConnector(Long projectId, RealTimeSyncDataSourceModel sourceDbInfo, RealTimeSyncDataSourceModel targetDbInfo, DmpProjectSystemInfo dmpProjectSystemInfo, String connectorUrl, Map<String, Object> params) throws Exception {
        DmpNavigationTree tree = dmpNavigationTreeDao.queryById(Integer.valueOf(params.get("treeId").toString()));
        Integer srcDataSourceId = sourceDbInfo.getId();
......
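publishTask2Kafka(connectorUrl, jsonStr, "") is referenced above but is not part of this diff. As a point of reference only, here is a minimal sketch (an assumption, not the project's implementation) of how a rendered connector JSON could be submitted to the Kafka Connect REST API, assuming connectorUrl points at the /connectors endpoint:

// Hypothetical sketch: POST a rendered connector config ({"name": ..., "config": {...}})
// to the Kafka Connect REST API. The real publishTask2Kafka(...) may differ.
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ConnectorPublishSketch {
    public static int publish(String connectorUrl, String connectorJson) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(connectorUrl).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(connectorJson.getBytes(StandardCharsets.UTF_8)); // body is the template output
        }
        return conn.getResponseCode(); // Kafka Connect returns 201 Created on success
    }
}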
@@ -63,7 +63,7 @@
        target_database_name, src_datasource_name, target_datasource_name, store_type, status, create_time, update_time,
        cre_person, upt_person
        from dmp_realtime_sync_info
        where 1=1 and data_status = '1'
        <if test="id != null">
            and id = #{id}
        </if>
@@ -151,7 +151,6 @@
        <if test="uptPerson != null and uptPerson != ''">
            and upt_person = #{uptPerson}
        </if>
    </select>
    <!-- insert all columns -->
@@ -391,7 +390,8 @@
        ds.access_key as accessKey,
        ds.access_id as accessId,
        dsdt.datasource_type as datasourceTypeName,
        dsdt.driver_class_name as driverName,
        dsdt.id as datasourceTypeId
        from dmp_syncing_datasource ds
        inner join dmp_syncing_datasource_type dsdt on ds.datasource_type = dsdt.id
        where 1=1 and ds.data_status = '1'
......
<#if name??>
{
"name":"hdfs-sink-connector-${name}",
"config":
</#if>
{
"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "3",
"flush.size": "200000",
"topics": "${topic!}",
"format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
<#if databaseName??>
"hdfs.url": "${jdbcUrl?replace(('/'+databaseName!),'')}",
<#else>
"hdfs.url": "${jdbcUrl!}",
</#if>
"hdfs.authentication.kerberos": "${connectorSecurityFlag}",
<#if connectorSecurityFlag == "true">
"hdfs.namenode.principal": "${hdfsNamenodePrincipal!}",
"connect.hdfs.principal": "${hdfsNamenodePrincipal!}",
"connect.hdfs.keytab": "${keytabLocation!}",
</#if>
"hive.metastore.uris": "${hiveMetastoreUris!}",
"connect.hdfs.desensitise.columns": "${desensitizationField!}",
"connect.hdfs.desensitise.algorithm": "${arithmetic!}",
"connect.hdfs.desensitise.sharesecret": "${shareKey!}",
"hive.integration":"true",
"schema.compatibility": "BACKWARD",
"hive.conf.dir": "/opt/cloudera/parcels/CDH/lib/hive/conf",
"hive.home": "/opt/cloudera/parcels/CDH/lib/hive",
"partitioner.class": "io.confluent.connect.hdfs.partitioner.DailyPartitioner",
"locale": "CHINA",
"timezone": "Asia/Shanghai",
"rotate.interval.ms": "300000",
"logs.dir": "hdfssink/logs/${databaseName!}",
"topics.dir": "hdfssink/data/${databaseName!}",
"hive.database": "${databaseName!}",
"consumer.auto.offset.reset": "earliest"
}
<#if name??>
}
</#if>
<#if name??>
{
"name":"kudu-sink-connector-${name}",
"config":
</#if>
{
"connector.class": "com.datamountaineer.streamreactor.connect.kudu.sink.KuduSinkConnector",
"tasks.max": "1",
"connect.kudu.master": "${kuduMaster!}",
"connect.kudu.kcql": "UPSERT INTO impala::${targetDbNameAndTableName!} SELECT * FROM ${topic!} AUTOCREATE AUTOEVOLVE DISTRIBUTEBY ${pkName!} INTO 2 BUCKETS",
"connect.kudu.desensitise.columns": "${desensitizationField!}",
"connect.kudu.desensitise.algorithm": "${arithmetic!}",
"connect.kudu.desensitise.sharesecret": "${shareKey!}",
"connect.impala.master": "${impalaUrl!}",
"connect.impala.master.fqdn": "${impalaFqdn!}",
"connect.progress.enabled":"true",
"topics": "${topic!}",
"connect.kerberos.enable": "${connectorSecurityFlag}",
<#if connectorSecurityFlag == "true">
"sasl.mechanism": "GSSAPI",
"security.protocol": "SASL_PLAINTEXT",
"sasl.kerberos.service.name": "kafka",
"connect.kerberos.logincontext": "KafkaClient",
</#if>
"connect.kudu.write.flush.mode": "BATCH_SYNC",
"connect.kudu.mutation.buffer.space":"100000"
}
<#if name??>
}
</#if>
\ No newline at end of file
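freemakerJson(dbType + "_sink", paramJson), used in submitSink2tableConnector above, is not shown in this diff. Below is a minimal sketch of how such *_sink templates are typically rendered with FreeMarker; the template directory and the ".ftl" file naming are assumptions, not the project's actual layout:

// Hypothetical sketch: render one of the *_sink FreeMarker templates above into connector JSON.
import freemarker.template.Configuration;
import freemarker.template.Template;
import java.io.File;
import java.io.StringWriter;
import java.util.Map;

public class SinkTemplateRenderSketch {
    public static String render(String templateName, Map<String, String> model) throws Exception {
        Configuration cfg = new Configuration(Configuration.VERSION_2_3_31);
        cfg.setDirectoryForTemplateLoading(new File("templates")); // assumed template location
        cfg.setDefaultEncoding("UTF-8");
        Template template = cfg.getTemplate(templateName + ".ftl"); // e.g. "kudu_sink" or "hdfs_sink"
        StringWriter out = new StringWriter();
        template.process(model, out); // fills placeholders such as ${topic!}, ${kuduMaster!}
        return out.toString();        // connector JSON ready to submit to Kafka Connect
    }
}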