Я использую оба Kafka и Kafka Streams в моем приложении Spring.В то время как обычная Kafka связь, такая как отправка / получение в / из тем работает нормально, Kafka Streams перестает потреблять (обрабатывать записи) вскоре после запуска приложения.Странно то, что при перезапуске приложения обработка записей возобновляется на короткое время, и даже некоторые обработанные данные отправляются в тему, но вскоре после того, как потоки Кафки снова застряли.
IЯ почти уверен, что мне чего-то не хватает, возможно, потоки Kafka настроены неправильно или, возможно, я потребляю неправильно.
Конфигурация брокера: У меня есть кластер из 3 брокеров, в общем, я использую конфигурацию брокера Kafka по умолчанию, которая поставляется с двоичными файлами, за исключением того, что я увеличил количество разделов вв тему 3 .
Только те конфигурации, которые я изменил (также из-за размера кластера, как рекомендовано):
num.partitions=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
Версии и ОС:
Я использую Kafka Streams v2.1.0 , Клиенты Kafka v2.1.0 , Брокеры Kafka v2.1.0 и Spring Kafka 2.2.3.RELEASE .
Брокеры и потребители, работающие на Debian GNU / Linux 9.8 (растяжение) .
Конфигурация Java Kafka Streams:
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, STREAMS_ID);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 15);
props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, StreamExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
У меня есть 15 потоковых потоков для оптимального параллелизма с использованием следующей логики:
# num_of_partitions * # num_of_stream_topics
Примерчто делает потребитель Kafka Streams:
KStream<String, ActivityLog> kStream = kStreamBuilder.stream(ServerConstants.KAFKA.ACTIVITY_LOGS_DESTINATION_TOPIC, Consumed.with(Serdes.String(), getActivityLogSerde()));
TriggerSensitivities triggerSensitivities = PredefinedTriggerSensitivities.SOME_TRIGGER;
kStream
.filter((id, activityLog) ->
isValidRecord(id, activityLog) &&
SUPPORTED_EVENT_TYPES.contains(activityLog.getType())
&& ruleService.hasRuleForActivity(id, activityLog, ThreatTrigger.TYPE.SOME_TRIGGER))
.selectKey((id, activityLog) -> new SelectedKey(id, activityLog.getEmail(),
ThreatTrigger.TYPE.SOME_TRIGGER, activityLog.getName()).toString())
.groupByKey()
.windowedBy(TimeWindows.of(Duration.of(triggerSensitivities.getHighestTimePeriod(), ChronoUnit.MILLIS)))
.aggregate(ArrayList<ActivityLog>::new,
(selectedKey, activityLog, activityLogs) -> {
activityLogs.add(activityLog);
return activityLogs;
},
Materialized.with(Serdes.String(),
Serdes.serdeFrom(new JsonPOJOSerializer<>(), new JsonPOJODeserializer<>(ArrayList.class, ActivityLog.class))))
.toStream()
.selectKey((windowedKey, activityLogs) -> windowedKey.key())
.mapValues((selectedKey, activityLogs) ->
ruleService.getMatchedTriggerActivities(triggerSensitivities,
ThreatTrigger.TYPE.SOME_TRIGGER,
selectedKey,
activityLogs))
.to(ServerConstants.KAFKA.DETECTION_EVENTS_TOPIC);
То, чего я пытался достичь, это следующее:
Я получаюРазмещая различные журналы активности, фильтруя их по различным критериям, затем агрегируя их по определенному ключу за определенное время, и если я собрал их достаточно + некоторую дополнительную логику, я генерирую событие.Сначала работает как положено, а потом зависает.
Журналы брокера:
Куча INFO журналов, которые для меня выглядят почти одинаково
[2019-02-26 22:25:13,502] INFO [Log partition=coronet-streams-KSTREAM-AGGREGATE-STATE-STORE-0000000200-repartition-1, dir=/tmp/kafka-logs] Incrementing log start offset to 3467 (kafka.log.Log)
[2019-02-26 22:25:55,241] INFO [Log partition=coronet-streams-KSTREAM-AGGREGATE-STATE-STORE-0000000200-repartition-1, dir=/tmp/kafka-logs] Incrementing log start offset to 3470 (kafka.log.Log)
[2019-02-26 22:26:31,133] INFO [Log partition=coronet-streams-KSTREAM-AGGREGATE-STATE-STORE-0000000200-repartition-1, dir=/tmp/kafka-logs] Incrementing log start offset to 3471 (kafka.log.Log)
[2019-02-26 22:27:07,845] INFO [ProducerStateManager partition=coronet-streams-KSTREAM-AGGREGATE-STATE-STORE-0000000292-repartition-1] Writing producer snapshot at offset 2127 (kafka.log.ProducerStateManager)
[2019-02-26 22:27:07,845] INFO [Log partition=coronet-streams-KSTREAM-AGGREGATE-STATE-STORE-0000000292-repartition-1, dir=/tmp/kafka-logs] Rolled new log segment at offset 2127 in 1 ms. (kafka.log.Log)
[2019-02-26 22:34:32,835] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
Потребительlogs:
Иногда печатается следующий журнал, но позже я обнаружил в нескольких источниках, что WARN не влияет на обработку.
2019-02-26 22:44:19.291 WARN 7350 --- [coronet-streams-6553b7a0-b6fb-4e07-ad16-c040374e201e-StreamThread-4] o.a.k.s.p.i.ProcessorStateManager : task [0
_0] Failed to write offset checkpoint file to /tmp/kafka-streams/coronet-streams/0_0/.checkpoint: {}
java.io.FileNotFoundException: /tmp/kafka-streams/coronet-streams/0_0/.checkpoint.tmp (No such file or directory)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:293)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:446)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:431)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:346)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:405)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1029)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
За исключением этого I DO SEE некоторые журналы, напечатанные здесь и там потоками потоков с использованием регистратора приложений, поэтому кажется, что потоки иногда работают (с некоторыми большими задержками), но в большинстве своем застряли, почему!
Было бы очень полезно, если бы кто-то мог указать, что может быть проблемой!
Спасибо!