Попытка пакетной записи с помощью API процессора потоков Кафки. Дозирование основано на размерах и времени. Допустим, если размер пакета достигает 10, или последний пакет обрабатывается более 10 секунд назад (размер или время последней обработки, что когда-либо раньше), затем вызовите внешний API для отправки пакета и фиксации с использованием ProcessingContext.
Использование punctuate
для периодической проверки возможности очистки партии и ее отправки во внешнюю систему.
Вопрос - Может ли метод API process
процессора вызываться API потоков при выполнении точечного потока? Поскольку код вызывает коммит в потоке пунктуации, могут ли context.commit()
коммитить записи, которые еще не обработаны методом процесса?
Возможно ли, что пунктуальный поток и метод процесса выполняются одновременно в разных потоках? Если так, то в коде у меня есть записи коммитов, которые еще не обработаны
public class TestProcessor extends AbstractProcessor<String, String> {
private ProcessorContext context;
private List<String> batchList = new LinkedList<>();
private AtomicLong lastProcessedTime = new AtomicLong(System.currentTimeMillis());
private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class);
@Override
public void init(ProcessorContext context) {
LOG.info("Calling init method " + context.taskId());
this.context = context;
context.schedule(10000, PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
if(batchList.size() > 0 && System.currentTimeMillis() - lastProcessedTime.get() >
10000){
//call external API
batchList.clear();
lastProcessedTime.set(System.currentTimeMillis());
}
context.commit();
});
}
@Override
public void process(String key, String value) {
batchList.add(value);
LOG.info("Context details " + context.taskId() + " " + context.partition() + " " +
"storeSize " + batchList.size());
if(batchList.size() == 10){
//call external API to send the batch
batchList.clear();
lastProcessedTime.set(System.currentTimeMillis());
}
context.commit();
}
@Override
public void close() {
if(batchList.size() > 0){
//call external API to send the left over records
batchList.clear();
}
}
}