Я хочу настроить некоторые преобразования (потреблять из одной топи c, производить в другую), используя весенний поток облака. Кроме того, я хочу, чтобы он был
- надежным - скажем «по крайней мере, один раз»
- «Быстрая продукция» - выполняйте производителем.flu sh () один раз за пакет, а не за сообщение
- быстрое смещение фиксации - выполнять фиксацию один раз для пакета из раздела
с помощью необработанного клиента kafka или с использованием spring-kafka. Я сделал бы следующее (при условии, что производитель настроен соответствующим образом: acks = all, linger.ms> 0, размер пакета достаточно велик и т. д.):
- получать использованные сообщения (скажем, из consumer.poll ())
- сделать свое собственное преобразование для каждое сообщение
- provider.send (сообщение) .addCallback (...) для каждого сообщения
- provider.flu sh ()
- проверка на отсутствие ошибок в обратном вызове send
- consumer.commit () - для максимального смещения
вопрос в том, как добиться подобного с spring-cloud-stream? мне известно о consumer.batch-mode = true и provider.sync = false, но я не уверен, как правильно связать надежную продукцию со смещенным коммитом
UPDATE вроде простое решение, которое я придумал (обратите внимание, что помимо начального требования мне нужна динамическая c маршрутизация с динамическим c топи c создание):
конфигурация
public class DynamicRouterCfg {
@Autowired
private BinderAwareChannelResolver outputResolver;
@Autowired
private DynamicRouterProducerListener dynamicRouterProducerListener;
private final Lock lock = new ReentrantLock();
@Bean
public Consumer<Message<List<byte[]>>> dynamicRouter() {
return msgs -> {
lock.lock();
try {
dynamicRouterProducerListener.setBatchSize(msgs.getPayload().size());
for (byte[] payload : msgs.getPayload()) {
doBusinessLogic(payload);
outputResolver.resolveDestination(resolveDestination(payload))
.send(MessageBuilder.withPayload(payload)
.build());
}
if (dynamicRouterProducerListener.waitForBatchSuccess()) {
Acknowledgment ack = (Acknowledgment) msgs.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT);
Objects.requireNonNull(ack).acknowledge();
}
} finally {
lock.unlock();
}
};
}
private void doBusinessLogic(byte[] payload) {
// placeholder for business transformation
}
private String resolveDestination(byte[] payload) {
// some business logic for destination resolving
return null;
}
private static class DynamicRouterProducerListener implements ProducerListener<Object, Object> {
private volatile CountDownLatch batchLatch;
public void setBatchSize(int batchSize) {
this.batchLatch = new CountDownLatch(batchSize);
}
public boolean waitForBatchSuccess() {
try {
return batchLatch.await(10000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
return false;
}
}
@Override
public void onSuccess(ProducerRecord<Object, Object> producerRecord, RecordMetadata recordMetadata) {
batchLatch.countDown();
}
}
}
application.yml
spring:
cloud:
function:
definition: dynamicRouter
stream:
bindings:
dynamicRouter-in-0:
destination: consumed.topic
group: test.group
consumer:
batch-mode: true
concurrency: 1
header-mode: none
use-native-decoding: true
kafka:
binder:
brokers: broker:9092
auto-create-topics: true
required-acks: all
bindings:
router-in-0:
consumer:
auto-rebalance-enabled: false
auto-commit-offset: false
auto-commit-on-error: false
configuration.fetch.max.bytes: 5024000
configuration.isolation.level: read_committed
default:
producer:
sync: false
batch-timeout: 10 # as i see this one would be converted to linger.ms
compression: gzip
configuration:
max.in.flight.requests.per.connection: 1
max.request.size: 5024000
# i need to create topics with custom configuration
topic.replication-factor: 2
topic.properties:
cleanup.policy: compact
min.cleanable.dirty.ratio: 0.1
какие недостатки я вижу здесь:
- использование устаревшего BinderAwareChannelResolver: не знаю, как я могу использовать
spring.cloud.stream.sendto.destination
в моем случае - после каждого пакета будет ожидаться избыточный $ {batch-timeout}
- не уверен насчет гарантий метода
producer.onSuccess
(если возможны "дублированные" успешные вызовы), особенно когда сообщения от одного входящего пакета являются перенаправлено на разные темы