Я получаю эту ошибку в Kafka 2.0, в то время как код работает для Kafka 1.1
"Exception in thread "main" org.apache.kafka.streams.errors.TopologyException: Invalid topology: Processor callmanagerprocessor must have at least one parent"
Мой код работает в Kafka 1.1 отлично, не уверен, что мне нужно изменить для 2.0
class CallManagersProcessor extends Processor[String, String] with Logging {
private var context: ProcessorContext = _
var callManagersProcessorCounter:Long = 0
override def init(processorContext: ProcessorContext): Unit = {
this.context = processorContext
StreamConstants.callManagerStore = Option(context.getStateStore(CallManagersProcessor.STORE).asInstanceOf[KeyValueStore[String, CallManager]])
}
override def process(key: String, value: String): Unit = {
logger.info("callManagersProcessorCounter ="+callManagersProcessorCounter )
callManagersProcessorCounter = callManagersProcessorCounter +1
val cmKey = processManagerKey(key)
if (cmKey.isDefined) {
val cmValue = processManagerValue(value)
if (cmValue.isDefined) {
StreamConstants.callManagerStore.get.put(cmKey.get,cmValue.get)
context.forward(cmKey.get, cmValue.get.toJson.toString())
} else {
StreamConstants.callManagerStore.get.put(cmKey.get,null)
context.forward(cmKey.get, null)
}
}
}
Код топологии:
class DimensionTopology(streamName: String) extends TopologyProvider with Logging {
override val name: String = streamName
val objectSerdes: Serde[Object] = Serdes.serdeFrom(new ObjectSerializer(), new ObjectDeserializer())
override type BuilderType = Topology
//Processor API
override def topology(builder: BuilderType, source: String, sink: String): Topology = {
val callManagersProcessor: ProcessorSupplier[String, String] = () => new CallManagersProcessor()
val customersProcessor: ProcessorSupplier[String, String] = () => new CustomersProcessor()
builder.addProcessor(CallManagersProcessor.PROCESSOR, callManagersProcessor)
.addGlobalStore( Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(CallManagersProcessor.STORE),
objectSerdes,
objectSerdes
).withLoggingDisabled(),
CallManagersProcessor.SOURCE,
Serdes.String().deserializer(),
Serdes.String().deserializer(),
CallManagersProcessor.INPUT_TOPIC,
CallManagersProcessor.GLOBAL_STORE,
callManagersProcessor)
.addSink(CallManagersProcessor.SINK, CallManagersProcessor.OUTPUT_TOPIC,
Serdes.String().serializer(), Serdes.String().serializer(), CallManagersProcessor.PROCESSOR)
.addSource(CallDetailsProcessor.SOURCE,Serdes.String().deserializer(), Serdes.String().deserializer(), CallDetailsProcessor.INPUT_TOPIC)
.addProcessor(CallDetailsProcessor.PROCESSOR, new ProcessorSupplier[String, String] {
override def get(): Processor[String, String] = {
new CallDetailsProcessor()
}
}, CallDetailsProcessor.SOURCE)
.addSink(CallDetailsProcessor.SINK, CallDetailsProcessor.OUTPUT_TOPIC,
Serdes.String().serializer(), Serdes.String().serializer(), CallDetailsProcessor.PROCESSOR)
builder
}
override def createBuilder(): BuilderType = new Topology
}
object DimensionTopology{
}