Я пытаюсь использовать KSQL (как часть confluent-5.0.0) для создания одной записи из набора родительских записей и дочерних записей, где каждая родительская запись имеет несколько дочерних записей (в частности, реквизиты платежа истороны, участвующие в оплате).Эти родительские / дочерние записи связаны идентификатором родителя.Чтобы проиллюстрировать это, я имею в виду записи примерно такой структуры в исходной системе:
payment:
| id | currency | amount | payment_date |
|------------------------------------------|
| pmt01 | USD | 20000 | 2018-11-20 |
| pmt02 | USD | 13000 | 2018-11-23 |
payment_parties:
| id | payment_id | party_type | party_ident | party_account |
|-----------------------------------------------------------------|
| prt01 | pmt01 | sender | XXYYZZ23 | (null) |
| prt02 | pmt01 | intermediary | AADDEE98 | 123456789 |
| prt03 | pmt01 | receiver | FFGGHH56 | 987654321 |
| prt04 | pmt02 | sender | XXYYZZ23 | (null) |
| prt05 | pmt02 | intermediary | (null) | (null) |
| prt06 | pmt02 | receiver | FFGGHH56 | 987654321 |
Эти записи загружаются в формате AVRO в набор тем Kafka с использованием Oracle Golden Gate с одной темой.для каждого стола.Это означает, что существуют следующие темы: src_payment
и src_payment_parties
.В соответствии с тем, как функционирует исходная система, временные метки этих записей находятся в пределах нескольких миллисекунд.
Теперь цель состоит в том, чтобы «свести» эти записи в одну запись, которая будет использоваться из исходящей темы.Чтобы проиллюстрировать вышеприведенные записи, желаемый результат будет выглядеть следующим образом:
payment_flattened:
| id | currency | amount | payment_date | sender_ident | sender_account | intermediary_ident | intermediary_account | receiver_ident | receiver_account |
|----------------------------------------------------------------------------------------------------------------------------------------------------------|
| pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | (null) | AADDEE98 | 123456789 | FFGGHH56 | 987654321 |
| pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | (null) | (null) | (null) | FFGGHH56 | 987654321 |
Первый вопрос, который я хотел бы задать здесь, следующий: Как мне лучше всего добиться этогоКомбинация данных из исходных тем?
Конечно, я сам попробовал некоторые действия.Для краткости я опишу, чего я пытался добиться, добавив первую из сторон платежа в записи платежей.
Шаг первый: настройка исходных потоков
Примечание: из-за того, что в настройке OGG добавлено свойство, называемое таблицей, в схему AVRO, я должен указать поля, которые нужно взять из темы.Кроме того, меня не интересуют поля, в которых указан тип операции (например, вставка или обновление).
create stream payment_stream (id varchar, currency varchar, amount double, \
payment_date varchar) with (kafka_topic='src_payment',value_format='avro');
create stream payment_parties_stream (id varchar, payment_id varchar, party_type varchar, \
party_ident varchar, party_account varchar) with (kafka_topic='src_payment_parties',\
value_format='avro');
Шаг второй: создание потока для отправителей платежей
Примечание: из того, что я собрал из документации и узнал из экспериментов, чтобы иметь возможность присоединить поток платежей к потоку платежной стороны, последний должен быть разделен по идентификатору платежа.Кроме того, единственный способ получить соединение - это переименовать столбец.
create stream payment_sender_stream as select payment_id as id, party_ident, \
party_account from payment_parties_stream where party_type = 'sender' partition by id;
Шаг третий: объединить два потока
Примечание: я использую левыйприсоединиться, потому что не все стороны присутствуют для каждого платежа.Как и в приведенном выше примере записи, где pmt02
не имеет посредника.
create stream payment_with_sender as select pmt.id as id, pmt.currency, pmt.amount, \
pmt.payment_date, snd.party_ident, snd.party_account from payment_stream pmt left join \
payment_sender_stream snd within 1 seconds on pmt.id = snd.id;
Теперь вывод, который я ожидал бы из этого потока, выглядит примерно так:
ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null
Вместо этого вывод, который я вижу, выглядит следующим образом:
ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | null | null
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | null | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null
Следовательно, второй (состоящий из двух частей) вопрос, который я хотел бы задать, это: Почему левое соединение даетэти дубликаты записей?И можно ли этого избежать?
Извиняюсь за стену текста, я постарался быть максимально полным в описании проблемы.Конечно, я был бы рад добавить любую возможную недостающую информацию и ответить на вопросы, касающиеся установки, насколько мне известно.