Я бы создал приемник с состоянием, который бы удерживал передаваемые сообщения. Когда счет становится достаточно высоким (1000), приемник отправляет пакет. Состояние может находиться в памяти (например, переменная экземпляра, содержащая ArrayList сообщений), но вы должны использовать контрольные точки, чтобы вы могли восстановить это состояние в случае какого-либо сбоя.
Когда ваш приемник имеет контрольную точкусостояние, он должен реализовать функцию CheckpointedFunction (в org.apache.flink.streaming.api.checkpoint), что означает, что вам нужно добавить два метода в ваш приемник:
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
// HttpSinkStateItem is a user-written class
// that just holds a collection of messages (Strings, in this case)
//
// Buffer is declared as ArrayList<String>
checkpointedState.add(new HttpSinkStateItem(buffer));
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// Mix and match different kinds of states as needed:
// - Use context.getOperatorStateStore() to get basic (non-keyed) operator state
// - types are list and union
// - Use context.getKeyedStateStore() to get state for the current key (only for processing keyed streams)
// - types are value, list, reducing, aggregating and map
// - Distinguish between state data using state name (e.g. "HttpSink-State")
ListStateDescriptor<HttpSinkStateItem> descriptor =
new ListStateDescriptor<>(
"HttpSink-State",
HttpSinkStateItem.class);
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
for (HttpSinkStateItem item: checkpointedState.get()) {
buffer = new ArrayList<>(item.getPending());
}
}
}
Вы также можете использовать таймер вприемник (если входной поток имеет ключ / раздел) для периодической отправки, если количество не достигает вашего порога.