Совет для реализации шаблона исходящих с Kafka с помощью сообщений Avro - PullRequest
0 голосов
/ 14 июня 2019

Шаблон исходящих сообщений требует, чтобы в таблице исходящих хранилищ содержался полезный груз объекта, чтобы ретранслятор сообщений мог получить к нему доступ и отправить его через Kafka. Почти каждый найденный пример хранит полезную нагрузку в формате JSON:

enter image description here

enter image description here

Суть в том, что я предпочитаю использовать формат Avro вместо JSON, чтобы использовать преимущества таких функций, как эволюция схемы.

Ниже вы можете увидеть схему Avro и класс Java, который она генерирует:

{
  "namespace" : "com.codependent.outboxpattern.account",
  "type" : "record",
  "name" : "TransferEmmitted",
  "fields" : [
    {"name":"transferId","type":"string"},
    {"name":"sourceAccountId","type":"long"},
    {"name":"destinationAccountId","type":"long"},
    {"name":"ammount","type":"float"}
  ]
}
---
public class TransferEmmitted extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
...

1) Как сохранить сущность TransferEmmited в полезной нагрузке таблицы Исходящие?

2) Что касается ретранслятора сообщений, как он должен генерировать сообщение из полезной нагрузки, хранящейся в базе данных, для передачи его производителю Spring Cloud Stream Kafka?

Я реализовал первый подход , но кажется немного неловким, и я не уверен, что это лучший способ сделать это.

account-service: сохраняет сущность TransferEmmitted Avro в таблице исходящих сообщений, используя AvroSchemaRegistryClientMessageConverter для генерации сообщения и извлечения полезной нагрузки его байтового массива:

@Transactional
@Service
class OutboxServiceImpl(private val outboxRepository: OutboxRepository,
                        private val avroMessageConverter: AvroSchemaRegistryClientMessageConverter) : OutboxService {

    override fun save(messageId: String, topic: String, entity: SpecificRecordBase) {
        val message = avroMessageConverter.toMessage(entity, MessageHeaders(mutableMapOf(MessageHeaders.CONTENT_TYPE to "application/*+avro" as Any))) as Message<*>
        outboxRepository.save(Outbox(0, messageId, topic, entity.schema.name, State.PENDING, message.payload as ByteArray))
    }

}

message-relay-service: читает из таблицы Исходящие и отправляет сообщение, если ожидает:

@Component
class MessageRelay(private val outboxService: OutboxService,
                   private val source: Source) {

    @Scheduled(fixedDelay = 10000)
    fun checkOutbox() {
        val pending = outboxService.getPending()
        pending.forEach {
            val message = MessageBuilder.withPayload(it.payload)
                    .setHeader(KafkaHeaders.MESSAGE_KEY, it.messageKey)
                    .build()
            source.output().send(message)
            outboxService.markAsProcessed(it.id)
        }
    }

}

Эта реализация работает, но есть некоторые аспекты, которые заставляют меня усомниться в ее правильности:

1) Служба учета хранит полезную нагрузку объекта, используя AvroSchemaRegistryClientMessageConverter. Может быть, есть лучший способ сделать это ...

2) Сообщения, отправляемые Kafka из службы ретрансляции сообщений, имеют тип контента application/*+avro вместо чего-то вроде application/vnd.transferemitted.v1+avro.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...