Проверка схемы Apache Avro в Apache Flume - PullRequest
0 голосов
/ 18 февраля 2019

Почитав об Apache Flume и его преимуществах в обработке клиентских событий, я решил, что пришло время заняться этим более подробно.Другое большое преимущество заключается в том, что он может обрабатывать объекты Apache Avro :-) Однако мне трудно понять, как схема Avro используется для проверки полученных событий Flume.

Чтобы помочь понять мою проблему более подробно, япредоставили фрагменты кода ниже;

Схема Avro

В этом сообщении я использую пример схемы, определяющей вложенную запись Object1 с 2 полями.

{
  "namespace": "com.example.avro",
  "name": "Example",
  "type": "record",
  "fields": [
    {
      "name": "object1",
      "type": {
        "name": "Object1",
        "type": "record",
        "fields": [
          {
            "name": "value1",
            "type": "string"
          },
          {
            "name": "value2",
            "type": "string"
          }
        ]
      }
    }
  ]
}

Embedded Flume agent

В моем Java-проекте в настоящее время я использую встроенный агент Apache Flume, как описано ниже;

public static void main(String[] args) {
    final Event event = EventBuilder.withBody("Test", Charset.forName("UTF-8"));

    final Map<String, String> properties = new HashMap<>();
    properties.put("channel.type", "memory");
    properties.put("channel.capacity", "100");
    properties.put("sinks", "sink1");
    properties.put("sink1.type", "avro");
    properties.put("sink1.hostname", "192.168.99.101");
    properties.put("sink1.port", "11111");
    properties.put("sink1.batch-size", "1");
    properties.put("processor.type", "failover");

    final EmbeddedAgent embeddedAgent = new EmbeddedAgent("TestAgent");
    embeddedAgent.configure(properties);
    embeddedAgent.start();

    try {
        embeddedAgent.put(event);
    } catch (EventDeliveryException e) {
        e.printStackTrace();
    }
}

В приведенном выше примере я создаю новое событие Flume с "Тест »определяется как тело события, отправляющее события отдельному агенту Apache Flume, работающему внутри виртуальной машины (192.168.99.101).

Удаленный агент Flume

Как описано выше, я настроил этот агент на получениесобытия от встроенного агента Flume.Конфигурация Flume для этого агента выглядит так:

# Name the components on this agent
hello.sources = avroSource
hello.channels = memoryChannel
hello.sinks = loggerSink

# Describe/configure the source
hello.sources.avroSource.type = avro
hello.sources.avroSource.bind = 0.0.0.0
hello.sources.avroSource.port = 11111
hello.sources.avroSource.channels = memoryChannel

# Describe the sink
hello.sinks.loggerSink.type = logger

# Use a channel which buffers events in memory
hello.channels.memoryChannel.type = memory
hello.channels.memoryChannel.capacity = 1000
hello.channels.memoryChannel.transactionCapacity = 1000

# Bind the source and sink to the channel
hello.sources.avroSource.channels = memoryChannel
hello.sinks.loggerSink.channel = memoryChannel

И я выполняю следующую команду для запуска агента;

./bin/flume-ng agent --conf conf --conf-file ../sample-flume.conf --name hello -Dflume.root.logger=TRACE,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true

Когда я выполняю основной метод проекта Java, я вижуСобытие «Test» передается в мой приемник журнала со следующими выходными данными:

2019-02-18 14:15:09,998 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 54 65 73 74                                     Test }

Однако мне неясно, где именно я должен настроить схему Avro, чтобы гарантировать, что только действительные события будут получены и обработаныпо Flume.Может кто-нибудь, пожалуйста, помогите мне понять, где я иду не так?Или, если я неправильно понял намерение создания Flume для преобразования событий Flume в события Avro?

В дополнение к вышесказанному я также попытался использовать клиент Avro RPC после изменения схемы Avro для указания протокола.разговаривает напрямую с моим удаленным агентом Flume, но когда я пытаюсь отправить события, я вижу следующую ошибку:

Exception in thread "main" org.apache.avro.AvroRuntimeException: Not a remote message: test
    at org.apache.avro.ipc.Requestor$Response.getResponse(Requestor.java:532)
    at org.apache.avro.ipc.Requestor$TransceiverCallback.handleResult(Requestor.java:359)
    at org.apache.avro.ipc.Requestor$TransceiverCallback.handleResult(Requestor.java:322)
    at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.messageReceived(NettyTransceiver.java:613)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:595)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:786)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:458)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:439)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:553)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:84)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Моя цель - убедиться, что события, заполняемые моим приложением, соответствуют схеме Avroгенерируется, чтобы избежать публикации недействительных событий.Я бы предпочел, чтобы я достигал этого с помощью встроенного агента Flume, но если это невозможно, я бы подумал об использовании подхода Avro RPC для непосредственного общения с моим удаленным агентом Flume.

Любая помощь / руководство было бы отличной помощью.,Заранее спасибо.

ОБНОВЛЕНИЕ

После дальнейшего чтения мне интересно, неправильно ли я понял назначение Apache Flume.Первоначально я думал, что это можно использовать для автоматического создания событий Avro на основе данных / схемы, но теперь мне было интересно, должно ли приложение взять на себя ответственность за создание событий Avro, которые будут храниться в Flume в соответствии с конфигурацией канала и отправляться в виде пакета черезсток (в моем случае кластер Spark Streaming).

Если вышеприведенное верно, то я хотел бы знать, требуется ли Flume знать о схеме или только мой кластер Spark Streaming, который в конечном итоге будет обрабатывать эти данные?Если Flume необходимо знать о схеме, то можете ли вы предоставить подробную информацию о том, как этого можно достичь?

Заранее спасибо.

1 Ответ

0 голосов
/ 25 апреля 2019

Поскольку ваша цель - обработка данных с использованием кластера Spark Streaming, вы можете решить эту проблему с помощью двух решений

1), используя клиент Flume (протестирован с flume-ng-sdk 1.9.0) и Spark Streaming (протестировано с spark-streaming_2.11 2.4.0 и spark-streaming-flume_2.11 2.3.0) без сервера Flume между топологией сети.

Клиентский класс отправляет событие Flume json на порт 41416

  public class JSONFlumeClient {
    public static void main(String[] args) {
    RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 41416);
    String jsonData = "{\r\n" + "  \"namespace\": \"com.example.avro\",\r\n" + "  \"name\": \"Example\",\r\n"
            + "  \"type\": \"record\",\r\n" + "  \"fields\": [\r\n" + "    {\r\n"
            + "      \"name\": \"object1\",\r\n" + "      \"type\": {\r\n" + "        \"name\": \"Object1\",\r\n"
            + "        \"type\": \"record\",\r\n" + "        \"fields\": [\r\n" + "          {\r\n"
            + "            \"name\": \"value1\",\r\n" + "            \"type\": \"string\"\r\n" + "          },\r\n"
            + "          {\r\n" + "            \"name\": \"value2\",\r\n" + "            \"type\": \"string\"\r\n"
            + "          }\r\n" + "        ]\r\n" + "      }\r\n" + "    }\r\n" + "  ]\r\n" + "}";
    Event event = EventBuilder.withBody(jsonData, Charset.forName("UTF-8"));
    try {
        client.append(event);
    } catch (Throwable t) {
        System.err.println(t.getMessage());
        t.printStackTrace();
    } finally {
        client.close();
    }
  }
}

Класс Spark Streaming Server прослушивает порт 41416

public class SparkStreamingToySample {
  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setMaster("local[2]")
    .setAppName("SparkStreamingToySample");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));
    JavaReceiverInputDStream<SparkFlumeEvent> lines = FlumeUtils
    .createStream(ssc, "localhost", 41416);
    lines.map(sfe -> new String(sfe.event().getBody().array(), "UTF-8"))
    .foreachRDD((data,time)->
    System.out.println("***" + new Date(time.milliseconds()) + "=" + data.collect().toString()));
    ssc.start();
    ssc.awaitTermination();
  }
}

2) Использование клиента Flume + сервера Flume между + Spark Streaming (как Flume Sink) в качестве топологии сети.

Дляэта опция, код тот же, но теперь SparkStreaming должен указать полное имя хоста с именем dns вместо localhost, чтобы запустить сервер SparkStreaming с того же порта 41416, если вы запускаете его локально для тестирования.Клиент Flume подключится к порту 41415 сервера flume. Теперь самое сложное - определить топологию flume.Вам нужно указать и источник, и приемник, чтобы это работало.

См. Описание потока ниже

agent1.channels.ch1.type = memory

agent1.sources.avroSource1.channels = ch1
agent1.sources.avroSource1.type = avro
agent1.sources.avroSource1.bind = 0.0.0.0
agent1.sources.avroSource1.port = 41415

agent1.sinks.avroSink.channel = ch1
agent1.sinks.avroSink.type = avro
agent1.sinks.avroSink.hostname = <full dns qualified hostname>
agent1.sinks.avroSink.port = 41416

agent1.channels = ch1
agent1.sources = avroSource1
agent1.sinks = avroSink

Вы должны получить одинаковые результаты с обоими решениями, но возвращаясь к своему вопросу, еслиFlume действительно необходим для потокового содержимого Spark из потока Json, ответ зависит от того, Flume поддерживает перехватчики, поэтому в этом случае его можно использовать для очистки или фильтрации недопустимых данных для вашего проекта Spark, но поскольку вы добавляете дополнительный компонент втопология, которая может влиять на производительность и требовать больше ресурсов (ЦП / память), чем без Flume.

...