API-интерфейс процессора Kafka Streams по размеру и времени - PullRequest
0 голосов
/ 07 января 2019

Попытка пакетной записи с помощью API процессора потоков Кафки. Дозирование основано на размерах и времени. Допустим, если размер пакета достигает 10, или последний пакет обрабатывается более 10 секунд назад (размер или время последней обработки, что когда-либо раньше), затем вызовите внешний API для отправки пакета и фиксации с использованием ProcessingContext.

Использование punctuate для периодической проверки возможности очистки партии и ее отправки во внешнюю систему.

Вопрос - Может ли метод API process процессора вызываться API потоков при выполнении точечного потока? Поскольку код вызывает коммит в потоке пунктуации, могут ли context.commit() коммитить записи, которые еще не обработаны методом процесса?

Возможно ли, что пунктуальный поток и метод процесса выполняются одновременно в разных потоках? Если так, то в коде у меня есть записи коммитов, которые еще не обработаны

public class TestProcessor extends AbstractProcessor<String, String> {

    private ProcessorContext context;
    private List<String> batchList = new LinkedList<>();
    private AtomicLong lastProcessedTime = new AtomicLong(System.currentTimeMillis());

    private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class);

    @Override
    public void init(ProcessorContext context) {
        LOG.info("Calling init method " + context.taskId());
        this.context = context;
        context.schedule(10000, PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
            if(batchList.size() > 0 && System.currentTimeMillis() - lastProcessedTime.get() >
                    10000){
                //call external API
                batchList.clear();
                lastProcessedTime.set(System.currentTimeMillis());
            }
            context.commit();
        });

    }

    @Override
    public void process(String key, String value) {
        batchList.add(value);
        LOG.info("Context details " + context.taskId() + " " + context.partition() + " " +
                "storeSize " + batchList.size());
        if(batchList.size() == 10){
            //call external API to send the batch
            batchList.clear();
            lastProcessedTime.set(System.currentTimeMillis());
        }
        context.commit();
    }

    @Override
    public void close() {
        if(batchList.size() > 0){
            //call external API to send the left over records
            batchList.clear();
        }
    }

}

1 Ответ

0 голосов
/ 07 января 2019

Может ли метод API процессора process вызываться API потоков при поток punctuate выполняется?

Нет, это невозможно, поскольку Processor выполняет process и punctuate методы в одном потоке (один и тот же поток используется для обоих методов).

Возможно ли, что прерывистый поток и метод процесса выполняется одновременно в разных темах?

Ответ: «Это невозможно», описание приведено выше.

Примите во внимание, что у каждого раздела темы будет собственный экземпляр вашего класса TestProcessor. вместо локальных переменных batchList и lastProcessedTime я рекомендую использовать хранилище состояний Kafka, например KeyValueStore, поэтому ваш поток будет отказоустойчивым.

...