Используя RedisIO , я пытаюсь запросить коллекцию на сервере Redis.
Сервер Redis работает нормально и отвечает только при пакетном конвейере (без потоковой передачи).
Но, используя потоковые входные данные (из файлов), вот так:
PCollection<String> stream = pipeline.apply("ReadMyFile", TextIO.read().from("/home/out/**")
.watchForNewFiles(Duration.standardSeconds(60), Watch.Growth.<String>never()))
.apply("ParseFn", ParDo.of(new ParseFn()))
.apply("GlobalString", GlobalString.get(Duration.ZERO, Duration.standardSeconds(60)));
И затем примените функцию redisIO read ():
PCollection<KV<String, String>> redis = stream.apply(RedisIO.readAll().withEndpoint("127.0.0.1", 6379));
Наконец, хотите используйте коллекцию результатов, так:
PCollection<String> result = redis.apply("Compose Final Object", ParDo.of(new DoFn<KV<String, String>, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println(c.element().getKey());
c.output(c.element().getKey());
}
}));
Насколько я тестировал, файлы загружаются и обрабатываются по мере необходимости.