Просто реализуйте свой собственный KafkaOperations
, который делегирует активному шаблону и переключается при необходимости.
РЕДАКТИРОВАТЬ
Например:
public class DelegatingTemplate<K, V> implements KafkaOperations<K, V> {
private final KafkaTemplate<String, String> template1;
private final KafkaTemplate<String, String> template2;
private volatile KafkaTemplate<String, String> currentTemplate;
public DelegatingTemplate(KafkaTemplate<String, String> template1, KafkaTemplate<String, String> template2) {
this.template1 = template1;
this.template2 = template2;
this.currentTemplate = template1;
}
public void switchTemplates(boolean primary) {
this.currentTemplate = primary ? template1 : template2;
}
@Override
public boolean isTransactional() {
return this.currentTemplate.isTransactional();
}
@Override
public ListenableFuture<SendResult<String, String>> sendDefault(String data) {
return this.currentTemplate.sendDefault(data);
}
@Override
public ListenableFuture<SendResult<String, String>> sendDefault(String key, String data) {
return this.currentTemplate.sendDefault(key, data);
}
@Override
public ListenableFuture<SendResult<String, String>> sendDefault(Integer partition, String key, String data) {
return this.currentTemplate.sendDefault(partition, key, data);
}
@Override
public ListenableFuture<SendResult<String, String>> sendDefault(Integer partition, Long timestamp, String key,
String data) {
return this.currentTemplate.sendDefault(partition, timestamp, key, data);
}
@Override
public ListenableFuture<SendResult<String, String>> send(String topic, String data) {
return this.currentTemplate.send(topic, data);
}
@Override
public ListenableFuture<SendResult<String, String>> send(String topic, String key, String data) {
return this.currentTemplate.send(topic, key, data);
}
@Override
public ListenableFuture<SendResult<String, String>> send(String topic, Integer partition, String key, String data) {
return this.currentTemplate.send(topic, partition, key, data);
}
@Override
public ListenableFuture<SendResult<String, String>> send(String topic, Integer partition, Long timestamp,
String key, String data) {
return this.currentTemplate.send(topic, partition, timestamp, key, data);
}
@Override
public ListenableFuture<SendResult<String, String>> send(ProducerRecord<String, String> record) {
return this.currentTemplate.send(record);
}
@Override
public ListenableFuture<SendResult<String, String>> send(Message<?> message) {
return this.currentTemplate.send(message);
}
@Override
public List<PartitionInfo> partitionsFor(String topic) {
return this.currentTemplate.partitionsFor(topic);
}
@Override
public Map<MetricName, ? extends Metric> metrics() {
return this.currentTemplate.metrics();
}
@Override
public <T> T execute(ProducerCallback<String, String, T> callback) {
return this.currentTemplate.execute(callback);
}
@Override
public <T> T executeInTransaction(OperationsCallback<String, String, T> callback) {
return this.currentTemplate.executeInTransaction(callback);
}
@Override
public String toString() {
return this.currentTemplate.toString();
}
@Override
public void flush() {
this.currentTemplate.flush();
}
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) {
this.currentTemplate.sendOffsetsToTransaction(offsets);
}
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
this.currentTemplate.sendOffsetsToTransaction(offsets, consumerGroupId);
}
}