чтение только конкретных сообщений из темы кафки - PullRequest
0 голосов
/ 18 февраля 2019

Сценарий:

Я записываю данные объекта JSON данных в тему kafka во время чтения. Я хочу прочитать только определенный набор сообщений на основе значения, присутствующего в сообщении.Я использую библиотеку kafka-python.

примеры сообщений:

{flow_status: "completed", value: 1, active: yes}
{flow_status:"failure",value 2, active:yes}

Здесь я хочу читать только те сообщения, у которых flow_Status завершен.

Ответы [ 3 ]

0 голосов
/ 18 февраля 2019

В Кафке невозможно что-то подобное сделать.Потребитель потребляет сообщения одно за другим, одно за другим, начиная с самого последнего принятого смещения (или с начала, или просматривая с определенным смещением).Зависит от вашего варианта использования, возможно, у вас может быть другой поток в вашем сценарии: сообщение с процессом, выполняемым для выполнения, переходит в тему, но затем приложение, которое обрабатывает действие, затем записывает результат (выполненный или неудачный) в двух разных темах.: таким образом у вас все выполнено отделено от неудачного.Другим способом является использование приложения Kafka Streams для выполнения фильтрации, но с учетом того, что это всего лишь сахар, в действительности приложение потоков всегда будет читать все сообщения, но позволяет легко фильтровать сообщения.

0 голосов
/ 18 февраля 2019

Потребитель Kafka не поддерживает такую ​​функциональность заранее.Вы должны будете использовать все события последовательно, отфильтровать статус завершенных событий и поместить его куда-нибудь.Вместо этого вы можете рассмотреть возможность использования приложения Kafka Streams, где вы можете прочитать данные в виде потока и отфильтровать события, где flow_status = "complete", и опубликовать их в какой-либо теме вывода или в другом месте назначения.

Пример:

KStream<String,JsonNode> inputStream= builder.stream(inputTopic);
KStream<String,JsonNode> completedFlowStream = inputStream.filter(value-> value.get("flow_status").equals("completed"));

PS Kafka не имеет официального выпуска Python API для KStream, но существует проект с открытым исходным кодом: https://github.com/wintoncode/winton-kafka-streams

0 голосов
/ 18 февраля 2019

Вы можете создать две разные темы;один для выполненного и другой для статуса отказа.А затем прочитайте сообщения из завершенных тем, чтобы обработать их.

В противном случае, если вы хотите, чтобы они были в одной теме и хотите читать только завершенные, я считаю, что вам нужно прочитать их все и игнорировать ошибкуиспользующие простое условие if-else.

...