Потоки Kafka застряли на потреблении вскоре после запуска приложения - PullRequest
0 голосов
/ 27 февраля 2019

Я использую оба Kafka и Kafka Streams в моем приложении Spring.В то время как обычная Kafka связь, такая как отправка / получение в / из тем работает нормально, Kafka Streams перестает потреблять (обрабатывать записи) вскоре после запуска приложения.Странно то, что при перезапуске приложения обработка записей возобновляется на короткое время, и даже некоторые обработанные данные отправляются в тему, но вскоре после того, как потоки Кафки снова застряли.
IЯ почти уверен, что мне чего-то не хватает, возможно, потоки Kafka настроены неправильно или, возможно, я потребляю неправильно.

Конфигурация брокера: У меня есть кластер из 3 брокеров, в общем, я использую конфигурацию брокера Kafka по умолчанию, которая поставляется с двоичными файлами, за исключением того, что я увеличил количество разделов вв тему 3 .

Только те конфигурации, которые я изменил (также из-за размера кластера, как рекомендовано):

num.partitions=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3

Версии и ОС:
Я использую Kafka Streams v2.1.0 , Клиенты Kafka v2.1.0 , Брокеры Kafka v2.1.0 и Spring Kafka 2.2.3.RELEASE .

Брокеры и потребители, работающие на Debian GNU / Linux 9.8 (растяжение) .

Конфигурация Java Kafka Streams:

        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, STREAMS_ID);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 15);
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, StreamExceptionHandler.class);
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);

У меня есть 15 потоковых потоков для оптимального параллелизма с использованием следующей логики:
# num_of_partitions * # num_of_stream_topics

Примерчто делает потребитель Kafka Streams:

    KStream<String, ActivityLog> kStream = kStreamBuilder.stream(ServerConstants.KAFKA.ACTIVITY_LOGS_DESTINATION_TOPIC, Consumed.with(Serdes.String(), getActivityLogSerde()));

    TriggerSensitivities triggerSensitivities = PredefinedTriggerSensitivities.SOME_TRIGGER;

    kStream
            .filter((id, activityLog) ->
                    isValidRecord(id, activityLog) &&
                            SUPPORTED_EVENT_TYPES.contains(activityLog.getType())
                            && ruleService.hasRuleForActivity(id, activityLog, ThreatTrigger.TYPE.SOME_TRIGGER))

            .selectKey((id, activityLog) -> new SelectedKey(id, activityLog.getEmail(),
                    ThreatTrigger.TYPE.SOME_TRIGGER, activityLog.getName()).toString())
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.of(triggerSensitivities.getHighestTimePeriod(), ChronoUnit.MILLIS)))
            .aggregate(ArrayList<ActivityLog>::new,
                    (selectedKey, activityLog, activityLogs) -> {
                        activityLogs.add(activityLog);
                        return activityLogs;
                    },
                    Materialized.with(Serdes.String(),
                            Serdes.serdeFrom(new JsonPOJOSerializer<>(), new JsonPOJODeserializer<>(ArrayList.class, ActivityLog.class))))
            .toStream()
            .selectKey((windowedKey, activityLogs) -> windowedKey.key())
            .mapValues((selectedKey, activityLogs) ->
                    ruleService.getMatchedTriggerActivities(triggerSensitivities,
                            ThreatTrigger.TYPE.SOME_TRIGGER,
                            selectedKey,
                            activityLogs))
            .to(ServerConstants.KAFKA.DETECTION_EVENTS_TOPIC);

То, чего я пытался достичь, это следующее:
Я получаюРазмещая различные журналы активности, фильтруя их по различным критериям, затем агрегируя их по определенному ключу за определенное время, и если я собрал их достаточно + некоторую дополнительную логику, я генерирую событие.Сначала работает как положено, а потом зависает.

Журналы брокера:
Куча INFO журналов, которые для меня выглядят почти одинаково

[2019-02-26 22:25:13,502] INFO [Log partition=coronet-streams-KSTREAM-AGGREGATE-STATE-STORE-0000000200-repartition-1, dir=/tmp/kafka-logs] Incrementing log start offset to 3467 (kafka.log.Log)
[2019-02-26 22:25:55,241] INFO [Log partition=coronet-streams-KSTREAM-AGGREGATE-STATE-STORE-0000000200-repartition-1, dir=/tmp/kafka-logs] Incrementing log start offset to 3470 (kafka.log.Log)
[2019-02-26 22:26:31,133] INFO [Log partition=coronet-streams-KSTREAM-AGGREGATE-STATE-STORE-0000000200-repartition-1, dir=/tmp/kafka-logs] Incrementing log start offset to 3471 (kafka.log.Log)
[2019-02-26 22:27:07,845] INFO [ProducerStateManager partition=coronet-streams-KSTREAM-AGGREGATE-STATE-STORE-0000000292-repartition-1] Writing producer snapshot at offset 2127 (kafka.log.ProducerStateManager)
[2019-02-26 22:27:07,845] INFO [Log partition=coronet-streams-KSTREAM-AGGREGATE-STATE-STORE-0000000292-repartition-1, dir=/tmp/kafka-logs] Rolled new log segment at offset 2127 in 1 ms. (kafka.log.Log)
[2019-02-26 22:34:32,835] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

Потребительlogs:
Иногда печатается следующий журнал, но позже я обнаружил в нескольких источниках, что WARN не влияет на обработку.

2019-02-26 22:44:19.291  WARN 7350 --- [coronet-streams-6553b7a0-b6fb-4e07-ad16-c040374e201e-StreamThread-4] o.a.k.s.p.i.ProcessorStateManager        : task [0
_0] Failed to write offset checkpoint file to /tmp/kafka-streams/coronet-streams/0_0/.checkpoint: {}

java.io.FileNotFoundException: /tmp/kafka-streams/coronet-streams/0_0/.checkpoint.tmp (No such file or directory)
        at java.io.FileOutputStream.open0(Native Method)
        at java.io.FileOutputStream.open(FileOutputStream.java:270)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
        at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:293)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:446)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:431)
        at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:346)
        at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:405)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1029)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

За исключением этого I DO SEE некоторые журналы, напечатанные здесь и там потоками потоков с использованием регистратора приложений, поэтому кажется, что потоки иногда работают (с некоторыми большими задержками), но в большинстве своем застряли, почему!

Было бы очень полезно, если бы кто-то мог указать, что может быть проблемой!
Спасибо!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...