Я думаю, что мое восприятие Флинка windows может быть неправильным, поскольку они не оцениваются, как я ожидаю из документации или книги Флинка. Цель состоит в том, чтобы присоединиться к Kafka topi c, который имеет довольно статические данные c, с Kafka topi c с постоянно поступающими данными.
env.addSource(createKafkaConsumer())
.join(env.addSource((createKafkaConsumer()))))
.where(keySelector())
.equalTo(keySelector())
.window(TumblingProcessingTimeWindows.of(Time.hours(2)))
.apply(new RichJoinFunction<A, B>() { ... }
createKafkaConsumer()
возвращает FlinkKafkaConsumer
keySelector()
является заполнителем для моего ключевого селектора.
KafkaTopi c A имеет 1 запись, KafkaTopi c B имеет 5. Я понимаю, что JoinFunction запускается 5 раз (условие соединения действует каждый раз), в результате чего получатель получает 5 записей. Если в течение 2 часов поступит новая запись для topi c A, будут созданы еще 5 записей (2x5 записей). Однако то, что происходит в раковине, довольно непредсказуемо, я не мог видеть образец. Иногда нет ничего, иногда исходные записи, но если я отправляю дополнительные сообщения, они не обрабатываются объединением с предыдущими записями.
Мой ключевой вопрос:
Что здесь вообще происходит? Отправляются ли записи после , когда окно завершает обработку? Я ожидал бы вывод в реальном времени в приемник, но это многое объяснило бы.
С этим связано: Могу ли я решить эту проблему с помощью триггера onElement или это сделает мое TimeWindow устаревшим? Существуют ли эти две концепции параллельно друг другу, то есть, что окно соединения составляет 2 часа, но функция соединения + выход запускаются для каждого элемента? Как насчет дубликатов в этом случае?
Впоследствии, означает ли время обработки момент времени, когда запись расходуется из топи c? Так, если я, например, setStartFromEarliest()
при запуске, все сообщения, которые были использованы в течение следующих двух часов, были в этом окне?
Дополнительная информация: env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
установлено, и я также переключился на EventTime между ними.