Почитав об Apache Flume и его преимуществах в обработке клиентских событий, я решил, что пришло время заняться этим более подробно.Другое большое преимущество заключается в том, что он может обрабатывать объекты Apache Avro :-) Однако мне трудно понять, как схема Avro используется для проверки полученных событий Flume.
Чтобы помочь понять мою проблему более подробно, япредоставили фрагменты кода ниже;
Схема Avro
В этом сообщении я использую пример схемы, определяющей вложенную запись Object1
с 2 полями.
{
"namespace": "com.example.avro",
"name": "Example",
"type": "record",
"fields": [
{
"name": "object1",
"type": {
"name": "Object1",
"type": "record",
"fields": [
{
"name": "value1",
"type": "string"
},
{
"name": "value2",
"type": "string"
}
]
}
}
]
}
Embedded Flume agent
В моем Java-проекте в настоящее время я использую встроенный агент Apache Flume, как описано ниже;
public static void main(String[] args) {
final Event event = EventBuilder.withBody("Test", Charset.forName("UTF-8"));
final Map<String, String> properties = new HashMap<>();
properties.put("channel.type", "memory");
properties.put("channel.capacity", "100");
properties.put("sinks", "sink1");
properties.put("sink1.type", "avro");
properties.put("sink1.hostname", "192.168.99.101");
properties.put("sink1.port", "11111");
properties.put("sink1.batch-size", "1");
properties.put("processor.type", "failover");
final EmbeddedAgent embeddedAgent = new EmbeddedAgent("TestAgent");
embeddedAgent.configure(properties);
embeddedAgent.start();
try {
embeddedAgent.put(event);
} catch (EventDeliveryException e) {
e.printStackTrace();
}
}
В приведенном выше примере я создаю новое событие Flume с "Тест »определяется как тело события, отправляющее события отдельному агенту Apache Flume, работающему внутри виртуальной машины (192.168.99.101).
Удаленный агент Flume
Как описано выше, я настроил этот агент на получениесобытия от встроенного агента Flume.Конфигурация Flume для этого агента выглядит так:
# Name the components on this agent
hello.sources = avroSource
hello.channels = memoryChannel
hello.sinks = loggerSink
# Describe/configure the source
hello.sources.avroSource.type = avro
hello.sources.avroSource.bind = 0.0.0.0
hello.sources.avroSource.port = 11111
hello.sources.avroSource.channels = memoryChannel
# Describe the sink
hello.sinks.loggerSink.type = logger
# Use a channel which buffers events in memory
hello.channels.memoryChannel.type = memory
hello.channels.memoryChannel.capacity = 1000
hello.channels.memoryChannel.transactionCapacity = 1000
# Bind the source and sink to the channel
hello.sources.avroSource.channels = memoryChannel
hello.sinks.loggerSink.channel = memoryChannel
И я выполняю следующую команду для запуска агента;
./bin/flume-ng agent --conf conf --conf-file ../sample-flume.conf --name hello -Dflume.root.logger=TRACE,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
Когда я выполняю основной метод проекта Java, я вижуСобытие «Test» передается в мой приемник журнала со следующими выходными данными:
2019-02-18 14:15:09,998 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 54 65 73 74 Test }
Однако мне неясно, где именно я должен настроить схему Avro, чтобы гарантировать, что только действительные события будут получены и обработаныпо Flume.Может кто-нибудь, пожалуйста, помогите мне понять, где я иду не так?Или, если я неправильно понял намерение создания Flume для преобразования событий Flume в события Avro?
В дополнение к вышесказанному я также попытался использовать клиент Avro RPC после изменения схемы Avro для указания протокола.разговаривает напрямую с моим удаленным агентом Flume, но когда я пытаюсь отправить события, я вижу следующую ошибку:
Exception in thread "main" org.apache.avro.AvroRuntimeException: Not a remote message: test
at org.apache.avro.ipc.Requestor$Response.getResponse(Requestor.java:532)
at org.apache.avro.ipc.Requestor$TransceiverCallback.handleResult(Requestor.java:359)
at org.apache.avro.ipc.Requestor$TransceiverCallback.handleResult(Requestor.java:322)
at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.messageReceived(NettyTransceiver.java:613)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:595)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:786)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:458)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:439)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:553)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:84)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Моя цель - убедиться, что события, заполняемые моим приложением, соответствуют схеме Avroгенерируется, чтобы избежать публикации недействительных событий.Я бы предпочел, чтобы я достигал этого с помощью встроенного агента Flume, но если это невозможно, я бы подумал об использовании подхода Avro RPC для непосредственного общения с моим удаленным агентом Flume.
Любая помощь / руководство было бы отличной помощью.,Заранее спасибо.
ОБНОВЛЕНИЕ
После дальнейшего чтения мне интересно, неправильно ли я понял назначение Apache Flume.Первоначально я думал, что это можно использовать для автоматического создания событий Avro на основе данных / схемы, но теперь мне было интересно, должно ли приложение взять на себя ответственность за создание событий Avro, которые будут храниться в Flume в соответствии с конфигурацией канала и отправляться в виде пакета черезсток (в моем случае кластер Spark Streaming).
Если вышеприведенное верно, то я хотел бы знать, требуется ли Flume знать о схеме или только мой кластер Spark Streaming, который в конечном итоге будет обрабатывать эти данные?Если Flume необходимо знать о схеме, то можете ли вы предоставить подробную информацию о том, как этого можно достичь?
Заранее спасибо.