Кафка удаляет записи из темы не используя смещения, а по полю записи - PullRequest
0 голосов
/ 17 мая 2019

Предположим, у меня есть тема под названием "пакетная обработка" с 1 разделом, и я публикую в ней миллионы записей для обработки. У меня есть группа потребителей из 3 человек для обработки этих миллионов записей. Я сталкиваюсь со случаем, когда мне больше не нужно обрабатывать определенное подмножество сообщений, которые удовлетворяют определенным критериям, таким как age < 50

Как мне удалить эти сообщения из темы программным способом. Как я нажимаю кнопку «Отмена» в пользовательском интерфейсе, и он должен удалить те подмножества записей из темы, чьи age < 50, чтобы они не обрабатывались потребителями.

Я знаю, что могу удалить сообщения, запустив командную строку со смещением: - https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh

А также Java API, но опять же по смещению:

https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/admin/AdminClient.html#deleteRecords-java.util.Map-org.apache.kafka.clients.admin.DeleteRecordsOptions-

Delete records whose offset is smaller than the given offset of the corresponding partition

Но в моем случае я не могу использовать смещения, потому что мне нужно только удалить определенные записи, а не all records smaller than the given offset

Ответы [ 2 ]

3 голосов
/ 17 мая 2019

Главное, на что мне нужно обратить внимание, это то, что вы не должны рассматривать данные в Kafka так же, как данные в базе данных. Kafka не был разработан для такой работы (например, когда я нажимаю кнопку X, записи Y будут удалены).

Вместо этого вы должны видеть тему как поток бесконечных данных. Каждая запись, созданная по теме Kafka, будет потреблена и обработана независимо потребителем.

Восприятие темы в виде потока дает другое решение:

Вы можете использовать вторую тему с отфильтрованными результатами!

Streaming Diagram
                            ___ Topic A ____
--  Produced Messages -->  |                |      _______________________
                           |________________| --> |                       |
                                                  | Filtering Application |
                            ___  Topic B ___      |                       |
                           |                | <-- |_______________________|
<-- Consumed Messages --   |________________|

Объяснение довольно простое, вы создали сообщения для темы А. Затем вы используете Filtering Application, который будет:

  1. Используйте ваши сообщения из темы A
  2. На основе некоторой бизнес-логики (например, age < 50) будет выполняться фильтрация
  3. Создание отфильтрованных сообщений в теме B

Наконец, ваши потребители получат сообщения из темы B.

Теперь, когда дело доходит до создания приложения фильтрации, у вас есть несколько вариантов:

  1. Реализация базового решения с использованием потребителя и производителя
  2. Использование Потоки Кафки
  3. Использование KSQL
2 голосов
/ 17 мая 2019

Вы не можете, Kafka не предназначен для использования в качестве базы данных, это фактически неизменный журнал коммитов. Инструмент удаления записей используется в основном для административных задач.

Исключение составляет, если вы используете сжатие журнала . Если у вас есть сжатая тема, вы можете удалить значение для ключа, опубликовав запись в теме со значением NULL. Сжатые темы обычно используются как журналы фиксации базы данных, и вы читаете их в какой-то нисходящий сервис, где он материализуется как таблица. Значение NULL должно преобразоваться в удаление записи.

Таким образом, в вашем случае использования вы материализуете свою тему в систему, оптимизированную для запроса, подобного SELECT key FROM TABLE WHERE age > 50;, и публикуете записи для каждого ключа со значением NULL обратно в тему Kafka. Вы могли бы даже начать своего потребителя в начале темы и отметить, какие записи имеют age > 50 и делают то же самое, но это не будет столь эффективно.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...