Это хороший способ отправить двоичные данные загрузки файлов в Kafka, а затем распределить обработку загрузки некоторыми службами, которые подключены к Kafka topi c?
Обычно файлы загружаются в файловую систему, и их URI сохраняются в сообщении Kafka. Это сделано для того, чтобы размер сообщения Kafka был относительно меньше, тем самым увеличивая пропускную способность его клиентов.
В случае, если мы поместим большие объекты в сообщение Kafka, потребитель должен будет читать весь файл. Таким образом, ваш poll()
займет больше времени, чем обычно.
С другой стороны, если мы просто поместим URI файла вместо самого файла, тогда потребление сообщений будет относительно быстрее, и вы сможете делегировать обработка файлов, возможно, в другой поток (возможно, из пула потоков) за счет увеличения пропускной способности вашего приложения. файловая система. Даже кафка хранит сообщения в файловой системе (как файлы сегментов). Таким образом, репликация также может выполняться с самой файловой системой.
Лучший способ - поместить URI, который указывает на файл в сообщении Kafka, а затем поместить обработчик для этого URI, который будет ответственным для предоставления вам файла и, возможно, позаботится о предоставлении вам реплики в случае удаления исходного файла.
Обработчик может быть слабо связан с остальной частью вашей системы, создан специально для управления файлами, поддержки реплик и т. Д. c.
Фильтрация загружаемых данных
фильтрация загруженных данных может быть произведена только тогда, когда вы действительно читаете содержимое файла. Вы можете сделать это, даже поместив URI вашего файла в сообщение и прочитав его оттуда. Например, если вы используете потоки Kafka, вы можете поместить этот лог-фильтр фильтрации c в transform()
или mapValues()
et c.
stream.from(topic)
.mapValues(v -> v.getFileURI())
.filter((k,fileURI) -> validate(read(fileURI)))
.to(..)
Hitting segment.bytes
Еще один недостаток хранения файлов в вашем сообщении состоит в том, что вы можете достичь ограничения segment.bytes
, если файлы больше. Вам нужно постоянно менять segment.bytes
каждый раз, чтобы соответствовать новым требованиям к размеру файлов.
Другой момент, если segment.bytes
установлен на 1 ГБ, а размер вашего первого сообщения (файла) равен 750MB
, а ваше следующее сообщение - 251 MB
, сообщение 251MB
не может поместиться в первом сегменте, поэтому ваш первый сегмент будет содержать только одно сообщение, хотя он не достиг предела. Это означает, что относительно меньшее количество сообщений будет сохранено на сегмент.