PAPI Kafka Streams: поведение при создании объектов AbstractProcessor - PullRequest
0 голосов
/ 17 июня 2019

У меня есть топология Kafka Streams, которая содержит AbstractProcessor (фактически два). В одном из них я использую Punctuation API с WALL_CLOCK_TIME, чтобы запланировать обновление некоторых справочных данных, необходимых для обработки. Я делаю это в начале задания, а затем планирую время от времени (скажем, 1 час). num.stream.threads настроен на 2.

например. У меня есть такой код:

def loadReferenceData() = {
      logger.info("Loading All Reference Data...")
      // atomically (re)load some data
}

override def init(context: ProcessorContext) = {
      super.init(context)
      logger.info("Loading reference data initially...")
      loadReferenceData()

      context.schedule(1000 * reloadDataSeconds, PunctuationType.WALL_CLOCK_TIME, (timestamp) => {
        loadReferenceData()
        context.commit(); // Unsure if necessary
      });
}

Без входящих записей, в журналах одного экземпляра при работающем приложении я вижу эти журналы для init:

    [2019-06-11 08:54:19,518] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 08:53:31,080] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 08:53:29,713] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 08:53:29,682] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 07:54:20,855] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 07:54:19,714] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 07:54:19,516] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 07:53:31,036] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 07:53:29,668] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 07:53:29,653] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 06:54:20,845] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
    [2019-06-11 06:54:19,726] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)

Так что, похоже, есть несколько журналов для ввода loadReferenceData, каждый час. Я ожидал увидеть только 2 записи в час (2 потока), но их больше (обычно 6).

В журналах я только 6 раз видел создание своего процессора в начале создания приложения:

[2019-06-10 16:54:19,849] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:54:18,231] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:54:17,874] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:53:29,675] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:53:27,132] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:53:24,923] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)

Так что это имеет смысл: Процессор создается один раз, а раз в час он обновляется.

Но когда я прикладываю больше усилий к своему приложению, я часто вижу создание новых объектов Processor.

  • При каких обстоятельствах Kafka Streams создаст новые экземпляры этих процессоров?
  • Как узнать, сколько экземпляров процессора создаст мой экземпляр приложения?
  • Если процессоры могут быть закрыты / созданы по усмотрению Kafka Streams, похоже, что для этих «внешних» операций API пунктуации излишне (или просто не предназначен для этого), и отдельный периодически обновляемый поток лучше бы справился, не так ли?

1 Ответ

1 голос
/ 17 июня 2019

Kafka-Streams создаст один процессор на раздел в теме ввода, каждый процессор будет иметь свое собственное расписание. (Это на самом деле весьма полезно, если вы используете хранилища состояний, так как состояние также будет разбито на части.)

Планировщик полезен, если вы хотите применять регулярные операции к своему внутреннему состоянию. Он хорошо останавливает обычную обработку и обеспечивает согласованность всего, что вы делаете во время операций по расписанию. Если рассматриваемая задача не имеет ничего общего с потоковой передачей как таковой, отдельный поток может быть таким же хорошим.

Если вы выбираете отдельный поток, убедитесь, что он соответствующим образом завершен, когда поток kafka-streams завершится сбоем . В противном случае ваше приложение будет зависать в потоке таймера, но не потреблять никаких сообщений kafka.

Увеличение количества потоков (num.stream.threads) означает, что несколько разделов будут использоваться одновременно. Это то же самое поведение, что и запуск нескольких экземпляров рядом друг с другом. См https://docs.confluent.io/current/streams/architecture.html#threading-model

...