Dynami c переключение между двумя объектами kafkaTemplate - PullRequest
0 голосов
/ 17 апреля 2020

У меня есть два кластера кафки (Active-пассивный). Я создал два объекта KafkaTemplate для создания событий (по одному для каждого). Я буду продолжать использовать основной kafkaTemplate для отправки событий, пока он работает. В случае любой проблемы, я хочу динамически переключиться на второй шаблон kafka. Я не хочу иметь условие if для переключения между объектами, используемыми для отправки (в крайнем случае)

Есть ли способ динамического переключения объектов?

На стороне потребителя у нас есть MessageListenerContainers, и мы установили apiAutoStartUp api для управления пуском и остановкой нескольких слушателей

Может ли кто-нибудь помочь мне с продюсером ?!

1 Ответ

1 голос
/ 17 апреля 2020

Просто реализуйте свой собственный KafkaOperations, который делегирует активному шаблону и переключается при необходимости.

РЕДАКТИРОВАТЬ

Например:

public class DelegatingTemplate<K, V> implements KafkaOperations<K, V> {

    private final KafkaTemplate<String, String> template1;

    private final KafkaTemplate<String, String> template2;

    private volatile KafkaTemplate<String, String> currentTemplate;

    public DelegatingTemplate(KafkaTemplate<String, String> template1, KafkaTemplate<String, String> template2) {
        this.template1 = template1;
        this.template2 = template2;
        this.currentTemplate = template1;
    }

    public void switchTemplates(boolean primary) {
        this.currentTemplate = primary ? template1 : template2;
    }

    @Override
    public boolean isTransactional() {
        return this.currentTemplate.isTransactional();
    }

    @Override
    public ListenableFuture<SendResult<String, String>> sendDefault(String data) {
        return this.currentTemplate.sendDefault(data);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> sendDefault(String key, String data) {
        return this.currentTemplate.sendDefault(key, data);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> sendDefault(Integer partition, String key, String data) {
        return this.currentTemplate.sendDefault(partition, key, data);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> sendDefault(Integer partition, Long timestamp, String key,
            String data) {
        return this.currentTemplate.sendDefault(partition, timestamp, key, data);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> send(String topic, String data) {
        return this.currentTemplate.send(topic, data);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> send(String topic, String key, String data) {
        return this.currentTemplate.send(topic, key, data);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> send(String topic, Integer partition, String key, String data) {
        return this.currentTemplate.send(topic, partition, key, data);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> send(String topic, Integer partition, Long timestamp,
            String key, String data) {
        return this.currentTemplate.send(topic, partition, timestamp, key, data);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> send(ProducerRecord<String, String> record) {
        return this.currentTemplate.send(record);
    }

    @Override
    public ListenableFuture<SendResult<String, String>> send(Message<?> message) {
        return this.currentTemplate.send(message);
    }

    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        return this.currentTemplate.partitionsFor(topic);
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return this.currentTemplate.metrics();
    }

    @Override
    public <T> T execute(ProducerCallback<String, String, T> callback) {
        return this.currentTemplate.execute(callback);
    }

    @Override
    public <T> T executeInTransaction(OperationsCallback<String, String, T> callback) {
        return this.currentTemplate.executeInTransaction(callback);
    }

    @Override
    public String toString() {
        return this.currentTemplate.toString();
    }

    @Override
    public void flush() {
        this.currentTemplate.flush();
    }

    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.currentTemplate.sendOffsetsToTransaction(offsets);
    }

    @Override
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
        this.currentTemplate.sendOffsetsToTransaction(offsets, consumerGroupId);
    }

}
...