Потоки Kafka Соединение Ktable Ktable не выдает результатов в теме вывода - PullRequest
1 голос
/ 09 марта 2020

Я пытаюсь очень простое соединение Ktable к Ktable в Scala. Обе темы имеют один раздел, но все же я не вижу ничего подходящего к 'output-topi c'

object SimpleMerge extends App {
  val properties = KafkaProperties.get()

  val streamBuilder = new StreamsBuilder()

  val objectMetadataTable: KTable[Long, String] = streamBuilder.table("metadata-topic")

  val objectClassificationTable: KTable[Long, String] = streamBuilder.table("classification-topic")

  val objectClassificationWithMetadata: KTable[Long, String] = objectMetadataTable
    .join(objectClassificationTable, (metadata: String, classification: String) => metadata + classification)


  objectClassificationWithMetadata.toStream().to("output-topic")

  val kafkaStreams = new KafkaStreams(streamBuilder.build(), properties)
  kafkaStreams.cleanUp()
  kafkaStreams.start()

  Runtime.getRuntime.addShutdownHook(new Thread(() => {
    kafkaStreams.close()
  }))
}

Я уверен, что это очень просто c, но я не могу найти никакой подсказки. Примечание. Я вижу, что обе темы ввода дают результаты с правильными ключами, поэтому вопрос о нулевой клавише уже исключен.

Спасибо

Исключение

02:26:57.936 [MetadataClassificationMerger_1-757746d5-32e5-412b-934d-d29c1fc8c7a9-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - stream-thread [MetadataClassificationMerger_1-757746d5-32e5-412b-934d-d29c1fc8c7a9-StreamThread-1] task [0_0] Failed to flush state store metadata-topic-STATE-STORE-0000000000: 
org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: [B, and value: org.apache.kafka.streams.kstream.internals.Change.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:122) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:227) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:333) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:276) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:582) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:536) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:524) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:529) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:959) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:813) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) [kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-streams-2.4.0.jar:?]
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
    at com.pan.pcs.dlp.kafkamerger.MergeObjectMetadataClassification$$anon$1.apply(MergeObjectMetadataClassification.scala:19) ~[classes/:?]
    at org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin$KTableKTableJoinProcessor.process(KTableKTableInnerJoin.java:110) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin$KTableKTableJoinProcessor.process(KTableKTableInnerJoin.java:67) ~[kafka-streams-2.4.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-2.4.0.jar:?]
    ... 24 more

1 Ответ

0 голосов
/ 10 марта 2020

По умолчанию ByteArraySerdes используются, и сообщение об ошибке указывает, что ваша программа все еще использует их, если LongSerde для ключей и StringSerde для значений.

Если вы используете официальный Scala API и загрузить соответствующие экспликации, правильные Serdes должны использоваться автоматически. В противном случае вам потребуется либо установить другие значения по умолчанию с помощью StreamsConfig, либо перезаписать их для каждого оператора, передав Consumed.with(...) в операторы table(...).

Подробнее см. В документации : https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#kstreams -dsl-for- scala

...