у нас есть простой механизм дедупликации идентификатора команды, основанный на хранилищах Kafka Streams State Store. Он хранит идентификаторы команд за прошедший час в persistentWindowStore . Мы столкнулись с проблемой с магазином, если позже в этой топологии возникло исключение. Мы запускаем 3 узла, используя docker, каждый с несколькими потоками, установленными для этого конкретного приложения Streams. Процесс выглядит следующим образом:
- сообщение используется на узле 1 Поток-1 (A):
2020-04-16 07:26:19.144 DEBUG 1 --- [-StreamThread-1] c.g.f.c.s.validation.CommandIdValidator : CommandId: mnl_cmd_1cd3a7c9-56ef-4c08-a312-d9b69681726a is not a duplicate.
2020-04-16 07:26:19.144 INFO 1 --- [-StreamThread-1] c.g.f.c.s.v.CommandIdValidationUtils : Putting commandId in store: mnl_cmd_1cd3a7c9-56ef-4c08-a312-d9b69681726a 1587013200000
2020-04-16 07:26:19.153 INFO 1 --- [-StreamThread-1] g.f.l.s.v.XXXXValidationHandler :
Exception in thread "XXXXXXProcessor-53816dfa-6ebf-47ad-864c-0fbcb61dc51c-StreamThread-1" java.lang.RuntimeException: Unexpected exponent value: -5
Failed to process stream task 1_2 due to the following error
Shutting down
Unclean shutdown of all active tasks
Flushing all stores registered in the state manager
Первый узел в топологии анализирует, если это дубликат, если не кладет его в государственный магазин. Следующий узел топологии выдает исключение. В результате транзакция отменяется, смещения не фиксируются. Я дважды проверил список изменений topi c - соответствующие сообщения не зафиксированы.
- задача была перенесена в другой экземпляр, хранилище состояний воссоздано, сообщение используется на узле 2 Thread-2 (B ):
2020-04-16 07:26:43.399 DEBUG 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator : CommandId: mnl_cmd_1cd3a7c9-56ef-4c08-a312-d9b69681726a is not a duplicate.
2020-04-16 07:26:43.400 INFO 1 --- [-StreamThread-2] c.g.f.c.s.v.CommandIdValidationUtils : Putting commandId in store: mnl_cmd_1cd3a7c9-56ef-4c08-a312-d9b69681726a 1587013200000
2020-04-16 07:26:43.425 INFO 1 --- [-StreamThread-2] g.f.l.s.v.XXXXValidationHandler :
Exception in thread "XXXXXXProcessor-d069f877-90ff-4ecf-bfbf-3b2bd0e20eac-StreamThread-2" java.lang.RuntimeException: XXXX
Снова та же история, транзакция прервана.
- задача была перенесена на узел 1 Поток-2 снова (C) где Thread-1 уже вышел из строя:
2020-04-16 07:26:55.655 INFO 1 --- [-StreamThread-2] c.g.f.c.s.v.CommandIdValidationUtils : next: KeyValue(1587013200000, mnl_cmd_1cd3a7c9-56ef-4c08-a312-d9b69681726a)
2020-04-16 07:26:55.655 WARN 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator : Command duplicate detected. Command id mnl_cmd_1cd3a7c9-56ef-4c08-a312-d9b69681726a
Мы проанализировали журналы восстановления задач, и все кажется нормальным, прерванные транзакции были пропущены: Skipping aborted record batch from partition
Однако, скалы db для этой задачи содержит идентификатор команды:
docker cp 134d4f6f82cf:/tmp/kafka-streams/XXXXXXProcessor/1_0/COMMAND_ID_STORE/COMMAND_ID_STORE.1587013200000 .
ldb --db=COMMAND_ID_STORE.1587013200000 scan
mnl_cmd_1cd3a7c9-56ef-4c08-a312-d9b69681726a : mnl_cmd_1cd3a7c9-56ef-4c08-a312-d9b69681726a
mnl_cmd_c51cbb71-0274-44c6-b016-976add743fe6 : mnl_cmd_c51cbb71-0274-44c6-b016-976add743fe6
Насколько мы понимаем, несмотря на то, что транзакция была прервана на узле 1 (A), запись камней в базе данных не была очищена впоследствии и во время ( C) другой поток извлекал данные из уже существующих пород db (в конце концов, путь к диску связан с задачей, а не с потоком). Мы не уверены, что произойдет с постоянным хранилищем во время восстановления или сбоя задачи. По нашему мнению, эта ситуация нарушает семантику "точно один раз", поскольку хранилище состояний не совпадает с c с топологией журнала изменений c.
Это неправильная конфигурация с нашей стороны / неверные предположения ? Жук? Мы попытались изменить магазин на inMemoryWindowStore , и, похоже, проблема решена. Но мы хотели бы понять это поведение, так как мы сильно зависим от государственных хранилищ.
Мы используем Java 11, клиенты kafka 4.1 и spring-kafka 2.4.5. Мы откатились до клиентов kafka 2.3.1, и проблема остается.
РЕДАКТИРОВАТЬ, мы выполнили еще несколько тестов с изменениями конфигурации и после изменения num.standby.replicas = 1
на num.standby.replicas = 0
проблема исчезла. Из журналов видно, что состояние ожидания создано должным образом, то есть прерванные сообщения не учитываются, но состояние на диске все же повреждено, что довольно легко воспроизвести. Еще один анализ: - Узел A Thread-2 обрабатывает сообщение, вылетает - Узел B Thread-2 является резервной копией, вступает во владение, вылетает - Узел B Thread-1 вступает во владение, обнаруживает дубликат
EDIT 2 подробные журналы, относящиеся к файлам контрольных точек. Сначала отправляется действительная команда, затем недопустимая (следовательно, список изменений topi c не пуст).
NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Checkpointable offsets read from checkpoint: {}
NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint null
NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Checkpointable offsets read from checkpoint: {}
NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
NODE_1 log1:2020-04-15 21:11:33.942 DEBUG 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator : CommandId: mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
NODE_3 2020-04-15 21:11:47.233 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
NODE_3 2020-04-15 21:11:49.075 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
NODE_3 2020-04-15 21:11:49.436 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.StoreChangelogReader : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Found checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for store COMMAND_ID_STORE.
NODE_3 2020-04-15 21:11:52.023 DEBUG 1 --- [-StreamThread-2] c.g.f.c.s.validation.CommandIdValidator : CommandId: mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
NODE_3 2020-04-15 21:11:53.683 ERROR 1 --- [-StreamThread-2] o.a.k.s.p.i.AssignedStreamsTasks : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Failed to process stream task 1_2 due to the following error: java.lang.RuntimeException
NODE_3 2020-04-15 21:12:05.346 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
NODE_3 2020-04-15 21:12:05.562 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.StoreChangelogReader : stream-thread [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] Found checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for store COMMAND_ID_STORE.
NODE_3 2020-04-15 21:12:06.424 WARN 1 --- [-StreamThread-1] c.g.f.c.s.validation.CommandIdValidator : Command duplicate detected. Command id mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc
Спасибо за помощь.
Код, работающий с COMMAND_ID_STORE (аналогично примеру из Kafka Streams)
Добавление хранилища состояний
WindowBytesStoreSupplier storeSupplier = Stores.persistentWindowStore(storeName,
RETENTION,
WINDOW_SIZE,
true);
StoreBuilder<WindowStore<String, String>> storeBuilder = Stores
.windowStoreBuilder(storeSupplier, Serdes.String(), Serdes.String());
streamsBuilder.addStateStore(storeBuilder);
Выборка
public static boolean isCommandIdDuplicate(String commandId, WindowStore<String, String> commandIdStore) {
long previousHourStart = DateTimeUtils.previousHourStart();
final WindowStoreIterator<String> storeIterator = commandIdStore
.fetch(commandId, previousHourStart, DateTimeUtils.currentTimeMillis());
final boolean hasNext = storeIterator.hasNext();
log.info("hasNext: {}", hasNext);
while (storeIterator.hasNext()) {
log.info("next: {}", storeIterator.next());
}
storeIterator.close();
return hasNext;
}
Установка
final long windowStartTimestamp = DateTimeUtils.thisHourStart();
log.info("Putting commandId in store: {} {}", commandId, windowStartTimestamp);
commandIdStore.put(commandId, commandId, windowStartTimestamp);
Преобразование
.transformValues(ValueTransformerSupplier<YYY,
? extends XXX>) CommandIdValidator::new,
COMMAND_ID_STORE);
Трансформатор
@Slf4j
public class CommandIdValidator<T extends GeneratedMessageV3> implements
ValueTransformer<T, XXX<T>> {
WindowStore<String, String> commandIdStore;
@Override
public void init(ProcessorContext context) {
commandIdStore = (WindowStore<String, String>) context.getStateStore(COMMAND_ID_STORE);
}
@Override
public XXX<T> transform(T command) {
log.debug("Validating commandId.");
String commandId = getCommandId(command);
if (CommandIdValidationUtils.isCommandIdDuplicate(commandId, commandIdStore)) {
log.warn("Command duplicate detected. Command id " + commandId);
return new XXX(command,
XXX.duplicatedCommandId(commandId, command.getClass().getName()));
} else {
log.debug("CommandId: {} is not a duplicate.", commandId);
CommandIdValidationUtils.putCommandIdInStore(commandId, commandIdStore);
return XXX.successful(command);
}
}
@Override
public void close() {
}
}