У меня есть топология Kafka Streams, которая содержит AbstractProcessor (фактически два).
В одном из них я использую Punctuation API
с WALL_CLOCK_TIME
, чтобы запланировать обновление некоторых справочных данных, необходимых для обработки.
Я делаю это в начале задания, а затем планирую время от времени (скажем, 1 час).
num.stream.threads
настроен на 2.
например. У меня есть такой код:
def loadReferenceData() = {
logger.info("Loading All Reference Data...")
// atomically (re)load some data
}
override def init(context: ProcessorContext) = {
super.init(context)
logger.info("Loading reference data initially...")
loadReferenceData()
context.schedule(1000 * reloadDataSeconds, PunctuationType.WALL_CLOCK_TIME, (timestamp) => {
loadReferenceData()
context.commit(); // Unsure if necessary
});
}
Без входящих записей, в журналах одного экземпляра при работающем приложении я вижу эти журналы для init:
[2019-06-11 08:54:19,518] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 08:53:31,080] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 08:53:29,713] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 08:53:29,682] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 07:54:20,855] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 07:54:19,714] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 07:54:19,516] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 07:53:31,036] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 07:53:29,668] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 07:53:29,653] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 06:54:20,845] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 06:54:19,726] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
Так что, похоже, есть несколько журналов для ввода loadReferenceData
, каждый час. Я ожидал увидеть только 2 записи в час (2 потока), но их больше (обычно 6).
В журналах я только 6 раз видел создание своего процессора в начале создания приложения:
[2019-06-10 16:54:19,849] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:54:18,231] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:54:17,874] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:53:29,675] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:53:27,132] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:53:24,923] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
Так что это имеет смысл: Процессор создается один раз, а раз в час он обновляется.
Но когда я прикладываю больше усилий к своему приложению, я часто вижу создание новых объектов Processor.
- При каких обстоятельствах Kafka Streams создаст новые экземпляры этих процессоров?
- Как узнать, сколько экземпляров процессора создаст мой экземпляр приложения?
- Если процессоры могут быть закрыты / созданы по усмотрению Kafka Streams, похоже, что для этих «внешних» операций API пунктуации излишне (или просто не предназначен для этого), и отдельный периодически обновляемый поток лучше бы справился, не так ли?