Я использую debezium для синхронизации данных между двумя postgres серверами БД, и у меня возникла проблема с событием / операцией обновления , так как он записывает событие изменения в kafka topi c игнорируя столбец / поле с нулевым значением (см. ниже infodetcode поле, отсутствующее в структуре before, поскольку оно было пустым в БД), тот же столбец / поле доступен после структуры в качестве значения изменено с «ноль» на «некоторое значение», при этом столбец с нулевым значением отсутствует в структуре «до», когда я сравниваю «до» с «после» структуры, чтобы выяснить, какие значения полей / столбцов являются уникальными / дублирующими для создания запроса dynamici c, запрос строится с отсутствующим столбцом, и необходимо поместить этот столбец в запрос (пожалуйста, найдите приведенную ниже конфигурацию и реализацию сравнения структуры до и после, которая возвращает результат без столбца с нулевым значением), я с удовольствием приму предложения / помощь по этому вопросу.
Примечание: для параметра REPLICA IDENTITY установлено значение "FULL"
Версия: PostgreSQL - 10,9, дебезий - 1.1.1. Финал
До и после Struct-Topi c Запись (фактическая) :
before=struct{accountno=01,currencycode=USD,seqno=1,informationcode=S}
after=struct{accountno=01,currencycode=USD,seqno=1 ,informationcode=M ,infodetcode=N}
До и после Struct-Topi c Запись (ожидается) :
before=struct{accountno=01,currencycode=USD,seqno=1,informationcode=S,infodetcode=null}
after=struct{accountno=01,currencycode=USD,seqno=1 ,informationcode=M ,infodetcode=N}
Конфигурация дебезия:
@Bean
public io.debezium.config.Configuration postgreConnectorConfiguration() {
return io.debezium.config.Configuration.create()
.with("name", "postgres-connector")
.with("snapshot.mode", SnapshotMode.CUSTOM)
.with("snapshot.custom.class", "postgresql.snapshot.CustomSnapshotter")
.with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename", "/debezium/dbhistory.dat")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", "/debezium/offset/postgre-offset.dat")
.with("offset.flush.interval.ms", 60000)
.with("snapshot.isolation.mode", "read_committed")
.with("key.converter.schemas.enable",true)
.with("value.converter.schemas.enable", true)
.with("plugin.name", "pgoutput")
.with("slot.name", "debeziumtest")
.with("database.server.name", "server-c")
.with("database.hostname", databaseHost)
.with("database.port", databasePort)
.with("database.user", databaseUserName)
.with("database.password", databasePassword)
.with("database.dbname", databaseName)
.with("table.whitelist", TABLES_TO_MONITOR).build();
}
Сравнение структуры (до и после):
private void handleEvent(SourceRecord sourceRecord) {
Struct sourceRecordEntry = (Struct) sourceRecord.value();
if (sourceRecordEntry != null) {
Struct sourceStruct = (Struct) sourceRecordEntry.get(FieldName.SOURCE);
String tableName = sourceStruct.getString(TABLE);
Date transactionDate = new Date(System.currentTimeMillis());
Long transctionTime = (Long) sourceStruct.get(FieldName.TIMESTAMP);
Time txnTime = new Time(transctionTime);
Long transactionCode = (Long) sourceStruct.get(TRANSACTION_ID);
Operation operation = Operation.forCode(sourceRecordEntry.getString(OPERATION));
if (operation == Operation.UPDATE) {
Map<String, Object> beforeEntryHash;
Map<String, Object> afterEntryHash;
List preFieldList = new ArrayList();
List preValueList = new ArrayList();
List postFieldList = new ArrayList();
List postValueList = new ArrayList();
Integer preFieldcount = 0, preValuecount = 0, postFieldcount = 0, postValuecount = 0;
Struct beforeStruct = (Struct) sourceRecordEntry.get(BEFORE);
Struct afterStruct = (Struct) sourceRecordEntry.get(AFTER);
beforeEntryHash = beforeStruct.schema().fields().stream().map(Field::name).filter(fieldName->beforeStruct.get(fieldName)!=null).map(fieldName-> Pair.of(fieldName, beforeStruct.get(fieldName))).collect(toMap(Pair::getKey,Pair::getValue));
afterEntryHash = afterStruct.schema().fields().stream().map(Field::name).filter(fieldName->afterStruct.get(fieldName)!=null).map(fieldName-> Pair.of(fieldName, afterStruct.get(fieldName))).collect(toMap(Pair::getKey,Pair::getValue));
MapDifference<String, Object> rowDifferenceHash = Maps.difference(beforeEntryHash, afterEntryHash);
for(Entry<String, ValueDifference<Object>> rowEntry : rowDifferenceHash.entriesDiffering().entrySet()) {
preFieldList.add(PR_PREFIX + rowEntry.getKey());
postFieldList.add(PO_PREFIX + rowEntry.getKey());
preValueList.add(SQ + rowEntry.getValue().leftValue() + SQ);
postValueList.add(SQ + rowEntry.getValue().rightValue() + SQ);
LOGGER.info("Key : " + rowEntry.getKey() + " Left Value : " + rowEntry.getValue().leftValue() + " Right Value : " + rowEntry.getValue().rightValue());
}
}
}
}
Сообщение :
SourceRecord{sourcePartition={server=server-c}, sourceOffset={transaction_id=null, lsn_proc=4921004793408, lsn=4921004793408, txId=81939856, ts_usec=1588212060567019}} ConnectRecord{topic='server-c.a.accinfo', kafkaPartition=null, key=Struct{accountno=01 ,currencycode=USD,seqno=1 }, keySchema=Schema{server-c.a.accinfo.Key:STRUCT}, value=Struct{before=Struct{accountno=01 ,currencycode=USD,seqno=1 ,informationcode=S },after=Struct{accountno=01 ,currencycode=USD,seqno=1 ,informationcode=P ,infodetcode=I},source=Struct{version=1.2.0.Alpha1,connector=postgresql,name=server-c,ts_ms=1588212060567,db=OTATEMP,schema=a,table=accinfo,txId=81939856,lsn=4921004793408},op=u,ts_ms=1588213782961}, valueSchema=Schema{server-c.a.accinfo.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
Схема:
[Field{name=before, index=0, schema=Schema{server-c.aeota.accinfo.Value:STRUCT}}, Field{name=after, index=1, schema=Schema{server-c.aeota.accinfo.Value:STRUCT}}, Field{name=source, index=2, schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}}, Field{name=op, index=3, schema=Schema{STRING}}, Field{name=ts_ms, index=4, schema=Schema{INT64}}, Field{name=transaction, index=5, schema=Schema{STRUCT}}]