Шаблон исходящих сообщений требует, чтобы в таблице исходящих хранилищ содержался полезный груз объекта, чтобы ретранслятор сообщений мог получить к нему доступ и отправить его через Kafka. Почти каждый найденный пример хранит полезную нагрузку в формате JSON:
Суть в том, что я предпочитаю использовать формат Avro вместо JSON, чтобы использовать преимущества таких функций, как эволюция схемы.
Ниже вы можете увидеть схему Avro и класс Java, который она генерирует:
{
"namespace" : "com.codependent.outboxpattern.account",
"type" : "record",
"name" : "TransferEmmitted",
"fields" : [
{"name":"transferId","type":"string"},
{"name":"sourceAccountId","type":"long"},
{"name":"destinationAccountId","type":"long"},
{"name":"ammount","type":"float"}
]
}
---
public class TransferEmmitted extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
...
1) Как сохранить сущность TransferEmmited в полезной нагрузке таблицы Исходящие?
2) Что касается ретранслятора сообщений, как он должен генерировать сообщение из полезной нагрузки, хранящейся в базе данных, для передачи его производителю Spring Cloud Stream Kafka?
Я реализовал первый подход , но кажется немного неловким, и я не уверен, что это лучший способ сделать это.
account-service: сохраняет сущность TransferEmmitted Avro в таблице исходящих сообщений, используя AvroSchemaRegistryClientMessageConverter
для генерации сообщения и извлечения полезной нагрузки его байтового массива:
@Transactional
@Service
class OutboxServiceImpl(private val outboxRepository: OutboxRepository,
private val avroMessageConverter: AvroSchemaRegistryClientMessageConverter) : OutboxService {
override fun save(messageId: String, topic: String, entity: SpecificRecordBase) {
val message = avroMessageConverter.toMessage(entity, MessageHeaders(mutableMapOf(MessageHeaders.CONTENT_TYPE to "application/*+avro" as Any))) as Message<*>
outboxRepository.save(Outbox(0, messageId, topic, entity.schema.name, State.PENDING, message.payload as ByteArray))
}
}
message-relay-service: читает из таблицы Исходящие и отправляет сообщение, если ожидает:
@Component
class MessageRelay(private val outboxService: OutboxService,
private val source: Source) {
@Scheduled(fixedDelay = 10000)
fun checkOutbox() {
val pending = outboxService.getPending()
pending.forEach {
val message = MessageBuilder.withPayload(it.payload)
.setHeader(KafkaHeaders.MESSAGE_KEY, it.messageKey)
.build()
source.output().send(message)
outboxService.markAsProcessed(it.id)
}
}
}
Эта реализация работает, но есть некоторые аспекты, которые заставляют меня усомниться в ее правильности:
1) Служба учета хранит полезную нагрузку объекта, используя AvroSchemaRegistryClientMessageConverter
. Может быть, есть лучший способ сделать это ...
2) Сообщения, отправляемые Kafka из службы ретрансляции сообщений, имеют тип контента application/*+avro
вместо чего-то вроде application/vnd.transferemitted.v1+avro
.