Я работаю над потоковым конвейером Apache Beam, чтобы создать простой мост от Google PubSub Topi c к ElasticSearch. Документ ElasticSearch будет создан как:
- Elasti c тело документа> из тела сообщения PubSub (предположим, JSON на основе)
- Elasti c индекс документа> из PubSub метаданные сообщения "index"
- Elasti c идентификатор документа> из PubSub метаданные сообщения "id"
соединитель ElasticSearch для Beam (https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.html) обеспечивает Функция withIndexFn для указания имени пользовательского индекса. Но с помощью этого метода пользовательский индекс может быть выдан только из самого документа.
Конвейер:
String index = "...";
String docType = "...";
String elkUser = "...";
String elkPwd = "...";
String addresses = "http://lnxfrh099702555.qualif.enterprise.horsprod.lan:9200";
String[] addressesList = Lists.newArrayList(Splitter.on(",").trimResults().split(addresses)).toArray(new String[0]);
ElasticsearchIO.ConnectionConfiguration connection = ElasticsearchIO.ConnectionConfiguration.create(addressesList, index, docType);
connection.withUsername(elkUser);
connection.withPassword(elkPwd);
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
PCollection<PubsubMessage> a = pipeline.apply("Read Messages from PubSub", PubsubIO.readMessagesWithAttributes().fromSubscription("projects/{projectName}/subscriptions/{topicSubscription}"));
PCollection<String> b = a.apply("GenerateElasticDoc", ParDo.of(new GenerateElasticDocFn()));
PDone c = b.apply("Write to Elasticsearch Index",ElasticsearchIO.write()
.withConnectionConfiguration(connection)
.withUsePartialUpdate(true));
pipeline.run().waitUntilFinish();
GenerateElasticDocFn:
public class GenerateElasticDocFn extends DoFn<PubsubMessage, String> {
public GenerateElasticDocFn() {
}
@ProcessElement
public void processElement(ProcessContext context) throws IOException {
PubsubMessage pubsubMsg = context.element();
context.output(new String(pubsubMsg.getPayload()).replaceAll("\n", "").replaceAll("\r", ""));
}
}
Видите ли вы способ установить пользовательское значение для имени индекса, которое не основано непосредственно на документе?
Спасибо! Лайонел.