Kafka Streams PAPI: закрытие процессора вызывается при запуске и слишком часто - PullRequest
0 голосов
/ 08 июня 2018

Как показано в Confluent docs при написании приложений PAPI , вы должны закрыть хранилища, которые вы используете в своем процессоре, переопределив метод close .

В примере WordCountProcessor, он показывает, как магазин должен быть закрыт при вызове метода close ().

Я сделал что-то подобное (я не запускаю их в методе init (), но использую lazy val в Scala), и я обнаружил, что мой метод Processor close () вызывается сразу после созданияхранить и несколько раз.

class EventWindowProcessor(sessionStoreName: String, lastSessionByChannelStoreName: String, lastChannelStoreName: String)
extends AbstractProcesso

// example of a store
private lazy val lastChannelStore: KeyValueStore[MyKey, Channel] =
  context()
    .getStateStore(lastChannelStoreName)
    .asInstanceOf[KeyValueStore[MyKey, Channel]]

override def init(context: ProcessorContext) = {
  super.init(context)
}

override def close() = {
  logger.info("CLOSING PROCESSOR") 
}

override def process(key: String, value: String): Unit = {
    // ... my stuff here
}

Таким образом, я получаю следующий вывод, показывающий, что процессор.close () вызывается так много раз в начале работы топологии, а также вызывается в более поздних точках приложения.

[2018-06-08 05:13:16,255] INFO Stream Application starting, name: stream-processor (my.package.StreamProcessorApplication$)
[2018-06-08 05:13:16,760] INFO Topology: Sub-topologies:
Sub-topology: 0
Source: event-source (topics: [events])
--> session-processor
Processor: session-processor (stores: [sessionStoreName, lastSessionByChannelStoreName, lastChannelStoreName])
--> error-event-sink, order-sink, pageviews-sink, session-sink
<-- event-source
Sink: error-event-sink (topic: error-events)
<-- session-processor
Sink: order-sink (topic: orders)
<-- session-processor
Sink: pageviews-sink (topic: pageviews)
<-- session-processor
Sink: session-sink (topic: sessions)
<-- session-processor
Global Stores:
none
(my.package.StreamProcessorApplication$)
[2018-06-08 05:14:01,425] INFO CLOSING PROCESSOR (my.package.StreamProcessor)
[2018-06-08 05:14:01,539] INFO CLOSING PROCESSOR (my.package.StreamProcessor)
[2018-06-08 05:14:01,640] INFO CLOSING PROCESSOR (my.package.StreamProcessor)
      ... (102 lines like that)
[2018-06-08 05:29:05,548] INFO   .... my own application logging here 

... поэтому, если я закрываю хранилища в том методе close (), когда мой код в process () пытается их использовать, возникает исключение, сообщающее, что хранилище закрыто.

Почему processor.close () вызывается в начале запуска KafkaStreams?И почему это происходит так часто?

Каковы риски не явного закрытия магазинов?

1 Ответ

0 голосов
/ 08 июня 2018

Пример в документации неверен.Вы не должны закрывать магазин - магазин управляется Kafka Streams, и Kafka Streams закроет магазин для вас.(Я сделаю PR, чтобы исправить пример кода. Спасибо за указание.)

Об обращениях к Processor#close(): ожидается, что процессор может быть закрыт и повторно открыт.Это происходит во время восстановления баланса.Таким образом, ваш код должен быть написан таким образом, чтобы он правильно работал для нескольких вызовов init() и close() - для этого мы недавно обновили JavaDocs (улучшенные JavaDocs будут частью выпуска Kafka 2.0).

...