Как записывать данные в режиме реального времени в HDFS с помощью Flume? - PullRequest
0 голосов
/ 16 октября 2018

Я использую Flume для хранения данных датчиков в HDFS.После того, как данные получены через MQTT.Подписчик отправляет данные в формате JSON в прослушиватель Flume HTTP.В настоящее время он работает нормально, но проблема в том, что flume не записывает в файл HDFS, пока я не остановлю его (или размер файла не достигнет 128 МБ).Я использую Hive, чтобы применить схему для чтения.К сожалению, результирующая таблица кустов содержит только 1 запись.Это нормально, потому что Flume не записывает новые поступающие данные в файл (загруженный Hive).

Есть ли способ заставить Flume записывать новые поступающие данные в HDFS практически в реальном времени?Итак, мне не нужно перезапускать его или использовать небольшие файлы?

Вот моя конфигурация Flume:

# Name the components on this agent
emsFlumeAgent.sources = http_emsFlumeAgent
emsFlumeAgent.sinks = hdfs_sink
emsFlumeAgent.channels = channel_hdfs

# Describe/configure the source
emsFlumeAgent.sources.http_emsFlumeAgent.type = http
emsFlumeAgent.sources.http_emsFlumeAgent.bind = localhost
emsFlumeAgent.sources.http_emsFlumeAgent.port = 41414

# Describe the sink
emsFlumeAgent.sinks.hdfs_sink.type = hdfs
emsFlumeAgent.sinks.hdfs_sink.hdfs.path = hdfs://localhost:9000/EMS/%{sensor}
emsFlumeAgent.sinks.hdfs_sink.hdfs.rollInterval = 0
emsFlumeAgent.sinks.hdfs_sink.hdfs.rollSize = 134217728
emsFlumeAgent.sinks.hdfs_sink.hdfs.rollCount=0

#emsFlumeAgent.sinks.hdfs_sink.hdfs.idleTimeout=20
# Use a channel which buffers events in memory
emsFlumeAgent.channels.channel_hdfs.type = memory
emsFlumeAgent.channels.channel_hdfs.capacity = 10000
emsFlumeAgent.channels.channel_hdfs.transactionCapacity = 100

# Bind the source and sinks to the channel
emsFlumeAgent.sources.http_emsFlumeAgent.channels = channel_hdfs 
emsFlumeAgent.sinks.hdfs_sink.channel = channel_hdfs

1 Ответ

0 голосов
/ 16 октября 2018

Я думаю, что сложность заключается в том, что вы хотели бы записывать данные в HDFS практически в реальном времени, но также не хотите маленьких файлов (по очевидным причинам), и это может быть трудной задачей.

Вам необходимо найти оптимальный баланс между следующими двумя параметрами:

hdfs.rollSize (Default = 1024) - размер файла для запуска броска, в байтах (0: никогда не свернуть на основе размера файла)

и

hdfs.batchSize (Default = 100) - число событий, записанных в файл перед его сбросом в HDFS

Если ваши данные вряд ли достигнут 128 МБ в предпочтительное время, то вам может потребоватьсячтобы уменьшить rollSize, но только до такой степени, чтобы вы не столкнулись с проблемой небольших файлов .

Поскольку вы не установили размер пакета в приемнике HDFS, выдолжны видеть результаты сброса HDFS после каждых 100 записей, но как только размер очищенных записей совместно достигнет 128 МБ, содержимое будет свернуто в файл 128 МБ. Разве этого тоже не происходит?Не могли бы вы подтвердить?

Надеюсь, это поможет!

...