Закачивать файлы в Кафку и дальше обрабатывать? - PullRequest
2 голосов
/ 13 июля 2020

Это хороший способ отправить двоичные данные загрузки файлов в Kafka, а затем распределить обработку загрузки некоторыми службами, подключенными к Kafka topi c?

Я вижу некоторые преимущества:

  • Фильтрация данных загрузки
  • Реплика
  • Некоторые службы могут обрабатывать загрузку, а не только одну

Что вы думаете об этом?

1 Ответ

4 голосов
/ 13 июля 2020

Это хороший способ отправить двоичные данные загрузки файлов в Kafka, а затем распределить обработку загрузки некоторыми службами, которые подключены к Kafka topi c?

Обычно файлы загружаются в файловую систему, и их URI сохраняются в сообщении Kafka. Это сделано для того, чтобы размер сообщения Kafka был относительно меньше, тем самым увеличивая пропускную способность его клиентов.

В случае, если мы поместим большие объекты в сообщение Kafka, потребитель должен будет читать весь файл. Таким образом, ваш poll() займет больше времени, чем обычно.

С другой стороны, если мы просто поместим URI файла вместо самого файла, тогда потребление сообщений будет относительно быстрее, и вы сможете делегировать обработка файлов, возможно, в другой поток (возможно, из пула потоков) за счет увеличения пропускной способности вашего приложения. файловая система. Даже кафка хранит сообщения в файловой системе (как файлы сегментов). Таким образом, репликация также может выполняться с самой файловой системой.

Лучший способ - поместить URI, который указывает на файл в сообщении Kafka, а затем поместить обработчик для этого URI, который будет ответственным для предоставления вам файла и, возможно, позаботится о предоставлении вам реплики в случае удаления исходного файла.

Обработчик может быть слабо связан с остальной частью вашей системы, создан специально для управления файлами, поддержки реплик и т. Д. c.

Фильтрация загружаемых данных

фильтрация загруженных данных может быть произведена только тогда, когда вы действительно читаете содержимое файла. Вы можете сделать это, даже поместив URI вашего файла в сообщение и прочитав его оттуда. Например, если вы используете потоки Kafka, вы можете поместить этот лог-фильтр фильтрации c в transform() или mapValues() et c.

stream.from(topic)
.mapValues(v -> v.getFileURI())
.filter((k,fileURI) -> validate(read(fileURI)))
.to(..)

Hitting segment.bytes

Еще один недостаток хранения файлов в вашем сообщении состоит в том, что вы можете достичь ограничения segment.bytes, если файлы больше. Вам нужно постоянно менять segment.bytes каждый раз, чтобы соответствовать новым требованиям к размеру файлов.

Другой момент, если segment.bytes установлен на 1 ГБ, а размер вашего первого сообщения (файла) равен 750MB, а ваше следующее сообщение - 251 MB, сообщение 251MB не может поместиться в первом сегменте, поэтому ваш первый сегмент будет содержать только одно сообщение, хотя он не достиг предела. Это означает, что относительно меньшее количество сообщений будет сохранено на сегмент.

...