Как написать класс процессора impl в Кафке 2.0 - PullRequest
0 голосов
/ 29 декабря 2018

Я получаю эту ошибку в Kafka 2.0, в то время как код работает для Kafka 1.1

"Exception in thread "main" org.apache.kafka.streams.errors.TopologyException: Invalid topology: Processor callmanagerprocessor must have at least one parent"

Мой код работает в Kafka 1.1 отлично, не уверен, что мне нужно изменить для 2.0

class CallManagersProcessor extends Processor[String, String] with Logging {
  private var context: ProcessorContext = _

  var callManagersProcessorCounter:Long = 0

  override def init(processorContext: ProcessorContext): Unit = {
    this.context = processorContext
    StreamConstants.callManagerStore = Option(context.getStateStore(CallManagersProcessor.STORE).asInstanceOf[KeyValueStore[String, CallManager]])
  }

  override def process(key: String, value: String): Unit = {

    logger.info("callManagersProcessorCounter ="+callManagersProcessorCounter )

    callManagersProcessorCounter = callManagersProcessorCounter +1
    val cmKey = processManagerKey(key)
    if (cmKey.isDefined) {
      val cmValue = processManagerValue(value)
      if (cmValue.isDefined) {

        StreamConstants.callManagerStore.get.put(cmKey.get,cmValue.get)
        context.forward(cmKey.get, cmValue.get.toJson.toString())
      } else {

        StreamConstants.callManagerStore.get.put(cmKey.get,null)
        context.forward(cmKey.get, null)
      }
    }

  }

Код топологии:

class DimensionTopology(streamName: String) extends TopologyProvider with Logging {
  override val name: String = streamName
  val objectSerdes: Serde[Object] = Serdes.serdeFrom(new ObjectSerializer(), new ObjectDeserializer())
  override type BuilderType = Topology

  //Processor API
  override def topology(builder: BuilderType, source: String, sink: String): Topology = {

    val callManagersProcessor: ProcessorSupplier[String, String] = () => new CallManagersProcessor()
    val customersProcessor: ProcessorSupplier[String, String] = () => new CustomersProcessor()


    builder.addProcessor(CallManagersProcessor.PROCESSOR, callManagersProcessor)
      .addGlobalStore( Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(CallManagersProcessor.STORE),
        objectSerdes,
        objectSerdes
      ).withLoggingDisabled(),
        CallManagersProcessor.SOURCE,
        Serdes.String().deserializer(),
        Serdes.String().deserializer(),
        CallManagersProcessor.INPUT_TOPIC,
        CallManagersProcessor.GLOBAL_STORE,
        callManagersProcessor)
      .addSink(CallManagersProcessor.SINK, CallManagersProcessor.OUTPUT_TOPIC,
        Serdes.String().serializer(), Serdes.String().serializer(), CallManagersProcessor.PROCESSOR)


      .addSource(CallDetailsProcessor.SOURCE,Serdes.String().deserializer(), Serdes.String().deserializer(), CallDetailsProcessor.INPUT_TOPIC)
            .addProcessor(CallDetailsProcessor.PROCESSOR, new ProcessorSupplier[String, String] {
              override def get(): Processor[String, String] = {
                new CallDetailsProcessor()
              }
            }, CallDetailsProcessor.SOURCE)
            .addSink(CallDetailsProcessor.SINK, CallDetailsProcessor.OUTPUT_TOPIC,
              Serdes.String().serializer(), Serdes.String().serializer(), CallDetailsProcessor.PROCESSOR)


    builder
  }

  override def createBuilder(): BuilderType = new Topology
}

object DimensionTopology{

}
...