Apache Beam :: не может заставить groupbykey работать с окном сеанса с Java - PullRequest
0 голосов
/ 23 октября 2019

У меня простая проблема. Допустим, я читаю файл паркета, который выдает объект avro GenericRecord на строку, как показано ниже.

{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j1"}
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j2"}
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j3"}
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j4"}

{"name":"paul", "surename":"carl", "age":28, "user_pk":"paul:carl:28", "unique_attribute":"p1"}
{"name":"paul", "surename":"carl", "age":28, "user_pk":"paul:carl:28", "unique_attribute":"p2"}
{"name":"paul", "surename":"carl", "age":28, "user_pk":"paul:carl:28", "unique_attribute":"p3"}

этот файл был специально сплющен, и я хотел бы отменить его.

  • мы знаем, что вход упорядочен, и я бы хотел обработать их до следующего сеансового ключа и перейти к следующему применению в конвейере, чтобы сохранить минимальные требования к памяти, поэтому промежуточный этапдолжен вернуть KV<String, Iterable<GenericRecord>> или даже лучше вместе KV<String, GenericRecord>.
<"john:doe:40", {"name":"john", "surename":"doe", "age":40, ["unique_attribute":"j1", ...]}>
<"paul:carl:28", {"name":"paul", "surename":"carl", "age":28, "user_pk":, ["unique_attribute":"p1", ...]}

это то, что я получил до сих пор;

        pipeline.apply("FilePattern", FileIO.match().filepattern(PARQUET_FILE_PATTERN))
                .apply("FileReadMatches", FileIO.readMatches())
                .apply("ParquetReadFiles", ParquetIO.readFiles(schema))
                .apply("SetKeyValuePK", WithKeys.of(input -> AvroSupport.of(input).extractString("user_pk").get())).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(schema)))
                .apply(Window.into(Sessions.withGapDuration(Duration.standardSeconds(5L)))).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(schema)))
                .apply("SetGroupByPK", GroupByKey.create()).setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(AvroCoder.of(schema))))
...
...

Я не знаю, еслиесть лучший способ сделать это, но сейчас я использовал Sessions.withGapDuration стратегию управления окнами. Я ожидал, что получу сгруппированный элемент KV<String, Iterable<GenericRecord>> element через каждые ~ 5 секунд, но я ничего не получаю после GroupByKey, я даже не уверен, что GroupByKey действительно что-то делает, но я знаю, что память увеличиваетсябыстро, поэтому он должен ждать всех предметов.

Итак, вопрос в том, как бы вы настроили функцию управления окнами, которая позволит мне группировать ключи. Я также пытался Combine.byKey, как это должно быть GroupByKey + Windowing Function, но не может быть реализовано?

1 Ответ

0 голосов
/ 24 октября 2019

Мне удалось заставить группу работать, но я не уверен, полностью ли понимаю. Я должен был добавить две мысли. Первая (любая?) Операция ввода-вывода в Beam не добавляет метку времени.

.apply("WithTimestamp", WithTimestamps.of(input -> Instant.now()))

секунду Я добавил Triger, чтобы GroupByKey действительно срабатывал. Понятия не имею, почему это не сработало. Я уверен, что у кого-то есть объяснение этому.

.apply("SessionWindow", Window.<KV<String, GenericRecord>>into(Sessions.withGapDuration(Duration.standardSeconds(5L))).triggering(
                        AfterWatermark.pastEndOfWindow()
                                .withLateFirings(AfterProcessingTime
                                        .pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
                        .withAllowedLateness(Duration.ZERO)
                        .discardingFiredPanes())

Это не идеально, все же пришлось подождать пару минут, прежде чем я увижу, что GroupByKey срабатывает, даже если окно только 5s, но это срабатывает в конце концов, а это прогресс.

РЕДАКТИРОВАТЬ: хорошо, похоже, временная метка не нужна, я предполагаю, потому что окно основано на сеансе, а не на времени. Я также изменил настройку на потоковую передачу

        options.as(StreamingOptions.class)
                .setStreaming(true);

Надеюсь, это поможет кому-то, у кого возникают похожие проблемы.

...