Государственный магазин Кафка в разных масштабах - PullRequest
0 голосов
/ 07 января 2020

У меня есть 5 разных машин с каждым масштабированным 5 экземпляром весенней загрузки, который использует приложение kafka-streams. Я использую 50 разделов, сжатых topi c с различными 2-3 темами, и у каждого моего экземпляра есть 10 параллелей. Я использую docker рой и docker громкость. Используя эти разделы, KTable или KStream выполняют некоторые операции flatMap, map и join с моим приложением kafka streams.

    props.put(StreamsConfig.STATE_DIR_CONFIG, /tmp/kafka-streams);
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
    props.put("num.stream.threads", 10);
    props.put("application.id", applicationId);

Если все идет хорошо, в моем приложении нет ошибок или нет потери данных с помощью операций .join (), но когда один из моих экземпляров не работает, мои операции соединения не могут выполнить соединение.

Мой вопрос: когда приложение перезапускается или повторно развертывается (и с учетом того, что оно работает внутри непостоянного контейнера), его состояние меняется? Чем мои операции соединения не работают. Когда я повторно развертываю свой экземпляр и заполняю сжатую топику c из эластичного поиска последними объектами, мои операции соединения в порядке. Поэтому я думаю, что когда мое приложение запускается на новой машине, мое локальное хранилище состояний исчезло? Но документ kafka гласит:

Если задачи выполняются на отказавшем компьютере и перезапускаются на другом компьютере, Kafka Streams гарантирует восстановление связанных хранилищ состояний до содержимого до сбоя, воспроизводя соответствующие разделы журнала изменений до возобновление обработки вновь запущенных задач. В результате обработка ошибок полностью прозрачна для конечного пользователя. Обратите внимание, что стоимость инициализации (повторной) задачи обычно зависит в основном от времени восстановления состояния путем воспроизведения связанных разделов журнала изменений хранилищ состояний. Чтобы свести к минимуму это время восстановления, пользователи могут настроить свои приложения на наличие резервных копий локальных состояний (т. Е. Полностью реплицированных копий состояния). Когда происходит миграция задачи, Kafka Streams затем пытается назначить задачу экземпляру приложения, где такая резервная реплика уже существует, чтобы минимизировать затраты на инициализацию задачи. См. Num.standby.replicas в разделе «Настройка потоков Kafka». (https://kafka.apache.org/0102/documentation/streams/architecture)

Мой сбитый экземпляр обновляет sh kafka state-store, когда он поднимается? Если по этой причине я теряю данные и понятия не имею: / Или я не могу перезагрузить хранилище состояний из-за commit_offset, потому что все мои экземпляры используют один и тот же идентификатор приложения?

Спасибо!

1 Ответ

0 голосов
/ 07 января 2020

Темы журнала изменений всегда читаются с самого раннего смещения, и они уплотняются, поэтому они не теряют данные.

Если вы присоединяетесь к некомпактным темам, то, конечно, вы теряете данные, но это не ограничивается Kafka Streams или вашим конкретным c сценарием использования ... Вам нужно настроить topi c так, чтобы данные сохранялись, по крайней мере, столько, сколько вы думаете, что вам понадобится решить любые проблемы с topi c время простоя. Пока данные сохраняются, вы всегда можете найти к ним своего потребителя

Если вы хотите иметь постоянное хранилище, используйте монтирование тома в свой контейнер, например, через Kubernetes, или подключите хранилище состояний, сохраненное снаружи, к контейнер типа Redis: https://github.com/andreas-schroeder/redisks

...