Debezium - операция обновления выдает событие изменения в Kafka Topi c со значениями до и после структуры, но игнорирует нулевой столбец / поле в до структуры - PullRequest
1 голос
/ 01 мая 2020

Я использую debezium для синхронизации данных между двумя postgres серверами БД, и у меня возникла проблема с событием / операцией обновления , так как он записывает событие изменения в kafka topi c игнорируя столбец / поле с нулевым значением (см. ниже infodetcode поле, отсутствующее в структуре before, поскольку оно было пустым в БД), тот же столбец / поле доступен после структуры в качестве значения изменено с «ноль» на «некоторое значение», при этом столбец с нулевым значением отсутствует в структуре «до», когда я сравниваю «до» с «после» структуры, чтобы выяснить, какие значения полей / столбцов являются уникальными / дублирующими для создания запроса dynamici c, запрос строится с отсутствующим столбцом, и необходимо поместить этот столбец в запрос (пожалуйста, найдите приведенную ниже конфигурацию и реализацию сравнения структуры до и после, которая возвращает результат без столбца с нулевым значением), я с удовольствием приму предложения / помощь по этому вопросу.

Примечание: для параметра REPLICA IDENTITY установлено значение "FULL"

Версия: PostgreSQL - 10,9, дебезий - 1.1.1. Финал

До и после Struct-Topi c Запись (фактическая) :

    before=struct{accountno=01,currencycode=USD,seqno=1,informationcode=S}
    after=struct{accountno=01,currencycode=USD,seqno=1   ,informationcode=M  ,infodetcode=N}

До и после Struct-Topi c Запись (ожидается) :

 before=struct{accountno=01,currencycode=USD,seqno=1,informationcode=S,infodetcode=null}
            after=struct{accountno=01,currencycode=USD,seqno=1   ,informationcode=M  ,infodetcode=N}

Конфигурация дебезия:

@Bean
public io.debezium.config.Configuration postgreConnectorConfiguration() {
    return io.debezium.config.Configuration.create()
            .with("name", "postgres-connector")
            .with("snapshot.mode", SnapshotMode.CUSTOM)
            .with("snapshot.custom.class", "postgresql.snapshot.CustomSnapshotter")
            .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
            .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
            .with("database.history.file.filename", "/debezium/dbhistory.dat")
            .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
            .with("offset.storage.file.filename", "/debezium/offset/postgre-offset.dat")
            .with("offset.flush.interval.ms", 60000)
            .with("snapshot.isolation.mode", "read_committed")
            .with("key.converter.schemas.enable",true)
            .with("value.converter.schemas.enable", true)

            .with("plugin.name", "pgoutput")
            .with("slot.name", "debeziumtest")
            .with("database.server.name", "server-c")
            .with("database.hostname", databaseHost)
            .with("database.port", databasePort)
            .with("database.user", databaseUserName)
            .with("database.password", databasePassword)
            .with("database.dbname", databaseName)
            .with("table.whitelist", TABLES_TO_MONITOR).build();
}

Сравнение структуры (до и после):

private void handleEvent(SourceRecord sourceRecord) {
    Struct sourceRecordEntry = (Struct) sourceRecord.value();

    if (sourceRecordEntry != null) {

        Struct sourceStruct = (Struct) sourceRecordEntry.get(FieldName.SOURCE);

        String tableName =  sourceStruct.getString(TABLE);
        Date transactionDate = new Date(System.currentTimeMillis());
        Long transctionTime = (Long) sourceStruct.get(FieldName.TIMESTAMP);
        Time txnTime = new Time(transctionTime);
        Long transactionCode = (Long) sourceStruct.get(TRANSACTION_ID);
        Operation operation = Operation.forCode(sourceRecordEntry.getString(OPERATION));

    if (operation == Operation.UPDATE) {

            Map<String, Object> beforeEntryHash;
            Map<String, Object> afterEntryHash;

            List preFieldList = new ArrayList();
            List preValueList = new ArrayList();
            List postFieldList = new ArrayList();
            List postValueList = new ArrayList();
            Integer preFieldcount = 0, preValuecount = 0, postFieldcount = 0, postValuecount = 0;

            Struct beforeStruct = (Struct) sourceRecordEntry.get(BEFORE);
            Struct afterStruct = (Struct) sourceRecordEntry.get(AFTER);

            beforeEntryHash = beforeStruct.schema().fields().stream().map(Field::name).filter(fieldName->beforeStruct.get(fieldName)!=null).map(fieldName-> Pair.of(fieldName, beforeStruct.get(fieldName))).collect(toMap(Pair::getKey,Pair::getValue));
            afterEntryHash = afterStruct.schema().fields().stream().map(Field::name).filter(fieldName->afterStruct.get(fieldName)!=null).map(fieldName-> Pair.of(fieldName, afterStruct.get(fieldName))).collect(toMap(Pair::getKey,Pair::getValue));

            MapDifference<String, Object> rowDifferenceHash  = Maps.difference(beforeEntryHash, afterEntryHash);

            for(Entry<String, ValueDifference<Object>> rowEntry : rowDifferenceHash.entriesDiffering().entrySet()) {
                preFieldList.add(PR_PREFIX + rowEntry.getKey());
                postFieldList.add(PO_PREFIX + rowEntry.getKey());
                preValueList.add(SQ + rowEntry.getValue().leftValue() + SQ);
                postValueList.add(SQ + rowEntry.getValue().rightValue() + SQ);
                LOGGER.info("Key : "  + rowEntry.getKey() + " Left Value : " + rowEntry.getValue().leftValue() + " Right Value : " + rowEntry.getValue().rightValue());

              }
        }
    }
}

Сообщение :

SourceRecord{sourcePartition={server=server-c}, sourceOffset={transaction_id=null, lsn_proc=4921004793408, lsn=4921004793408, txId=81939856, ts_usec=1588212060567019}} ConnectRecord{topic='server-c.a.accinfo', kafkaPartition=null, key=Struct{accountno=01           ,currencycode=USD,seqno=1   }, keySchema=Schema{server-c.a.accinfo.Key:STRUCT}, value=Struct{before=Struct{accountno=01           ,currencycode=USD,seqno=1   ,informationcode=S  },after=Struct{accountno=01           ,currencycode=USD,seqno=1   ,informationcode=P  ,infodetcode=I},source=Struct{version=1.2.0.Alpha1,connector=postgresql,name=server-c,ts_ms=1588212060567,db=OTATEMP,schema=a,table=accinfo,txId=81939856,lsn=4921004793408},op=u,ts_ms=1588213782961}, valueSchema=Schema{server-c.a.accinfo.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}

Схема:

[Field{name=before, index=0, schema=Schema{server-c.aeota.accinfo.Value:STRUCT}}, Field{name=after, index=1, schema=Schema{server-c.aeota.accinfo.Value:STRUCT}}, Field{name=source, index=2, schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}}, Field{name=op, index=3, schema=Schema{STRING}}, Field{name=ts_ms, index=4, schema=Schema{INT64}}, Field{name=transaction, index=5, schema=Schema{STRUCT}}]
...