Как я могу превратить мой Kafka topi c в ktable самого последнего состояния - PullRequest
0 голосов
/ 17 февраля 2020

У меня есть несколько фиктивных данных о производителях в топи c, которые вводятся с помощью идентификатора сущности:

{"ROWTIME": 1581884260481, "ROWKEY": "key1", "name": "nissan", "id": "key1", "founded": 1910}
{"ROWTIME": 1581884260481, "ROWKEY": "key2", "name": "toyota", "id": "key2", "founded": 0}
{"ROWTIME": 1581894249560, "ROWKEY": "key2", "name": "toyota", "id": "key2", "founded": 1920}

1) Я хотел бы получить ktable, содержащий сокращенное состояние производителей:

{"name": "nissan", "id": "key1", "founded": 1910},
{"name": "toyota", "id": "key2", "founded": 1920}

В: Каков синтаксис запроса в k sql для создания этого сокращенного состояния?

Все документы были написаны до изменений запроса pull / pu sh поэтому я не уверен, как я должен это построить. Хочу ли я получить материализованное представление (для которого требуются агрегаты) или ktable поверх topi c? Нужен ли мне CTaS?

1b) Я думал об этом, и, вероятно, было бы лучше, если бы topi c сохранял только изменения, а не всю запись. В этом сценарии я могу просто написать события, которые в конечном итоге будут уменьшены до части состояния, вместо того, чтобы запрашивать текущую запись или доверять пользовательскому интерфейсу для полной передачи обратно.

2) Когда я запрашиваю это таблица Я хотел бы, чтобы произошли две вещи:

  • Дайте мне список сокращенного состояния (2 записи выше)
  • Если произойдут какие-либо обновления (вставьте или обновите сокращенный список) ) отправить их по мере того, как они происходят

В: Какой запрос я могу создать, чтобы дать текущие и будущие значения в ktable?

emit changes, кажется, только go forward с этого момента времени. Нужно ли мне два запроса, или есть ли способ сделать это в одном?

Спасибо за помощь в заполнении этих пробелов, это приветствуется.

1 Ответ

0 голосов
/ 18 февраля 2020

«rowtime» и «rowkey» - это поля по умолчанию, добавленные «Ksql_server», которые являются полезными столбцами при объединении потоков / таблиц (Windowing и разделение).

  1. 1a) Вы можете пропустить столбцы на потребительском уровне. 1b) не смог понять :(
  2. Ссылка будет полезна (https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/)
  3. Вам не нужно вносить какие-либо изменения, потому что если вы создали KTable, добавится только новая запись, которую вы можете проверить с помощью запроса select.

Надеюсь, это поможет.

...