Как создать агрегацию в кафке потребителя с помощью python - PullRequest
0 голосов
/ 31 января 2019

Привет, я написал приложение-потребитель kafka, использующее python с модулем from kafka import KafkaConsumer

Теперь у меня есть поля json, как показано ниже,

{
  "user": "bob",
  "src_ip": "45.6.7.2"
 }

Теперь, как я могу потреблятьсообщения за каждые 5 или 10 минут (настраиваемое время), а затем проверьте, одинаков ли src IP каждого пользователя в течение заданного времени.Если он другой, я должен отправить его, чтобы сохранить в БД, или отправить в другое место с помощью REST POST.

Как этого добиться с помощью приложения для пользователя Python?

1 Ответ

0 голосов
/ 05 апреля 2019

Да, вы можете!Чтобы получить временную метку сообщения, попробуйте msg.timestamp.

Чтобы убедиться, что данные каждого пользователя всегда направляются в один и тот же раздел (и поэтому они всегда будут обрабатываться одним и тем же потребителем), используйте key=data["user"] когда вы создаете сообщения.

Наконец, вам нужно знать, что в течение срока действия приложения-пользователя назначения разделов могут измениться.Так что подумайте, что делать, когда потребитель падает или теряет свое назначение в середине одного из 5 или 10-минутных окон.Имеет ли значение потеря контекста?Если нет, вы, вероятно, можете использовать простое хранилище данных в памяти для каждого потребителя.Если потеря контекста имеет значение, вы можете рассмотреть альтернативные стратегии, используя периодические фиксации смещения вручную или центральное хранилище данных.

...