У меня есть приложение Spring Cloud Kafka Streams, которое использует StateStore в Processor API при использовании преобразователя для выполнения дедупликации.
Ключ-значение хранилища состояний имеют следующие типы: <String, TransferEmitted>
.
При запуске приложения, в момент помещения значения в хранилище состояний (dedupStore.put(key, value)
), я получаю следующее исключение:
Причина: java.lang.ClassCastException: com.codependent.outboxpattern.account.TransferEmitted не может быть приведен к java.lang.String
Это связано с тем, что значением по умолчанию для serde для KafkaStreamsStateStore
является StringSerde
.
Таким образом, я добавил параметр valueSerde в аннотацию KafkaStreamsStateStore
, указав его для SpecificAvroSerde
:
@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE,
valueSerde = "io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde")
Теперь я получаю исключение NullPointerException в AbstractKafkaAvroSerializer.serializeImpl
, посколькув id = this.schemaRegistry.getId(subject, schema);
schemaRegistry имеет значение null:
Вызывается: org.apache.kafka.common.errors.SerializationException: Ошибка сериализации сообщения Avro Вызывается: java.lang.NullPointerException в io.confluent.kafka.serializers.SpecificAvroSerializer.java:65) в io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize (SpecificAvroSerializer.java:38)
Несмотря на настройку реестра схемы в виде Spring bean-компонента ...
@Configuration
class SchemaRegistryConfiguration {
@Bean
fun schemaRegistryClient(@Value("\${spring.cloud.stream.schema-registry-client.endpoint}") endpoint: String): SchemaRegistryClient {
val client = ConfluentSchemaRegistryClient()
client.setEndpoint(endpoint)
return client
}
}
... когда Kafka устанавливает SpecificAvroSerde
, он использует конструктор без параметров, поэтому он не инициализирует клиента реестра схемы:
public class SpecificAvroSerde<T extends SpecificRecord> implements Serde<T> {
private final Serde<T> inner;
public SpecificAvroSerde() {
this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(), new SpecificAvroDeserializer());
}
public SpecificAvroSerde(SchemaRegistryClient client) {
if (client == null) {
throw new IllegalArgumentException("schema registry client must not be null");
} else {
this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(client), new SpecificAvroDeserializer(client));
}
}
Как я могу настроить это приложение так, чтобы оно позволяло сериализовать StateStore<String, TransferEmitted>
?
ВЫДЕРЖКИ ИЗ ПРОЕКТА (источник доступен на https://github.com/codependent/kafka-outbox-pattern)
KStream
const val DEDUP_STORE = "dedup-store"
@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration(private val fraudDetectionService: FraudDetectionService) {
@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
return input
.transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)
.filter { _, value -> fraudDetectionService.isFraudulent(value) }
}
}
Трансформатор
@Suppress("UNCHECKED_CAST")
class DeduplicationTransformer : Transformer<String, TransferEmitted, KeyValue<String, TransferEmitted>> {
private lateinit var dedupStore: KeyValueStore<String, TransferEmitted>
private lateinit var context: ProcessorContext
override fun init(context: ProcessorContext) {
this.context = context
dedupStore = context.getStateStore(DEDUP_STORE) as KeyValueStore<String, TransferEmitted>
}
override fun transform(key: String, value: TransferEmitted): KeyValue<String, TransferEmitted>? {
return if (isDuplicate(key)) {
null
} else {
dedupStore.put(key, value)
KeyValue(key, value)
}
}
private fun isDuplicate(key: String) = dedupStore[key] != null
override fun close() {
}
}
application.yml
spring:
application:
name: fraud-service
cloud:
stream:
schema-registry-client:
endpoint: http://localhost:8081
kafka:
streams:
binder:
configuration:
application:
id: fraud-service
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
schema:
registry:
url: http://localhost:8081
bindings:
input:
destination: transfer
contentType: application/*+avro
output:
destination: fraudulent-transfer
contentType: application/*+avro
server:
port: 8086
logging:
level:
org.springframework.cloud.stream: debug