Как мне создать государственный магазин для трансформатора - PullRequest
0 голосов
/ 26 мая 2018

Я пытаюсь создать Transformer и сталкиваюсь с проблемами при инициализации его StateStore.Я посмотрел на пример в Как зарегистрировать процессор без сохранения состояния (который, похоже, требует и StateStore)? , и это имеет смысл, но я пытаюсь что-то другое:

KeyValueBytesStoreSupplier groupToKVStore_supplier = 
    Stores.persistentKeyValueStore( state_store_name );
StoreBuilder< KeyValueStore< G, KeyValue< K, V > > > groupToKVStore_builder =
    Stores.keyValueStoreBuilder( groupToKVStore_supplier, Gserde, KVserde );
stream_builder.addStateStore( groupToKVStore_builder );

Мое намерение состоит в том, чтобы использовать String в качестве ключа хранилища состояний и KeyValue в качестве значения хранилища состояний.Правильная ли формулировка выше?Я спрашиваю, потому что, когда поток, содержащий мой Transformer, запускается, он генерирует исключение, которое говорит:

Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Processor KSTREAM-TRANSFORM-0000000001 has no access to StateStore state_store_1582785598
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:72)
    at com.ui.streaming.processors.sort.WindowedTimeSorter.init(WindowedTimeSorter.java:135)
    at org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.init(KStreamTransform.java:51)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:54)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:10

По предложению Матиаса, я добавил аргумент имени StateStore к transformвызов в моем потоке, и это, кажется, избавит нас от ошибки, показанной выше.Однако затем мы получаем следующее исключение:

ERROR stream-thread [A.Completely.Different.appID-b04af4b4-fdbb-4353-9aa5-6d71f7c22f9e-StreamThread-1] Failed to process stream task 0_1 due to the following error: (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks:105) 
java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:153)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
    at com.ui.streaming.processors.sort.WindowedTimeSorter.transform(WindowedTimeSorter.java:167)
    at com.ui.streaming.processors.sort.WindowedTimeSorter.transform(WindowedTimeSorter.java:1)
    at org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.process(KStreamTransform.java:56)

Увы, все еще не совсем правильно: во-первых, метод init моего Transformer вызывается три раза;это должно быть только один раз, верно?Во-вторых, я получаю ошибку времени выполнения в методе transform моего Transformer при первой попытке сохранить что-либо в StateStore:

INFO stream-thread [A.Completely.Different.appID-7dc67466-20f4-4e6c-8a69-bc0710a42f3c-StreamThread-1] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1124) 
Exception in thread "A.Completely.Different.appID-7dc67466-20f4-4e6c-8a69-bc0710a42f3c-StreamThread-1" java.lang.IllegalStateException: This should not happen as timestamp() should only be called while a record is processed
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.timestamp(AbstractProcessorContext.java:153)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
    at com.ui.streaming.processors.sort.WindowedTimeSorter.transform(WindowedTimeSorter.java:155)

1 Ответ

0 голосов
/ 27 мая 2018

Простого добавления магазина в топологию недостаточно.Вам также необходимо подключить хранилище к преобразователю, передав имя магазина в transform():

stream.transform(..., state_store_name);

Обновление:

Для второго исключения я предполагаючто вы не возвращаете новый объект при вызове TransformerSupplier#get(), но каждый раз возвращаете один и тот же объект.Как предполагает «шаблон поставщика», вам нужно создавать новый объект каждый раз, когда вызывается #get() (в противном случае поставщик не имел бы смысла, и было бы возможно передать один объект напрямую).Сравните FAQ: https://docs.confluent.io/current/streams/faq.html#why-do-i-get-an-illegalstateexception-when-accessing-record-metadata

...