Окно не оценивает элементы из Кафки Source - PullRequest
0 голосов
/ 17 марта 2020

Я думаю, что мое восприятие Флинка windows может быть неправильным, поскольку они не оцениваются, как я ожидаю из документации или книги Флинка. Цель состоит в том, чтобы присоединиться к Kafka topi c, который имеет довольно статические данные c, с Kafka topi c с постоянно поступающими данными.

env.addSource(createKafkaConsumer())
        .join(env.addSource((createKafkaConsumer()))))
        .where(keySelector())
        .equalTo(keySelector())
        .window(TumblingProcessingTimeWindows.of(Time.hours(2)))
        .apply(new RichJoinFunction<A, B>() { ... }

createKafkaConsumer() возвращает FlinkKafkaConsumer

keySelector() является заполнителем для моего ключевого селектора.

KafkaTopi c A имеет 1 запись, KafkaTopi c B имеет 5. Я понимаю, что JoinFunction запускается 5 раз (условие соединения действует каждый раз), в результате чего получатель получает 5 записей. Если в течение 2 часов поступит новая запись для topi c A, будут созданы еще 5 записей (2x5 записей). Однако то, что происходит в раковине, довольно непредсказуемо, я не мог видеть образец. Иногда нет ничего, иногда исходные записи, но если я отправляю дополнительные сообщения, они не обрабатываются объединением с предыдущими записями.

Мой ключевой вопрос:

Что здесь вообще происходит? Отправляются ли записи после , когда окно завершает обработку? Я ожидал бы вывод в реальном времени в приемник, но это многое объяснило бы.

С этим связано: Могу ли я решить эту проблему с помощью триггера onElement или это сделает мое TimeWindow устаревшим? Существуют ли эти две концепции параллельно друг другу, то есть, что окно соединения составляет 2 часа, но функция соединения + выход запускаются для каждого элемента? Как насчет дубликатов в этом случае?

Впоследствии, означает ли время обработки момент времени, когда запись расходуется из топи c? Так, если я, например, setStartFromEarliest() при запуске, все сообщения, которые были использованы в течение следующих двух часов, были в этом окне?

Дополнительная информация: env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); установлено, и я также переключился на EventTime между ними.

1 Ответ

1 голос
/ 17 марта 2020

Семантика временного окна обработки при падении заключается в том, что оно обрабатывает все события, попадающие в данный промежуток времени. В вашем случае это 2 часа. По умолчанию окно будет выводить результаты только по истечении 2 часов, потому что нужно знать, что для этого окна не будет других событий.

Если вы хотите вывести ранние результаты (например, для каждой входящей записи) ), тогда вы можете указать пользовательский Trigger, который срабатывает для каждого элемента. См. документацию по Trigger API для получения дополнительной информации об этом.

Обновление

Время окна не начинается с первого элемента, но окно начинается с кратных длины окна , Например, если ваш размер окна составляет 2 часа, то вы можете иметь только windows [0, 2), [2, 4), ... но не [1, 3), [3, 5).

...