У меня простая проблема. Допустим, я читаю файл паркета, который выдает объект avro GenericRecord
на строку, как показано ниже.
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j1"}
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j2"}
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j3"}
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j4"}
{"name":"paul", "surename":"carl", "age":28, "user_pk":"paul:carl:28", "unique_attribute":"p1"}
{"name":"paul", "surename":"carl", "age":28, "user_pk":"paul:carl:28", "unique_attribute":"p2"}
{"name":"paul", "surename":"carl", "age":28, "user_pk":"paul:carl:28", "unique_attribute":"p3"}
этот файл был специально сплющен, и я хотел бы отменить его.
- мы знаем, что вход упорядочен, и я бы хотел обработать их до следующего сеансового ключа и перейти к следующему применению в конвейере, чтобы сохранить минимальные требования к памяти, поэтому промежуточный этапдолжен вернуть
KV<String, Iterable<GenericRecord>>
или даже лучше вместе KV<String, GenericRecord>
.
<"john:doe:40", {"name":"john", "surename":"doe", "age":40, ["unique_attribute":"j1", ...]}>
<"paul:carl:28", {"name":"paul", "surename":"carl", "age":28, "user_pk":, ["unique_attribute":"p1", ...]}
это то, что я получил до сих пор;
pipeline.apply("FilePattern", FileIO.match().filepattern(PARQUET_FILE_PATTERN))
.apply("FileReadMatches", FileIO.readMatches())
.apply("ParquetReadFiles", ParquetIO.readFiles(schema))
.apply("SetKeyValuePK", WithKeys.of(input -> AvroSupport.of(input).extractString("user_pk").get())).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(schema)))
.apply(Window.into(Sessions.withGapDuration(Duration.standardSeconds(5L)))).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(schema)))
.apply("SetGroupByPK", GroupByKey.create()).setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(AvroCoder.of(schema))))
...
...
Я не знаю, еслиесть лучший способ сделать это, но сейчас я использовал Sessions.withGapDuration
стратегию управления окнами. Я ожидал, что получу сгруппированный элемент KV<String, Iterable<GenericRecord>> element
через каждые ~ 5 секунд, но я ничего не получаю после GroupByKey
, я даже не уверен, что GroupByKey
действительно что-то делает, но я знаю, что память увеличиваетсябыстро, поэтому он должен ждать всех предметов.
Итак, вопрос в том, как бы вы настроили функцию управления окнами, которая позволит мне группировать ключи. Я также пытался Combine.byKey
, как это должно быть GroupByKey + Windowing Function
, но не может быть реализовано?