У меня есть конвейер, который по существу читает сжатые данные JSON, выполняет небольшую обработку и записывает их через массовый запрос API. Он выполняет sh массовый запрос API, у меня есть DoFn, который буферизует определенный объем данных (скажем, 1500 записей) и сбрасывает пакет, когда он достигает этого порога ( в значительной степени точно так же, как это происходит в блоге Beam). post )
Способ выполнения конвейера в потоке данных заключается в том, что он читает ВСЕ данные JSON (почти на 1 ТБ) перед переходом к этапу пакетной обработки. Есть идеи, что здесь происходит? Промежуточный шаг «Добавить случайный ключ», по существу, принимает строку JSON и выводит KV (random.nextInt(), jsonString)
, которое было добавлено для предотвращения слияния в секцию вызовов пакета + API
Редактировать: добавление фрагмента кода для раздела «Буфер»:
new DoFn<KV<String, String>, KV<String, String>>() {
private static final int MAX_BUFFER_SIZE = 500;
private static final Duration MAX_BUFFER_DURATION = Duration.standardSeconds(1);
@TimerId("stale")
private final TimerSpec staleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
@StateId("buffer")
private final StateSpec<BagState<String>> bufferedEvents = StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
@ProcessElement
public void process(
ProcessContext context,
@StateId("buffer") BagState<KV<String, String>> bufferState,
@TimerId("stale") Timer staleTimer,
@StateId("count") ValueState<Integer> countState) {
if (firstNonNull(countState.read(), 0) == 0) {
staleTimer.offset(MAX_BUFFER_DURATION).setRelative();
}
int count = firstNonNull(countState.read(), 0);
count = count + 1;
countState.write(count);
bufferState.add(context.element().value());
if (count >= MAX_BUFFER_SIZE) {
for (KV<String, String> enrichedEvent : enrichEvents(bufferState.read())) {
context.output(KV.of(RandomStringUtils.randomNumeric(1), enrichedEvent));
}
bufferState.clear();
countState.clear();
}
}
@OnTimer("stale")
public void onStale(
OnTimerContext context,
@StateId("buffer") BagState<Event> bufferState,
@StateId("count") ValueState<Integer> countState) {
if (!bufferState.isEmpty().read()) {
for (String enrichedEvent : enrichEvents(bufferState.read())) {
context.output(KV.of(RandomStringUtils.randomNumeric(1), enrichedEvent));
}
bufferState.clear();
countState.clear();
}
}
}