KSQL: добавить несколько дочерних записей в родительскую запись - PullRequest
0 голосов
/ 26 ноября 2018

Я пытаюсь использовать KSQL (как часть confluent-5.0.0) для создания одной записи из набора родительских записей и дочерних записей, где каждая родительская запись имеет несколько дочерних записей (в частности, реквизиты платежа истороны, участвующие в оплате).Эти родительские / дочерние записи связаны идентификатором родителя.Чтобы проиллюстрировать это, я имею в виду записи примерно такой структуры в исходной системе:

payment:
| id    | currency | amount | payment_date |
|------------------------------------------|
| pmt01 | USD      | 20000  | 2018-11-20   |
| pmt02 | USD      | 13000  | 2018-11-23   |

payment_parties:
| id    | payment_id | party_type   | party_ident | party_account |
|-----------------------------------------------------------------|
| prt01 | pmt01      | sender       | XXYYZZ23    | (null)        |
| prt02 | pmt01      | intermediary | AADDEE98    | 123456789     |
| prt03 | pmt01      | receiver     | FFGGHH56    | 987654321     |
| prt04 | pmt02      | sender       | XXYYZZ23    | (null)        |
| prt05 | pmt02      | intermediary | (null)      | (null)        |
| prt06 | pmt02      | receiver     | FFGGHH56    | 987654321     |

Эти записи загружаются в формате AVRO в набор тем Kafka с использованием Oracle Golden Gate с одной темой.для каждого стола.Это означает, что существуют следующие темы: src_payment и src_payment_parties.В соответствии с тем, как функционирует исходная система, временные метки этих записей находятся в пределах нескольких миллисекунд.

Теперь цель состоит в том, чтобы «свести» эти записи в одну запись, которая будет использоваться из исходящей темы.Чтобы проиллюстрировать вышеприведенные записи, желаемый результат будет выглядеть следующим образом:

payment_flattened:
| id    | currency | amount | payment_date | sender_ident | sender_account | intermediary_ident | intermediary_account | receiver_ident | receiver_account |
|----------------------------------------------------------------------------------------------------------------------------------------------------------|
| pmt01 | USD      | 20000  | 2018-11-20   | XXYYZZ23     | (null)         | AADDEE98           | 123456789            | FFGGHH56       | 987654321        |
| pmt02 | USD      | 13000  | 2018-11-23   | XXYYZZ23     | (null)         | (null)             | (null)               | FFGGHH56       | 987654321        |

Первый вопрос, который я хотел бы задать здесь, следующий: Как мне лучше всего добиться этогоКомбинация данных из исходных тем?

Конечно, я сам попробовал некоторые действия.Для краткости я опишу, чего я пытался добиться, добавив первую из сторон платежа в записи платежей.

Шаг первый: настройка исходных потоков
Примечание: из-за того, что в настройке OGG добавлено свойство, называемое таблицей, в схему AVRO, я должен указать поля, которые нужно взять из темы.Кроме того, меня не интересуют поля, в которых указан тип операции (например, вставка или обновление).

create stream payment_stream (id varchar, currency varchar, amount double, \
payment_date varchar) with (kafka_topic='src_payment',value_format='avro');

create stream payment_parties_stream (id varchar, payment_id varchar, party_type varchar, \
party_ident varchar, party_account varchar) with (kafka_topic='src_payment_parties',\
value_format='avro');

Шаг второй: создание потока для отправителей платежей
Примечание: из того, что я собрал из документации и узнал из экспериментов, чтобы иметь возможность присоединить поток платежей к потоку платежной стороны, последний должен быть разделен по идентификатору платежа.Кроме того, единственный способ получить соединение - это переименовать столбец.

create stream payment_sender_stream as select payment_id as id, party_ident, \
party_account from payment_parties_stream where party_type = 'sender' partition by id;

Шаг третий: объединить два потока
Примечание: я использую левыйприсоединиться, потому что не все стороны присутствуют для каждого платежа.Как и в приведенном выше примере записи, где pmt02 не имеет посредника.

create stream payment_with_sender as select pmt.id as id, pmt.currency, pmt.amount, \
pmt.payment_date, snd.party_ident, snd.party_account from payment_stream pmt left join \
payment_sender_stream snd within 1 seconds on pmt.id = snd.id;

Теперь вывод, который я ожидал бы из этого потока, выглядит примерно так:

ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null

Вместо этого вывод, который я вижу, выглядит следующим образом:

ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | null | null
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | null | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null

Следовательно, второй (состоящий из двух частей) вопрос, который я хотел бы задать, это: Почему левое соединение даетэти дубликаты записей?И можно ли этого избежать?

Извиняюсь за стену текста, я постарался быть максимально полным в описании проблемы.Конечно, я был бы рад добавить любую возможную недостающую информацию и ответить на вопросы, касающиеся установки, насколько мне известно.

1 Ответ

0 голосов
/ 26 ноября 2018

Вы почти там: -)

WITHIN 1 SECONDS даст вам результаты, полученные с обеих сторон соединения.

Вместо этого попробуйте WITHIN (0 SECONDS, 1 SECONDS).Тогда только записи с правой стороны объединения будут объединены слева, а не наоборот.

Подробнее об этом шаблоне можно прочитать в статье Я написал здесь .


Кстати, если вы хотите обойти проблему с зарезервированным словом table изOGG, вы можете установить includeTableName на false в конфигурации GG.

...