KStream to KTable Inner Join создает разное количество записей каждый раз, когда обрабатывается с одинаковыми данными - PullRequest
0 голосов
/ 08 марта 2020

Я хочу сделать KStream to KTable Join. используя KTable как справочную таблицу. На следующих шагах показана последовательность выполнения кода

  1. Construct KTable

  2. ReKey KTable

  3. Построить KStream

  4. ReKey KStream

  5. Присоединиться к KStream - KTable

Допустим, есть 8000 записей в KStream - 14 записей в KTable и предполагается, что для каждого ключа в KStreams есть запись в KTable. Таким образом, ожидаемый результат будет 8000 записей.

Каждый раз, когда я делаю объединение в первый раз или когда я запускаю приложение. Ожидаемый результат - 8000 записей, но иногда я вижу только 6200 записей, иногда полный набор записей 8000 (дважды), иногда нет записей и т. Д. c.

  • Вопрос 1: почему Есть ли несоответствия в записях каждый раз, когда я запускаю приложение?

    До создания KTable (конструкция + Rekey), KStreams создается, и данные доступны для соединения со стороны KStream, затем соединение начинается без KTable, поэтому нет данные будут видны в окончательном соединении, пока не будет создан KTable после того, как однажды создан KTable, мы можем увидеть объединение для остальных записей.

  • Вопрос 2: Как устранить несоответствие в записях?

    Я пытался с помощью Test случай использования Embedded Kafka для соединения KStream и Ktable. Было 10 записей из KStreams и 3 записи из KTable, которые использовались процессом. когда я запускал тестовый пример в первый раз, не было никакого соединения, и я не видел никаких данных после соединения. Когда побежал тот же второй раз, он побежал отлично. Если я очищу государственный магазин, то вернусь к нулю.

  • Вопрос 3: Почему происходит такое поведение?

    Я попытался с K SQL, и соединение работало отлично, и я получил 8000 записей, затем я вошел в K SQL исходный код, я заметил, что K SQL также выполняет ту же функцию Join.

  • Вопрос 4. Как K SQL решает проблему?

Я видел несколько примеров предлагаемых ответов

Я использую потоки весенних облаков в качестве зависимости.

Также я увидел открытую проблему, касающуюся этого где-то на JIRA.

1 Ответ

0 голосов
/ 10 марта 2020

ниже шагов показывает последовательность, в которой выполняется код

Обратите внимание, что построение топологии просто обеспечивает логическое описание программы потока данных, и нет "порядка выполнения" другой оператор. Программа будет переведена и все операторы будут выполнены одновременно. Следовательно, данные по всем темам будут считываться параллельно.

Эта параллельная обработка является причиной root вашего наблюдения, т. Е. Таблица не загружается первой до начала обработки (в по крайней мере, гарантия по умолчанию отсутствует) и, следовательно, данные на стороне потока могут быть обработаны, даже если таблица загружена не полностью.

Порядок обработки между различными топиками c зависит от меток времени записи: записи с меньшие временные метки обрабатываются первыми. Следовательно, если вы хотите убедиться, что данные KTable обрабатываются первыми, вы должны убедиться, что метки времени записи меньше, чем метки времени записи на стороне потока. Это может быть обеспечено либо при вводе входных данных во ввод topi c, либо с помощью пользовательского экстрактора меток времени.

Во-вторых, выборка данных из тем не определена c и, таким образом, если возвращаются данные только для стороны потока (но не для данных на стороне таблицы), сравнение временных меток не может быть выполнено, и, таким образом, данные на стороне потока будут обрабатываться перед данными на стороне таблицы. Чтобы решить эту проблему, вы можете увеличить параметр конфигурации max.task.idle.ms (по умолчанию 0ms). Если вы увеличите этот конфиг (и я считаю, что K SQL также делает это по умолчанию), если нет данных для одного ввода, задача заблокирует и попытается извлечь данные для пустого ввода (только после истечения времени простоя, обработка будет продолжаться, даже если одна сторона пуста).

Для GlobalKTable поведение отличается. Эта таблица будет загружена при запуске до начала какой-либо обработки. Следовательно, я не уверен, почему это не сработало для вас.

...