KafkaStream stateStore устаревшие данные после перебалансировки - PullRequest
0 голосов
/ 15 января 2020

Мы используем потоки Кафки для обработки входящих GPS-координат. Требуется следующее: Если от определенного водителя не получено ни одной позиции в течение x минут, пометьте этого водителя как недоступного. С каждой новой позицией мы заполняем хранилище состояний с DriverId в качестве ключа и фактической позиции в качестве значения. Все идет нормально. Затем мы хотим представить планировщик для проверки хранилища состояний и пометить каждый драйвер с позицией старше x минут как недоступную.

Мы используем Processor API для создания нашей топологии.

StoreBuilder currentTrackabilityStateStore = Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("driver-trackability-store"),
                    Serdes.String(),
                    driverTrackabilityStateSerde());

Topology topology = new Topology()
            .addSource(partitionedDriverGpsSourceName,
                    Serdes.String().deserializer(),
                    Serdes.ByteArray().deserializer(),
                    MessageTopic.DRIVER_GPS_INTERNAL.getValue())
            .addProcessor(processorNameRequestId,
                    ProcessorRequestId::new,
                    partitionedDriverGpsSourceName)
            .addProcessor(deserializeProcessorName,
                    ProcessorDeserialize::new,
                    processorNameRequestId)
            .addProcessor(trackabilityProcessorName,
                    ()-> new ProcessorTrackability(trackabilityChangesSinkName),
                    deserializeProcessorName)
            .addSink(trackabilityChangesSinkName,
                    MessageTopic.TRACKABILITY.getValue(),
                    Serdes.String().serializer(),
                    driverTrackabilityStateSerde().serializer(),
                    trackabilityProcessorName)
            .addStateStore(currentTrackabilityStateStore, trackabilityProcessorName);

Планировщик инициализации

    public void init(ProcessorContext context) {
        this.context = context;
        this.kvStore = (KeyValueStore<String, GpsInfo>) context.getStateStore("driver-trackability-store");
        schedule = this.context.schedule(Duration.of(45, ChronoUnit.SECONDS), PunctuationType.WALL_CLOCK_TIME, new GpsTrackabilityPunctuator(this.kvStore, this.context, trackabilityChangesSink));
}

Метод обработки

   public void process(String key, GpsInfo gpsInfo) {
       // omitted
       this.kvStore.put(key, gpsInfo);
      // omitted
   }

Наконец, есть пунктуатор

 public void punctuate(long timestamp) {
    log.info("Punctuating...");
    KeyValueIterator<String, GpsInfo> iterator = this.kvStore.all(); // Problem is right here
}

Проблема в этом .kvStore.all (), который, очевидно, сохраняет устаревшую информацию после перебалансировки. Например, драйвер с идентификатором 10 будет назначен разделу, который обрабатывается в экземпляре 1, а хранилище будет заполнено y записями, затем произойдет перебалансировка, и драйвер 10 теперь обрабатывается в экземпляре 2. Состояние успешно перенесен на экземпляр 2, поэтому, когда расписание запускается на экземпляре 2, у нас будут все предыдущие позиции, а также новые, которые поступают. Проблема заключается в том, что планировщик все еще работает в экземпляре 1. Что происходит, если новые позиции не сохраняются, и планировщик помечает драйвер 10 как недоступный, в то время как планировщик в экземпляре 2 помечает тот же драйвер как доступный, поскольку он имеет последнюю позицию , Так есть ли способ удалить устаревшие записи из государственного магазина после восстановления баланса? Я что-то упустил?

РЕДАКТИРОВАТЬ 1:

Резервное копирование хранилища состояний при запуске из списка изменений topi c. Это делается для каждой задачи. Каждая задача получает все сообщения из журнала изменений (независимо от разделов). Вот почему kvStore.all () возвращает все записи, а не только те, которые обрабатываются текущей задачей.

1 Ответ

0 голосов
/ 18 января 2020

Это была ошибка с моей стороны. После многих часов отладки я пришел к выводу, что проблема связана с тем, что у topi c есть записи с ключом и без ключа.

Это как-то портит механизм восстановления баланса магазина.

Решением было удалить исходный файл topi c и создать новый, содержащий только записи с ключами.

...