KafkaStreamsStateStore не работает, если значением хранилища является Avro SpecificRecord - PullRequest
1 голос
/ 17 июня 2019

У меня есть приложение Spring Cloud Kafka Streams, которое использует StateStore в Processor API при использовании преобразователя для выполнения дедупликации.

Ключ-значение хранилища состояний имеют следующие типы: <String, TransferEmitted>.

При запуске приложения, в момент помещения значения в хранилище состояний (dedupStore.put(key, value)), я получаю следующее исключение:

Причина: java.lang.ClassCastException: com.codependent.outboxpattern.account.TransferEmitted не может быть приведен к java.lang.String

Это связано с тем, что значением по умолчанию для serde для KafkaStreamsStateStore является StringSerde.

Таким образом, я добавил параметр valueSerde в аннотацию KafkaStreamsStateStore, указав его для SpecificAvroSerde:

    @KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE,
            valueSerde = "io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde")

Теперь я получаю исключение NullPointerException в AbstractKafkaAvroSerializer.serializeImpl, посколькув id = this.schemaRegistry.getId(subject, schema); schemaRegistry имеет значение null:

Вызывается: org.apache.kafka.common.errors.SerializationException: Ошибка сериализации сообщения Avro Вызывается: java.lang.NullPointerException в io.confluent.kafka.serializers.SpecificAvroSerializer.java:65) в io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize (SpecificAvroSerializer.java:38)

Несмотря на настройку реестра схемы в виде Spring bean-компонента ...

@Configuration
class SchemaRegistryConfiguration {

    @Bean
    fun schemaRegistryClient(@Value("\${spring.cloud.stream.schema-registry-client.endpoint}") endpoint: String): SchemaRegistryClient {
        val client = ConfluentSchemaRegistryClient()
        client.setEndpoint(endpoint)
        return client
    }

}

... когда Kafka устанавливает SpecificAvroSerde, он использует конструктор без параметров, поэтому он не инициализирует клиента реестра схемы:

public class SpecificAvroSerde<T extends SpecificRecord> implements Serde<T> {
    private final Serde<T> inner;

    public SpecificAvroSerde() {
        this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(), new SpecificAvroDeserializer());
    }

    public SpecificAvroSerde(SchemaRegistryClient client) {
        if (client == null) {
            throw new IllegalArgumentException("schema registry client must not be null");
        } else {
            this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(client), new SpecificAvroDeserializer(client));
        }
    }

Как я могу настроить это приложение так, чтобы оно позволяло сериализовать StateStore<String, TransferEmitted>?

ВЫДЕРЖКИ ИЗ ПРОЕКТА (источник доступен на https://github.com/codependent/kafka-outbox-pattern)

KStream

const val DEDUP_STORE = "dedup-store"

@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration(private val fraudDetectionService: FraudDetectionService) {

    @KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
    @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
        return input
                .transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)
                .filter { _, value -> fraudDetectionService.isFraudulent(value) }

    }

}

Трансформатор

@Suppress("UNCHECKED_CAST")
class DeduplicationTransformer : Transformer<String, TransferEmitted, KeyValue<String, TransferEmitted>> {

    private lateinit var dedupStore: KeyValueStore<String, TransferEmitted>
    private lateinit var context: ProcessorContext

    override fun init(context: ProcessorContext) {
        this.context = context
        dedupStore = context.getStateStore(DEDUP_STORE) as KeyValueStore<String, TransferEmitted>
    }

    override fun transform(key: String, value: TransferEmitted): KeyValue<String, TransferEmitted>? {
        return if (isDuplicate(key)) {
            null
        } else {
            dedupStore.put(key, value)
            KeyValue(key, value)
        }
    }

    private fun isDuplicate(key: String) = dedupStore[key] != null

    override fun close() {
    }
}

application.yml

spring:
  application:
    name: fraud-service
  cloud:
    stream:
      schema-registry-client:
        endpoint: http://localhost:8081
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: fraud-service
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              schema:
                registry:
                  url: http://localhost:8081
      bindings:
        input:
          destination: transfer
          contentType: application/*+avro
        output:
          destination: fraudulent-transfer
          contentType: application/*+avro

server:
  port: 8086

logging:
  level:
    org.springframework.cloud.stream: debug

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...