Kafka Wheel Timer - PullRequest
       8

Kafka Wheel Timer

0 голосов
/ 19 февраля 2019

Добрый день,

Я хотел бы выяснить, может ли очередь kafka хранить данные в течение нескольких секунд, а затем освобождать данные.

Я получаю сообщение из темы kafka, После анализаданные, я держу их в памяти некоторое время (10 секунд) (это накапливается при поступлении уникальных сообщений), каждое сообщение имеет собственный таймер), я хочу, чтобы kafka сообщал мне, что срок действия этого сообщения истек (10 секунд)так что я могу продолжить выполнение других задач.

Но поскольку flink / kafka управляется событиями, я надеялся, что у kafka есть какое-то круглое колесо синхронизации, которое может воспроизводить ключ для сообщения через 10 секунд для потребителя.

Есть какие-нибудь идеи о том, как я могу архивировать это с помощью оконного флинка или функций кафки?

С уважением

Ответы [ 2 ]

0 голосов
/ 19 февраля 2019

Вы можете посмотреть ближе к библиотеке Kafka Streams.https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html, https://kafka.apache.org/21/documentation/streams/developer-guide/processor-api.html.

Используя Kafka Streams, вы можете выполнять множество сложных операций по обработке событий.API-интерфейс процессора является API-интерфейсом более низкого уровня и обеспечивает большую гибкость, например, каждое обрабатываемое сообщение помещается в хранилище состояний (абстракция Kafka Streams, которое реплицируется в тему журнала изменений), а затем с помощью Punctuator можно проверить, истекло ли сообщение , истекло .

0 голосов
/ 19 февраля 2019

Относительно вашей первоначальной проблемы:

Я хотел бы узнать, может ли очередь kafka хранить данные в течение нескольких секунд, а затем выпустить данные

Вы можете настроитьlog.cleanup.policy как delete (это значение по умолчанию) и измените retention.ms со значения по умолчанию 604800000 (1 неделя) на 10000.

Не могли бы вы еще раз объяснить, что еще вы хотите проверить, и что вы имели в виду после Regards части?

...