Apache Beam не работает с окнами сеансов в LocalRunner - PullRequest
0 голосов
/ 04 июня 2019

Я хочу попробовать сессионные Windows на Apache Beam и LocalRunner с входными источниками Kafka или SWS.Я не могу заставить их работать должным образом.Шов Windows не обрабатывается. У меня следующий код:

public class Test7Pipeline {

    private static final Logger LOG = LoggerFactory.getLogger(Test7Pipeline.class);

    public static void main(String[] args) throws ParseException {


        PipelineOptions options = PipelineOptionsFactory.create();


        Pipeline p = Pipeline.create(options);

        p.apply(KafkaIO.<Long, String>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("test")
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)

                .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object)"earliest"))
                //.withMaxNumRecords(5)
                .withoutMetadata())
        .apply(Values.<String>create())

        .apply(
                "WindowIntoSessions",
                Window.<String>into(
                        Sessions.withGapDuration(Duration.standardMinutes(1)))
                .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))

        .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                for (String word : c.element().split("/")) {
                    if (!word.isEmpty()) {
                        c.output(word);
                    }
                }
            }
        }))
        .apply(Count.<String>perElement())
        .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
            @Override
            public String apply(KV<String, Long> input) {
                LOG.info("Value: " + input.getValue());
                return input.getKey() + ": " + input.getValue();
            }
        }))
        .apply("Log", ParDo.of(new FilterTextFn()));
        //.apply(TextIO.write().to("wordcounts"));


        p.run().waitUntilFinish();
    }


То же поведение для Kafka и SQS.

...