Как запросить сервер Redis, используя потоковый вход PCollection в Apache Beam? - PullRequest
0 голосов
/ 14 января 2020

Используя RedisIO , я пытаюсь запросить коллекцию на сервере Redis.

Сервер Redis работает нормально и отвечает только при пакетном конвейере (без потоковой передачи).

Но, используя потоковые входные данные (из файлов), вот так:

  PCollection<String> stream = pipeline.apply("ReadMyFile", TextIO.read().from("/home/out/**")
  .watchForNewFiles(Duration.standardSeconds(60), Watch.Growth.<String>never()))   
  .apply("ParseFn", ParDo.of(new ParseFn()))
  .apply("GlobalString", GlobalString.get(Duration.ZERO, Duration.standardSeconds(60)));

И затем примените функцию redisIO read ():

 PCollection<KV<String, String>> redis = stream.apply(RedisIO.readAll().withEndpoint("127.0.0.1", 6379));

Наконец, хотите используйте коллекцию результатов, так:

 PCollection<String> result = redis.apply("Compose Final Object", ParDo.of(new DoFn<KV<String, String>, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
      System.out.println(c.element().getKey());
      c.output(c.element().getKey());
    }
  }));

Насколько я тестировал, файлы загружаются и обрабатываются по мере необходимости.

1 Ответ

0 голосов
/ 17 января 2020

Для дальнейшего развития это ошибка:

https://github.com/apache/beam/pull/10624/commits

https://issues.apache.org/jira/browse/BEAM-9134

Это будет будет исправлено в ближайшее время.

...