Commit 4ef746d4 authored by mcb

Real-time sync, data sources, bug fixes

parent 00dbac5e
......@@ -3,6 +3,7 @@ package com.jz.dmp.modules.controller.DataIntegration;
import com.jz.common.constant.JsonResult;
import com.jz.common.constant.ResultCode;
import com.jz.common.page.PageInfoResponse;
import com.jz.common.utils.DateUtils;
import com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceListDto;
import com.jz.dmp.modules.controller.DataIntegration.bean.DataSourceListReq;
import com.jz.dmp.modules.controller.DataIntegration.bean.DmpSyncingDatasourceReq;
......@@ -135,8 +136,13 @@ public class DataSourceController {
try {
result = dmpSyncingDatasourceService.testConnection(saveBody);
} catch (Exception e) {
logger.info("###################" + e.getMessage() + "###################");
return new JsonResult(ResultCode.INTERNAL_SERVER_ERROR,e.getMessage());
Map map = new HashMap();
map.put("status","异常");
map.put("testConnectStatus","03"); //03 = connectivity abnormal
map.put("testTime", DateUtils.currentDatetime());
result.setData(map); //assumes result was initialized before the try block
result.setCode(ResultCode.SUCCESS); //degrade to a "test failed" payload instead of an HTTP 500
logger.error("testConnection failed", e); //use the class logger rather than e.printStackTrace()
}
return result;
}
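For reference, a minimal sketch (not part of this commit) of how a caller might decode the testConnectStatus codes used by this endpoint; the 01/02/03 meanings come from the @ApiModelProperty documentation on DataSourceListDto below:

static String describeTestConnectStatus(String code) {
    switch (code) {
        case "01": return "not yet tested";
        case "02": return "connectivity OK";
        case "03": return "connectivity abnormal";
        default:   return "unknown status: " + code;
    }
}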
......@@ -150,7 +156,7 @@ public class DataSourceController {
@ApiOperation(value = "编辑数据源--根据id查询数据回显", notes = "编辑数据源--根据id查询数据回显")
@GetMapping(value = "/selectDataSourceInfoById")
@ApiImplicitParams({@ApiImplicitParam(name = "datasourceId", value = "数据源id", required = true), @ApiImplicitParam(name = "projectId", value = "项目id")})
public JsonResult getDataSourceInfoById(@RequestParam String datasourceId, @RequestParam(value = "projectId", required = false) String projectId) throws Exception {
public JsonResult<DataSourceListDto> getDataSourceInfoById(@RequestParam String datasourceId, @RequestParam(value = "projectId", required = false) String projectId) throws Exception {
if (StringUtils.isEmpty(datasourceId)) {
return new JsonResult(ResultCode.PARAMS_ERROR);
}
......@@ -170,7 +176,7 @@ public class DataSourceController {
@ApiOperation(value = "编辑数据源", notes = "编辑数据源")
@PostMapping(value = "/updateDatasourceInfo")
public JsonResult updateDatasourceInfo(@RequestBody @Validated DmpSyncingDatasourceReq saveBody, HttpServletRequest httpRequest) throws Exception {
if (StringUtils.isEmpty(saveBody.getDatasourceId())) {
if (StringUtils.isEmpty(saveBody.getId())) {
return new JsonResult(ResultCode.PARAMS_ERROR);
}
JsonResult result = dmpSyncingDatasourceService.updateDatasourceById(saveBody);
......
package com.jz.dmp.modules.controller.DataIntegration.bean;
import com.jz.common.page.BasePageBean;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
......
......@@ -92,6 +92,97 @@ public class DataSourceListDto {
@ApiModelProperty(value = "测试连通状态:01未测试,02连通性正常,03连通性异常")
private String testConnectStatus;
@ApiModelProperty(value = "FTP协议")
private String protocol;
/**
* IP
*/
@ApiModelProperty(value = "IP")
private String host;
/**
* Port
*/
@ApiModelProperty(value = "端口")
private String port;
/**
* hdfsNamenodePrincipal
*/
@ApiModelProperty(value = "hdfsNamenodePrincipal")
private String hdfsNamenodePrincipal;
/**
* hiveMetastoreUrisThrift
*/
@ApiModelProperty(value = "hiveMetastoreUrisThrift")
private String hiveMetastoreUrisThrift;
/**
* keytabLocation
*/
@ApiModelProperty(value = "keytabLocation")
private String keytabLocation;
/**
* Bootstrap address
*/
@ApiModelProperty(value = "bootstrap地址")
private String bootstrapAddress;
/**
* JAAS address
*/
@ApiModelProperty(value = "jaas地址")
private String jaasAddress;
/**
* krb5 address
*/
@ApiModelProperty(value = "krb5地址")
private String krb5Address;
/**
* kudu Master
*/
@ApiModelProperty(value = "kudu Master")
private String kuduMaster;
/**
* impalaMasterFqdn
*/
@ApiModelProperty(value = "impalaMasterFqdn")
private String impalaMasterFqdn;
/**
* NameNode address
*/
@ApiModelProperty(value = "NameNode地址")
private String defaultFs;
@ApiModelProperty(value = "表空间")
private String tableSchema;
/**
* Endpoint info
*/
@ApiModelProperty(value = "终端信息")
private String endpoint;
/**
* Bucket info
*/
@ApiModelProperty(value = "Bucket信息")
private String bucket;
/**
* accessId
*/
@ApiModelProperty(value = "accessId")
private String accessId;
/**
* accessKey
*/
@ApiModelProperty(value = "accessKey")
private String accessKey;
public Long getId() {
return id;
}
......@@ -116,14 +207,6 @@ public class DataSourceListDto {
this.datasourceTypeName = datasourceTypeName;
}
public String getDatasourceTypeId() {
return datasourceTypeId;
}
public void setDatasourceTypeId(String datasourceTypeId) {
this.datasourceTypeId = datasourceTypeId;
}
public String getDatasourceCatecode() {
return datasourceCatecode;
}
......@@ -195,4 +278,148 @@ public class DataSourceListDto {
public void setTestConnectStatus(String testConnectStatus) {
this.testConnectStatus = testConnectStatus;
}
public String getProtocol() {
return protocol;
}
public void setProtocol(String protocol) {
this.protocol = protocol;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getPort() {
return port;
}
public void setPort(String port) {
this.port = port;
}
public String getHdfsNamenodePrincipal() {
return hdfsNamenodePrincipal;
}
public void setHdfsNamenodePrincipal(String hdfsNamenodePrincipal) {
this.hdfsNamenodePrincipal = hdfsNamenodePrincipal;
}
public String getHiveMetastoreUrisThrift() {
return hiveMetastoreUrisThrift;
}
public void setHiveMetastoreUrisThrift(String hiveMetastoreUrisThrift) {
this.hiveMetastoreUrisThrift = hiveMetastoreUrisThrift;
}
public String getKeytabLocation() {
return keytabLocation;
}
public void setKeytabLocation(String keytabLocation) {
this.keytabLocation = keytabLocation;
}
public String getBootstrapAddress() {
return bootstrapAddress;
}
public void setBootstrapAddress(String bootstrapAddress) {
this.bootstrapAddress = bootstrapAddress;
}
public String getJaasAddress() {
return jaasAddress;
}
public void setJaasAddress(String jaasAddress) {
this.jaasAddress = jaasAddress;
}
public String getKrb5Address() {
return krb5Address;
}
public void setKrb5Address(String krb5Address) {
this.krb5Address = krb5Address;
}
public String getKuduMaster() {
return kuduMaster;
}
public void setKuduMaster(String kuduMaster) {
this.kuduMaster = kuduMaster;
}
public String getImpalaMasterFqdn() {
return impalaMasterFqdn;
}
public void setImpalaMasterFqdn(String impalaMasterFqdn) {
this.impalaMasterFqdn = impalaMasterFqdn;
}
public String getDefaultFs() {
return defaultFs;
}
public void setDefaultFs(String defaultFs) {
this.defaultFs = defaultFs;
}
public String getTableSchema() {
return tableSchema;
}
public void setTableSchema(String tableSchema) {
this.tableSchema = tableSchema;
}
public String getEndpoint() {
return endpoint;
}
public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}
public String getBucket() {
return bucket;
}
public void setBucket(String bucket) {
this.bucket = bucket;
}
public String getAccessId() {
return accessId;
}
public void setAccessId(String accessId) {
this.accessId = accessId;
}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getDatasourceTypeId() {
return datasourceTypeId;
}
public void setDatasourceTypeId(String datasourceTypeId) {
this.datasourceTypeId = datasourceTypeId;
}
}
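The request bean above already imports lombok.Data, so, assuming Lombok is acceptable for DTOs in this project, the roughly 150 lines of hand-written accessors in DataSourceListDto could be generated instead; a minimal sketch:

@Data // Lombok generates all of the getters/setters listed above
public class DataSourceListDto {
    @ApiModelProperty(value = "FTP协议")
    private String protocol;
    // ... remaining fields exactly as declared above, with the accessor methods removed
}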
......@@ -29,7 +29,7 @@ public class DmpSyncingDatasourceReq implements Serializable {
* 数据源ID
*/
@ApiModelProperty(value = "数据源ID")
private String datasourceId;
private String id;
/**
* Datasource type ID
*/
......@@ -174,13 +174,12 @@ public class DmpSyncingDatasourceReq implements Serializable {
private String isHaveHeader;
public String getDatasourceId() {
return datasourceId;
public String getId() {
return id;
}
public void setDatasourceId(String datasourceId) {
this.datasourceId = datasourceId;
public void setId(String id) {
this.id = id;
}
public String getDatasourceType() {
......
......@@ -320,8 +320,12 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
String kafkaConnectUrl = dmpProjectSystemInfo.getKafkaConnectorUrl(); //Kafka Connect connection info
String[] arr = kafkaConnectUrl.split(",");
//connect1@http://172.18.104.130:9993/connectors
if (StringUtils.isEmpty(connectorUrl))
/* if (StringUtils.isEmpty(connectorUrl))
return JsonResult.error(ResultCode.PARAMS_ERROR, "connectorUrl不能为空!");
if (connectorUrl.contains("@")) {
connectorUrl = connectorUrl.split("@")[1];
}*/
connectorUrl = arr[0];
if (connectorUrl.contains("@")) {
connectorUrl = connectorUrl.split("@")[1];
}
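A minimal sketch of the parsing performed above, assuming getKafkaConnectorUrl() returns a comma-separated list of name@url entries such as connect1@http://172.18.104.130:9993/connectors:

static String firstConnectorUrl(String kafkaConnectUrl) {
    String first = kafkaConnectUrl.split(",")[0]; // take the first configured endpoint
    return first.contains("@") ? first.split("@")[1] : first; // strip the "name@" prefix
}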
......@@ -331,7 +335,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
throw new RuntimeException("保存失败!");
}
//process the selected table info
submitNoSelectTable(realtimeId, projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params);
//submitNoSelectTable(realtimeId, projectId, sourceDbInfo, targetDbInfo, dmpProjectSystemInfo, connectorUrl, params);
return JsonResult.ok();
}
......@@ -344,7 +348,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
Long realtiemId = null; //sync task id
//source-datasource-to-target sync connector info
//parse blacklist tables
Map blacklistTablesInfo = getBlackListTableInfo(sourceDbInfo, params);
//Map blacklistTablesInfo = getBlackListTableInfo(sourceDbInfo, params);
//parse the selected tables
Map selectlistTablesInfo = getSelectListTableInfo(sourceDbInfo, params);
......@@ -352,8 +356,8 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
Map<String, String> dataModelMap = new HashMap<>();
dataModelMap.put("kafkaBootstrapServers", dmpProjectSystemInfo.getKafkaBootstrapServers());//kafka 地址
dataModelMap.put("registerUrl", dmpProjectSystemInfo.getKafkaSchemaRegisterUrl()); //kafka connector schema 注册地址
dataModelMap.put("blacklistTables", blacklistTablesInfo.get("connectorBlacklistTables").toString()); //设置的黑名单表
dataModelMap.put("blacklistTableCount", blacklistTablesInfo.get("blacklistTableCount").toString()); //黑名单表数量
//dataModelMap.put("blacklistTables", blacklistTablesInfo.get("connectorBlacklistTables").toString()); //设置的黑名单表
//dataModelMap.put("blacklistTableCount", blacklistTablesInfo.get("blacklistTableCount").toString()); //黑名单表数量
//设置的白名单表 在模板里进行判比较黑名单表和白名单表的数量,谁小就用谁
dataModelMap.put("whitelistTablesConut", selectlistTablesInfo.get("whitelistTablesConut").toString()); //已选择表数量
dataModelMap.put("connectorWhitelistTables", selectlistTablesInfo.get("connectorWhitelistTables").toString()); //设置的已选择表
......@@ -367,7 +371,6 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
dataModelMap.put("dbName", sourceDbInfo.getDbName());
dataModelMap.put("connectorSecurityFlag", (String) params.get("connectorSecurityFlag")); //安全验证开关,是否启用KERBEROS
//前端定义的sourceConnectorName前缀
//String sourceName = (String) params.get("sourceName");
String sourceName = tree.getName();
dataModelMap.put("sourceName", sourceName);
......@@ -382,15 +385,19 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
queryParam.put("srcDataSourceId", srcDataSourceId);
queryParam.put("sourceConnectorName", sourceConnectorName);
Map dataMap = dmpRealtimeSyncInfoDao.selecltRealtimeSyncInfoByParams(queryParam);
//abort when a sync task already exists for the same datasource and connector name
if (dataMap == null) {
if (dataMap != null) {
throw new RuntimeException("存在相同的实时任务数据!");
}
dataModelMap.put("name", sourceDbInfo.getDatasourceName() + "_" + sourceName + "-" + sourceDbInfo.getDbName()); //source connector name
String dataSource2DataSourceJsonStr = freemakerJson("source", dataModelMap); //generate the Kafka Connect request payload from the FreeMarker template
Map<String, Object> dataSource2DataSourceResult = RestClient.post(connectorUrl, dataSource2DataSourceJsonStr);
String connectorJobId = getConnectorJobId(dataSource2DataSourceResult);
//String connectorJobId = "434343";
//save only when the connector API call succeeds; otherwise fail
if (StringUtils.isNotEmpty(connectorJobId)) {
if (StringUtils.isEmpty(connectorJobId)) {
throw new RuntimeException("提交失败!");
}
DmpRealtimeSyncInfo saveBody = new DmpRealtimeSyncInfo();
saveBody.setSrcDatasourceId(srcDataSourceId);
saveBody.setSrcDatasourceName(sourceDbInfo.getDatasourceName());
......@@ -413,6 +420,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
saveBody.setCrePerson(SessionUtils.getCurrentUserId());
saveBody.setVersion("1.0");
saveBody.setScriptJson(JSONObject.toJSONString(params));
saveBody.setTreeId(params.get("treeId").toString());
dmpRealtimeSyncInfoDao.insert(saveBody);
realtiemId = Long.valueOf(saveBody.getId());
logger.info("###################保存实时同步任务--结束 ################");
......@@ -422,20 +430,14 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
taskHistory.setRealtimeSyncId(saveBody.getId());
dmpRealtimeSyncInfoDao.insertRealtimeHistory(taskHistory);
Map blacklist = new HashMap();
/*Map blacklist = new HashMap();
blacklist.put("creTime", new Date());
blacklist.put("realtimeId", saveBody.getId());
blacklist.put("crePerson", SessionUtils.getCurrentUserId());
blacklist.put("datasourceId", srcDataSourceId);
blacklist.put("blacklistTable", blacklistTablesInfo.get("blacklistTables").toString());
dmpRealtimeSyncInfoDao.insertRealtimeBlackList(blacklist);
logger.info("###################保存实时同步黑名单数据--结束 ################");
} else {
throw new RuntimeException("提交失败!");
}
} else {
throw new RuntimeException("存在相同的实时任务数据!");
}
logger.info("###################保存实时同步黑名单数据--结束 ################");*/
return realtiemId;
}
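getConnectorJobId(...) is not shown in this diff. A hedged sketch of what it plausibly extracts, assuming Kafka Connect's POST /connectors response, which echoes the created connector as {"name": ..., "config": {...}, "tasks": [...]}:

static String connectorJobIdSketch(Map<String, Object> response) {
    Object name = (response == null) ? null : response.get("name"); // the connector name would double as the job id
    return (name == null) ? null : name.toString();
}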
......@@ -708,8 +710,9 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
dataModelMap.put("name", sourceDbInfo.getDatasourceName() + "_" + sourceName + "-" + sourceDbInfo.getDbName()); //source connector name
String dataSource2DataSourceJsonStr = freemakerJson("source", dataModelMap); //generate the Kafka Connect request payload from the FreeMarker template
Map<String, Object> dataSource2DataSourceResult = RestClient.post(connectorUrl, dataSource2DataSourceJsonStr);
String connectorJobId = getConnectorJobId(dataSource2DataSourceResult);
// Map<String, Object> dataSource2DataSourceResult = RestClient.post(connectorUrl, dataSource2DataSourceJsonStr);
//String connectorJobId = getConnectorJobId(dataSource2DataSourceResult);
String connectorJobId = "dfdfd";
//save only when the connector API call succeeds; otherwise fail
if (StringUtils.isNotEmpty(connectorJobId)) {
DmpRealtimeSyncInfo saveBody = new DmpRealtimeSyncInfo();
......@@ -882,7 +885,7 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
*/
@Override
public JsonResult conflictCheck(ConflictCheckReq params) throws Exception {
List<String> returnList = new ArrayList<>();
List<Map> returnList = new ArrayList<>();
//look up the datasource config by the source database id
DmpAgentDatasourceInfo dsInfo = offlineSynchDao.querySourceDbInfoBySourceId(Long.valueOf(params.getTargetDataSourceId()));
if (dsInfo == null) {
......@@ -902,9 +905,14 @@ public class DmpRealtimeSyncInfoServiceImpl implements DmpRealtimeSyncInfoServic
String[] tablesName = params.getTablesName().split(",");
for (String table : tablesName) {
if(tableList.contains(table)){
returnList.add(table);
Map map = new HashMap();
map.put("tableName", table);
if (tableList.contains(table)) {
map.put("conflict", "Y");
} else {
map.put("conflict", "N");
}
returnList.add(map);
}
}
return JsonResult.ok(returnList);
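The rewritten loop above reports a per-table conflict flag instead of returning only the conflicting names. An equivalent stream-based sketch (requires java.util.Arrays and java.util.stream.Collectors; same Y/N semantics):

List<Map<String, String>> returnList = Arrays.stream(params.getTablesName().split(","))
        .map(table -> {
            Map<String, String> row = new HashMap<>();
            row.put("tableName", table);
            row.put("conflict", tableList.contains(table) ? "Y" : "N");
            return row;
        })
        .collect(Collectors.toList());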
......
......@@ -12,6 +12,7 @@ import com.jz.common.enums.TestConnectStatusEnum;
import com.jz.common.exception.ServiceException;
import com.jz.common.page.PageInfoResponse;
import com.jz.common.persistence.BaseService;
import com.jz.common.utils.DateUtils;
import com.jz.common.utils.JsonMapper;
import com.jz.common.utils.web.SessionUtils;
import com.jz.dmp.agent.DmpAgentResult;
......@@ -306,15 +307,25 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
*/
@Override
public JsonResult testConnection(DmpSyncingDatasourceReq saveBody) throws Exception {
Map map = new HashMap();
map.put("testTime", DateUtils.currentDatetime());
DmpAgentDatasourceInfo ds = this.dsInfoDTO(saveBody); //look up the database info for this datasource
DmpAgentResult rst = dmpDsAgentServiceImp.testConnect(ds); //connectivity test
if (!rst.getCode().val().equals("200")) {
return new JsonResult(ResultCode.INTERNAL_SERVER_ERROR, "连接测试失败!");
map.put("status","异常");
map.put("testConnectStatus","03");
return new JsonResult(map);
} else {
//connection test succeeded
Object flag = JsonMapper.fromJsonString(rst.getMessage(), Boolean.class);
//rst.setResult(JsonMapper.fromJsonString(rst.getMessage(), Boolean.class));
return new JsonResult(ResultCode.SUCCESS, "连接测试成功!");
Boolean flag = (Boolean) JsonMapper.fromJsonString(rst.getMessage(), Boolean.class);
if(flag){
map.put("status","正常");
map.put("testConnectStatus","02");
} else {
map.put("status","异常");
map.put("testConnectStatus","03");
}
return JsonResult.ok(map);
}
}
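One caveat in the success branch above: if(flag) auto-unboxes the Boolean, so a null parse result (assuming JsonMapper.fromJsonString can return null, which this diff does not show) would throw a NullPointerException. A defensive variant:

Boolean flag = (Boolean) JsonMapper.fromJsonString(rst.getMessage(), Boolean.class);
boolean connected = Boolean.TRUE.equals(flag); // null-safe: a null parse counts as a failed test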
......@@ -351,8 +362,8 @@ public class DmpSyncingDatasourceServiceImpl implements DmpSyncingDatasourceServ
if (StringUtils.isNotEmpty(saveBody.getProjectId())) {
dsd.setProjectId(Integer.valueOf(saveBody.getProjectId()));
}
if (StringUtils.isNotEmpty(saveBody.getDatasourceId())) { //datasource id
dsd.setId(Integer.valueOf(saveBody.getDatasourceId()));
if (StringUtils.isNotEmpty(saveBody.getId())) { //datasource id
dsd.setId(Integer.valueOf(saveBody.getId()));
}
dsd.setTestConnectStatus(saveBody.getTestConnectStatus());
if (StringUtils.isEmpty(saveBody.getTestConnectStatus())) { //default: not yet tested
......
......@@ -158,10 +158,10 @@
<insert id="insert" keyProperty="id" useGeneratedKeys="true">
insert into dmp_realtime_sync_info(src_datasource_id, target_datasource_id, src_table_name, target_table_name, type, connector_job_id, connector_json_data, src_topic_name
, project_id, parent_id, desensitization_field, arithmetic, pk_name, source_type_name, target_type_name, src_database_type, src_database_name, connector_url, target_database_type
, target_database_name, src_datasource_name, target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person, version, script_json)
, target_database_name, src_datasource_name, target_datasource_name, store_type, status, create_time, update_time, cre_person, upt_person, version, script_json, tree_id)
values (#{srcDatasourceId}, #{targetDatasourceId}, #{srcTableName}, #{targetTableName}, #{type}, #{connectorJobId}, #{connectorJsonData}, #{srcTopicName}, #{projectId}
, #{parentId}, #{desensitizationField}, #{arithmetic}, #{pkName}, #{sourceTypeName}, #{targetTypeName}, #{srcDatabaseType}, #{srcDatabaseName}, #{connectorUrl}, #{targetDatabaseType}
, #{targetDatabaseName}, #{srcDatasourceName}, #{targetDatasourceName}, #{storeType}, #{status}, #{createTime}, #{updateTime}, #{crePerson}, #{uptPerson} ,#{version}, #{scriptJson})
, #{targetDatabaseName}, #{srcDatasourceName}, #{targetDatasourceName}, #{storeType}, #{status}, #{createTime}, #{updateTime}, #{crePerson}, #{uptPerson} ,#{version}, #{scriptJson}, #{treeId})
</insert>
<insert id="insertBatch" keyProperty="id" useGeneratedKeys="true">
......
......@@ -374,7 +374,25 @@
a.jdbc_url as jdbcUrl,
a.db_name as dbName,
a.user_name as userName,
a.project_id as projectId
a.project_id as projectId,
a.protocol,
a.host,
a.port,
a.default_fs as defaultFs,
a.test_connect_status as testConnectStatus,
a.table_schema as tableSchema,
a.hdfs_namenode_principal as hdfsNamenodePrincipal,
a.hive_metastore_uris_thrift as hiveMetastoreUrisThrift,
a.keytab_location as keytabLocation,
a.bootstrap_address as bootstrapAddress,
a.jaas_address as jaasAddress,
a.krb5_address as krb5Address,
a.kudu_master as kuduMaster,
a.impala_master_fqdn as impalaMasterFqdn,
a.access_key as accessKey,
a.access_id as accessId,
a.bucket,
a.endpoint
from dmp_syncing_datasource a
left join dmp_syncing_datasource_type b on a.datasource_type = b.id
where a.data_status = '1'
......
......@@ -12,13 +12,10 @@
"database.user": "${dbUserName!}",
"database.password": "${dbPassWord!}",
"database.server.id": "${dataSourceId!}",
"database.dbname" : "${dbName!}",
"database.server.name": "${datasourceName!}_${sourceName!}",
"database.whitelist": "${dbName!}",
<#if ( blacklistTableCount?eval <= whitelistTablesConut?eval ) >
"table.blacklist":"${blacklistTables!}",
<#else >
"table.whitelist":"${connectorWhitelistTables!}",
</#if>
"database.history.kafka.topic": "${topic!}",
<#if connectorSecurityFlag == "true">
"database.history.producer.sasl.mechanism": "GSSAPI",
......@@ -35,6 +32,7 @@
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"decimal.handling.mode": "double",
"tombstones.on.delete":"true",
"include.schema.changes":"true"
}
<#if name??>
......
<#if name??>
{
"name":"debezium-connector-${name}",
"config":
</#if>
{
"connector.class":"io.debezium.connector.mysql.MySqlConnector",
"database.hostname":"${dbHost!}",
"tasks.max":"3",
"database.port":"${dbPort!}",
"database.user": "${dbUserName!}",
"database.password": "${dbPassWord!}",
"database.server.id": "${dataSourceId!}",
"database.server.name": "${datasourceName!}_${sourceName!}",
"database.whitelist": "${dbName!}",
<#if ( blacklistTableCount?eval <= whitelistTablesConut?eval ) >
"table.blacklist":"${blacklistTables!}",
<#else >
"table.whitelist":"${connectorWhitelistTables!}",
</#if>
"database.history.kafka.topic": "${topic!}",
<#if connectorSecurityFlag == "true">
"database.history.producer.sasl.mechanism": "GSSAPI",
"database.history.producer.security.protocol": "SASL_PLAINTEXT",
"database.history.producer.sasl.kerberos.service.name": "kafka",
"database.history.consumer.sasl.mechanism": "GSSAPI",
"database.history.consumer.security.protocol": "SASL_PLAINTEXT",
"database.history.consumer.sasl.kerberos.service.name": "kafka",
"database.history.kafka.bootstrap.servers":"${kafkaBootstrapServers}",
<#else>
"database.history.kafka.bootstrap.servers":"${kafkaBootstrapServers?replace("SASL_PLAINTEXT://","")}",
</#if>
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"decimal.handling.mode": "double",
"include.schema.changes":"true"
}
<#if name??>
}
</#if>
\ No newline at end of file
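For context, freemakerJson("source", dataModelMap) in the service layer presumably renders a template like the one above. A minimal FreeMarker sketch under that assumption (the template file name is hypothetical):

import freemarker.template.Configuration;
import freemarker.template.Template;
import java.io.StringWriter;
import java.util.Map;

public class ConnectorTemplateRenderer {
    private final Configuration cfg; // e.g. new Configuration(Configuration.VERSION_2_3_28) with a template loader

    public ConnectorTemplateRenderer(Configuration cfg) {
        this.cfg = cfg;
    }

    /** Renders the Debezium source-connector template into the JSON body POSTed to Kafka Connect. */
    public String render(Map<String, Object> dataModelMap) throws Exception {
        Template tpl = cfg.getTemplate("realtime-source.ftl"); // hypothetical template name
        StringWriter out = new StringWriter();
        tpl.process(dataModelMap, out);
        return out.toString();
    }
}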