Несовместимые сообщения Avro между Spring Cloud Stream Kafka Stream и собственными приложениями и производителями Kafka Stream - PullRequest
0 голосов
/ 23 апреля 2019

Примеры приложений для проверки этого можно найти в https://github.com/codependent/event-carried-state-transfer/tree/avro

  • kafka-xxx: собственные приложения
  • spring-boot-xxx: приложения Spring Cloud Stream

Проблема в Сообщения Avro, созданные собственным источником Kafka, не могут быть распакованы приложениями Spring Cloud Stream Например:

Собственный производитель Kafka (kafka-customer-сервисный проект)

@Component
class CustomerProducer {

    private val producer: KafkaProducer<Int, Customer>

    init {
        val props = Properties()
        props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        props[ProducerConfig.CLIENT_ID_CONFIG] = "kafka-customer-producer"
        props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = IntegerSerializer::class.java.name
        props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java.name
        props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
        props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
        producer = KafkaProducer(props)
    }


    fun sendCustomerEvent(customer: Customer) {
        val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
        producer.send(record)
    }
}

Spring Cloud Stream Kafka Stream (spring-boot-shipping-service)

    @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<Int, Customer>, ...): KStream<Int, OrderShippedEvent> {

        val serdeConfig = mapOf(
                AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081")

        val intSerde = Serdes.IntegerSerde()
        val customerSerde = SpecificAvroSerde<Customer>()
        customerSerde.configure(serdeConfig, false)

        val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
                Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
                        .withKeySerde(intSerde)
                        .withValueSerde(customerSerde)

        val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
                .reduce({ _, y -> y }, stateStore)

        ...

В этом случаеПриложение Spring Cloud Stream отменяет маршализацию пустого клиентского DTO: {"id": 0, "name": "", "address": ""}

Теперь пытаемся наоборот, Spring CloudStream Producer и собственное приложение Kafka Streams :

Spring Cloud Stream Kafka Producer (spring-boot-customer-service)

spring:
  application:
    name: spring-boot-customer-service
  cloud:
    stream:
      kafka:
        bindings:
          output:
            producer:
              configuration:
                key:
                  serializer: org.apache.kafka.common.serialization.IntegerSerializer
      bindings:
        output:
          destination: customer
          contentType: application/*+avro
      schema-registry-client:
        endpoint: http://localhost:8081

---

@Service
class CustomerServiceImpl(private val customerKafkaProducer: Source) : CustomerService {
   ...
   val message = MessageBuilder.withPayload(customer).setHeader(KafkaHeaders.MESSAGE_KEY, customer.id).build()
   customerKafkaProducer.output().send(message)
   ...

Native Kafka Stream (kafka-shipping-service)

    val builder = StreamsBuilder()

    val streamsConfiguration = Properties()
    streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "kafka-shipping-service"
    //streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray()::class.java.name)
    //streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde::class.java)
    streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
    streamsConfiguration[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"

    val serdeConfig = mapOf(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081",
        AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY to TopicRecordNameStrategy::class.java.name
    )

    //val byteArraySerde = Serdes.ByteArray()
    val intSerde = Serdes.IntegerSerde()
    val customerSerde = SpecificAvroSerde<Customer>()
    customerSerde.configure(serdeConfig, false)

    val customerStream = builder.stream<Int, Customer>("customer",
        Consumed.with(intSerde, customerSerde)) as KStream<Int, Customer>

    val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
        Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
            .withKeySerde(intSerde)
            .withValueSerde(customerSerde)

    val customerTable = customerStream
        .map { key, value -> KeyValue(key, value) }
        .groupByKey(Serialized.with(intSerde, customerSerde))
        .reduce({ _, y -> y }, stateStore)

В этом случае dir нативного приложениясбои с исключением (org.apache.kafka.common.errors.SerializationException: Unknown magic byte!)

Exception in thread "kafka-shipping-service-b89157ba-b21f-46ba-911d-97f6080d477e-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
    at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:742)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:1023)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:861)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Disconnected from the target VM, address: '127.0.0.1:57856', transport: 'socket'

Process finished with exit code 0

Как обеспечить совместимость сообщений, генерируемых производителями Spring Cloud Stream / производителями Native Kafka, в разнородной корпоративной среде, где могут быть потребители, которые могут бытьнечетко Spring Cloud Stream Приложения Katfka Stream и родные потоки Kafka?

1 Ответ

1 голос
/ 24 апреля 2019

@ codependent В вашем первом случае - у вас есть собственный производитель Kafka, который использует KafkaAvroSerializer, и потребитель Cloud Cloud Stream Kafka Streams, который использует авросериализаторы, предоставленные Spring Cloud Stream. Это не будет работать, поскольку вы используете несовместимые сериализаторы / десериализаторы. Чтобы это исправить, на стороне Spring Cloud Stream вам нужно включить useNativeDecoding и предоставить avro Serde (SpecificAvroSerde). Таким образом, вы используете одну и ту же стратегию сериализации / десериализации.

Во втором случае вы получаете классическую ошибку (Unknown magic byte!), когда сериализаторы не совпадают. Опять та же проблема. У вас есть производитель Spring Cloud Stream, который использует сериализаторы из инфраструктуры, но на стороне потребления использует SpecificAvroSerde. Чтобы исправить это, вы можете включить useNativeEncoding на стороне производителя и разрешить использование avro-сериализатора. Или оберните сериализатор Avro из Spring Cloud Stream в Serde и предоставьте его потребителю.

Я думаю, что суть здесь в том, что при использовании avro в качестве формата обмена данными вам необходимо убедиться, что в вашей цепочке микросервисов используются те же стратегии сериализации / десериализации, которые полагаются на эти данные.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...