Beam ElasticseachIO: Как переопределить индексы документов, не основанные на самом документе? - PullRequest
0 голосов
/ 15 января 2020

Я работаю над потоковым конвейером Apache Beam, чтобы создать простой мост от Google PubSub Topi c к ElasticSearch. Документ ElasticSearch будет создан как:

  • Elasti c тело документа> из тела сообщения PubSub (предположим, JSON на основе)
  • Elasti c индекс документа> из PubSub метаданные сообщения "index"
  • Elasti c идентификатор документа> из PubSub метаданные сообщения "id"

соединитель ElasticSearch для Beam (https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.html) обеспечивает Функция withIndexFn для указания имени пользовательского индекса. Но с помощью этого метода пользовательский индекс может быть выдан только из самого документа.

Конвейер:

String index = "...";
String docType = "...";
String elkUser = "...";
String elkPwd = "...";

String addresses = "http://lnxfrh099702555.qualif.enterprise.horsprod.lan:9200";
String[] addressesList = Lists.newArrayList(Splitter.on(",").trimResults().split(addresses)).toArray(new String[0]);

ElasticsearchIO.ConnectionConfiguration connection = ElasticsearchIO.ConnectionConfiguration.create(addressesList, index, docType);
connection.withUsername(elkUser);
connection.withPassword(elkPwd);

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);

PCollection<PubsubMessage> a = pipeline.apply("Read Messages from PubSub", PubsubIO.readMessagesWithAttributes().fromSubscription("projects/{projectName}/subscriptions/{topicSubscription}"));

PCollection<String> b = a.apply("GenerateElasticDoc", ParDo.of(new GenerateElasticDocFn()));

PDone c = b.apply("Write to Elasticsearch Index",ElasticsearchIO.write()
    .withConnectionConfiguration(connection)
    .withUsePartialUpdate(true));

pipeline.run().waitUntilFinish();

GenerateElasticDocFn:

public class GenerateElasticDocFn extends DoFn<PubsubMessage, String> {
      public GenerateElasticDocFn() {
      }

      @ProcessElement
      public void processElement(ProcessContext context) throws IOException {
        PubsubMessage pubsubMsg = context.element();
        context.output(new String(pubsubMsg.getPayload()).replaceAll("\n", "").replaceAll("\r", ""));
      }
}

Видите ли вы способ установить пользовательское значение для имени индекса, которое не основано непосредственно на документе?

Спасибо! Лайонел.

...