Пишу собственный декоратор Flume, но получаю ошибки. Что мне не хватает? - PullRequest
4 голосов
/ 20 октября 2010

Я пишу собственный плагин-декоратор для системы агрегации журналов Cloudera, Flume. Мой код Java ниже:

package multiplex;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.util.Pair;
import com.google.common.base.Preconditions;

public class JsonMultiplexDecorator<S extends EventSink> extends EventSinkDecorator<S> {
  private final String serverName;
  private final String logType;

  public JsonMultiplexDecorator(S s, String serverName, String logType) {
    super(s);

    this.serverName = serverName;
    this.logType = logType;
  }

  @Override
  public void append(Event e) throws IOException {
    String body = new String(e.getBody()).replaceAll("\"", "\\\"");

    String json = "{ \"server\": \"" + this.serverName + "\"," +
      "\"log_type\": \"" + this.logType + "\", " +
      "\"body\": \"" + body + "\" }";

    EventImpl e2 = new EventImpl(json.getBytes(),
        e.getTimestamp(), e.getPriority(), e.getNanos(), e.getHost(),
        e.getAttrs());

    super.append(e2);
  }

  public static SinkDecoBuilder builder() {
    return new SinkDecoBuilder() {
      @Override
      public EventSinkDecorator<EventSink> build(Context context,
          String... argv) {
        Preconditions.checkArgument(argv.length == 2,
            "usage: multiplexDecorator(serverName, logType)");

        return new JsonMultiplexDecorator<EventSink>(null, argv[0], argv[1]);
      }
    };
  }

  public static List<Pair<String, SinkDecoBuilder>> getDecoratorBuilders() {
    List<Pair<String, SinkDecoBuilder>> builders = 
      new ArrayList<Pair<String, SinkDecoBuilder>>();

    builders.add(new Pair<String, SinkDecoBuilder>("jsonMultiplexDecorator", builder()));

    return builders;
  }
}

Это прекрасно компилируется в файл JAR с помощью ant, я могу загрузить его во Flume во время выполнения и успешно настроить узлы для его использования. Однако, когда событие действительно происходит на узле, на котором загружен этот плагин, я получаю сообщения об ошибках в моем журнале, например:

2010-10-19 21:03:15,176 [logicalNode xxxxx] ERROR connector.DirectDriver: Driving src/sink failed! LazyOpenSource | LazyOpenDecorator because null
java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableMap.put(Collections.java:1285)
    at com.cloudera.flume.core.EventBaseImpl.set(EventBaseImpl.java:65)
    at com.cloudera.flume.handlers.rolling.RollSink.append(RollSink.java:164)
    at com.cloudera.flume.agent.diskfailover.DiskFailoverDeco.append(DiskFailoverDeco.java:93)
    at com.cloudera.flume.core.BackOffFailOverSink.append(BackOffFailOverSink.java:144)
    at com.cloudera.flume.agent.AgentSink.append(AgentSink.java:109)
    at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:58)
    at multiplex.JsonMultiplexDecorator.append(JsonMultiplexDecorator.java:56)
    at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:58)
    at com.cloudera.flume.handlers.debug.LazyOpenDecorator.append(LazyOpenDecorator.java:69)
    at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:92)

([logicalNode xxxxx] является заполнителем для внутреннего DNS-имени EC2). У меня нет большого опыта работы с Java, поэтому я не уверен, что я делаю что-то здесь не так или это ошибка Flume. Я должен упомянуть, что я написал это, используя примеры плагинов HelloWorld из источника Flume, а также опираясь на некоторые из встроенных декораторов Flume.

1 Ответ

2 голосов
/ 10 марта 2011

Когда вы создаете EventImpl e2, вы передаете e.getAttrs(), что невозможно изменить.Попробуйте скопировать e.getAttrs() на собственную карту;мелкой копии с использованием new HashMap(e.getAttrs()) должно быть достаточно.

Ссылка: https://groups.google.com/a/cloudera.org/group/flume-user/browse_thread/thread/046b4a446877c8f9?pli=1

...