Недопустимое содержимое хранилища состояний после прерванной транзакции с точными репликами и резервными репликами после переноса задачи - PullRequest
3 голосов
/ 16 апреля 2020

у нас есть простой механизм дедупликации идентификатора команды, основанный на хранилищах Kafka Streams State Store. Он хранит идентификаторы команд за прошедший час в persistentWindowStore . Мы столкнулись с проблемой с магазином, если позже в этой топологии возникло исключение. Мы запускаем 3 узла, используя docker, каждый с несколькими потоками, установленными для этого конкретного приложения Streams. Процесс выглядит следующим образом:

  • сообщение используется на узле 1 Поток-1 (A):
2020-04-16 07:26:19.144 DEBUG 1 --- [-StreamThread-1] c.g.f.c.s.validation.CommandIdValidator  : CommandId: mnl_cmd_1cd3a7c9-56ef-4c08-a312-d9b69681726a is not a duplicate.
2020-04-16 07:26:19.144  INFO 1 --- [-StreamThread-1] c.g.f.c.s.v.CommandIdValidationUtils     : Putting commandId in store: mnl_cmd_1cd3a7c9-56ef-4c08-a312-d9b69681726a 1587013200000
2020-04-16 07:26:19.153  INFO 1 --- [-StreamThread-1] g.f.l.s.v.XXXXValidationHandler : 
Exception in thread "XXXXXXProcessor-53816dfa-6ebf-47ad-864c-0fbcb61dc51c-StreamThread-1" java.lang.RuntimeException: Unexpected exponent value: -5
Failed to process stream task 1_2 due to the following error
Shutting down
Unclean shutdown of all active tasks
Flushing all stores registered in the state manager

Первый узел в топологии анализирует, если это дубликат, если не кладет его в государственный магазин. Следующий узел топологии выдает исключение. В результате транзакция отменяется, смещения не фиксируются. Я дважды проверил список изменений topi c - соответствующие сообщения не зафиксированы.

  • задача была перенесена в другой экземпляр, хранилище состояний воссоздано, сообщение используется на узле 2 Thread-2 (B ):
2020-04-16 07:26:43.399 DEBUG 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator  : CommandId: mnl_cmd_1cd3a7c9-56ef-4c08-a312-d9b69681726a is not a duplicate.
2020-04-16 07:26:43.400  INFO 1 --- [-StreamThread-2] c.g.f.c.s.v.CommandIdValidationUtils     : Putting commandId in store: mnl_cmd_1cd3a7c9-56ef-4c08-a312-d9b69681726a 1587013200000
2020-04-16 07:26:43.425  INFO 1 --- [-StreamThread-2] g.f.l.s.v.XXXXValidationHandler : 
Exception in thread "XXXXXXProcessor-d069f877-90ff-4ecf-bfbf-3b2bd0e20eac-StreamThread-2" java.lang.RuntimeException: XXXX

Снова та же история, транзакция прервана.

  • задача была перенесена на узел 1 Поток-2 снова (C) где Thread-1 уже вышел из строя:
2020-04-16 07:26:55.655  INFO 1 --- [-StreamThread-2] c.g.f.c.s.v.CommandIdValidationUtils     : next: KeyValue(1587013200000, mnl_cmd_1cd3a7c9-56ef-4c08-a312-d9b69681726a)
2020-04-16 07:26:55.655  WARN 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator  : Command duplicate detected. Command id mnl_cmd_1cd3a7c9-56ef-4c08-a312-d9b69681726a

Мы проанализировали журналы восстановления задач, и все кажется нормальным, прерванные транзакции были пропущены: Skipping aborted record batch from partition

Однако, скалы db для этой задачи содержит идентификатор команды:

docker cp 134d4f6f82cf:/tmp/kafka-streams/XXXXXXProcessor/1_0/COMMAND_ID_STORE/COMMAND_ID_STORE.1587013200000 .
ldb --db=COMMAND_ID_STORE.1587013200000 scan
mnl_cmd_1cd3a7c9-56ef-4c08-a312-d9b69681726a : mnl_cmd_1cd3a7c9-56ef-4c08-a312-d9b69681726a
mnl_cmd_c51cbb71-0274-44c6-b016-976add743fe6 : mnl_cmd_c51cbb71-0274-44c6-b016-976add743fe6

Насколько мы понимаем, несмотря на то, что транзакция была прервана на узле 1 (A), запись камней в базе данных не была очищена впоследствии и во время ( C) другой поток извлекал данные из уже существующих пород db (в конце концов, путь к диску связан с задачей, а не с потоком). Мы не уверены, что произойдет с постоянным хранилищем во время восстановления или сбоя задачи. По нашему мнению, эта ситуация нарушает семантику "точно один раз", поскольку хранилище состояний не совпадает с c с топологией журнала изменений c.

Это неправильная конфигурация с нашей стороны / неверные предположения ? Жук? Мы попытались изменить магазин на inMemoryWindowStore , и, похоже, проблема решена. Но мы хотели бы понять это поведение, так как мы сильно зависим от государственных хранилищ.

Мы используем Java 11, клиенты kafka 4.1 и spring-kafka 2.4.5. Мы откатились до клиентов kafka 2.3.1, и проблема остается.

РЕДАКТИРОВАТЬ, мы выполнили еще несколько тестов с изменениями конфигурации и после изменения num.standby.replicas = 1 на num.standby.replicas = 0 проблема исчезла. Из журналов видно, что состояние ожидания создано должным образом, то есть прерванные сообщения не учитываются, но состояние на диске все же повреждено, что довольно легко воспроизвести. Еще один анализ: - Узел A Thread-2 обрабатывает сообщение, вылетает - Узел B Thread-2 является резервной копией, вступает во владение, вылетает - Узел B Thread-1 вступает во владение, обнаруживает дубликат

EDIT 2 подробные журналы, относящиеся к файлам контрольных точек. Сначала отправляется действительная команда, затем недопустимая (следовательно, список изменений topi c не пуст).

NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager        : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Checkpointable offsets read from checkpoint: {}
NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager        : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint null
NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager        : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Checkpointable offsets read from checkpoint: {}
NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint     : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint     : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint     : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint     : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint

NODE_1 log1:2020-04-15 21:11:33.942 DEBUG 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator  : CommandId: mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.

NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint     : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
NODE_3 2020-04-15 21:11:47.233 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint     : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
NODE_3 2020-04-15 21:11:49.075 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager        : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
NODE_3 2020-04-15 21:11:49.436 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.StoreChangelogReader         : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Found checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for store COMMAND_ID_STORE.

NODE_3 2020-04-15 21:11:52.023 DEBUG 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator  : CommandId: mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
NODE_3 2020-04-15 21:11:53.683 ERROR 1 --- [-StreamThread-2] o.a.k.s.p.i.AssignedStreamsTasks         : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Failed to process stream task 1_2 due to the following error: java.lang.RuntimeException

NODE_3 2020-04-15 21:12:05.346 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager        : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
NODE_3 2020-04-15 21:12:05.562 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.StoreChangelogReader         : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] Found checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for store COMMAND_ID_STORE.

NODE_3 2020-04-15 21:12:06.424  WARN 1 --- [-StreamThread-1] c.g.f.c.s.validation.CommandIdValidator  : Command duplicate detected. Command id mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc

Спасибо за помощь.

Код, работающий с COMMAND_ID_STORE (аналогично примеру из Kafka Streams)

Добавление хранилища состояний

    WindowBytesStoreSupplier storeSupplier = Stores.persistentWindowStore(storeName,
        RETENTION,
        WINDOW_SIZE,
        true);
    StoreBuilder<WindowStore<String, String>> storeBuilder = Stores
        .windowStoreBuilder(storeSupplier, Serdes.String(), Serdes.String());
    streamsBuilder.addStateStore(storeBuilder);

Выборка

public static boolean isCommandIdDuplicate(String commandId, WindowStore<String, String> commandIdStore) {
    long previousHourStart = DateTimeUtils.previousHourStart();

    final WindowStoreIterator<String> storeIterator = commandIdStore
        .fetch(commandId, previousHourStart, DateTimeUtils.currentTimeMillis());
    final boolean hasNext = storeIterator.hasNext();
    log.info("hasNext: {}", hasNext);
    while (storeIterator.hasNext()) {
      log.info("next: {}", storeIterator.next());
    }
    storeIterator.close();
    return hasNext;

  }

Установка

    final long windowStartTimestamp = DateTimeUtils.thisHourStart();
    log.info("Putting commandId in store: {} {}", commandId, windowStartTimestamp);
    commandIdStore.put(commandId, commandId, windowStartTimestamp);

Преобразование

.transformValues(ValueTransformerSupplier<YYY,
                    ? extends XXX>) CommandIdValidator::new,
                COMMAND_ID_STORE);

Трансформатор

@Slf4j
public class CommandIdValidator<T extends GeneratedMessageV3> implements
    ValueTransformer<T, XXX<T>> {

  WindowStore<String, String> commandIdStore;

  @Override
  public void init(ProcessorContext context) {
    commandIdStore = (WindowStore<String, String>) context.getStateStore(COMMAND_ID_STORE);
  }

  @Override
  public XXX<T> transform(T command) {

    log.debug("Validating commandId.");

    String commandId = getCommandId(command);

    if (CommandIdValidationUtils.isCommandIdDuplicate(commandId, commandIdStore)) {
      log.warn("Command duplicate detected. Command id " + commandId);
      return new XXX(command,
          XXX.duplicatedCommandId(commandId, command.getClass().getName()));
    } else {
      log.debug("CommandId: {} is not a duplicate.", commandId);
      CommandIdValidationUtils.putCommandIdInStore(commandId, commandIdStore);
      return XXX.successful(command);
    }
  }

  @Override
  public void close() {

  }

}

1 Ответ

0 голосов
/ 19 апреля 2020

То, что вы описываете, безусловно, звучит как ошибка в Kafka Streams.

Мы откатились до клиентов kafka 2.3.1, и проблема сохраняется.

С какой версии ? 2.4.0? 2.4.1 и 2.5.0 были выпущены недавно, возможно, вы можете попробовать их, чтобы увидеть, если проблема все еще существует? (Некоторые ошибки EOS были исправлены; я не смог найти тикет, который бы соответствовал описанной вами проблеме ... Поэтому не уверен, что ошибка все еще существует или нет.)

Как EOS должен работать:

Для локальных экземпляров RocksDB Kafka Streams также записывает файл «контрольных точек» на локальный диск с некоторыми метаданными. Если файл контрольных точек существует, мы знаем, что RocksDB является «чистым», то есть синхронизирован c с журналом изменений topi c. Если файл контрольных точек не существует, мы знаем, что состояние повреждено и весь экземпляр RocksDB удаляется при запуске, а состояние перестраивается из журнала изменений. Следовательно, файл контрольных точек должен быть записан только во время чистого завершения задачи (т. Е. Перебалансировки) и должен быть удален после назначения задачи и до начала обработки. Во время обработки не должно быть файла контрольных точек, и поэтому, если мы взломали sh, мы знаем, что хранилище повреждено, и мы перестраиваем его из журнала изменений.

Одна теория, которая может быть ошибкой, заключается в следующем: когда первый поток падает (в экземпляре A), файл контрольных точек не записывается, что правильно. После первого перебалансирования StandbyTask назначается экземпляру A - этот StandbyTask должен уничтожить локальное хранилище состояний (так как нет файла контрольных точек), и ошибка заключается в том, что эта очистка не происходит (возможно, вы можете проверить это с помощью журналы; стирание должно быть зарегистрировано, или, возможно, некоторые метки времени «создать файл» в справке файловой системы). Когда происходит сбой второго потока в экземпляре B, запускается другой баланс. StandbyTask неправильно считает, что он имеет чистое состояние и записывает файл контрольных точек. Когда активная задача запускается в другом потоке в экземпляре A, она просто принимает поврежденное состояние, поскольку (неверно) существующий файл контрольных точек указывает на то, что состояние является согласованным.

  • При переключении в -память памяти, проблема исчезла изначально, так как на диске нет локального состояния и файлов контрольных точек, поэтому состояние всегда перестраивается из списка изменений topi c.
  • При отключении StandbyTasks во втором rebalance, файл контрольных точек не записывается, и поэтому, когда активная задача запускается во втором потоке в экземпляре A, она теперь корректно стирает хранилище и перестраивает из журнала изменений.

Может быть, вы можете попытаться проверить это, а также, может быть, открыть отчет об ошибке?

...