надежный производитель асин c пакетной кафки с использованием весеннего облачного потока - PullRequest
1 голос
/ 20 марта 2020

Я хочу настроить некоторые преобразования (потреблять из одной топи c, производить в другую), используя весенний поток облака. Кроме того, я хочу, чтобы он был

  • надежным - скажем «по крайней мере, один раз»
  • «Быстрая продукция» - выполняйте производителем.flu sh () один раз за пакет, а не за сообщение
  • быстрое смещение фиксации - выполнять фиксацию один раз для пакета из раздела

с помощью необработанного клиента kafka или с использованием spring-kafka. Я сделал бы следующее (при условии, что производитель настроен соответствующим образом: acks = all, linger.ms> 0, размер пакета достаточно велик и т. д.):

  1. получать использованные сообщения (скажем, из consumer.poll ())
  2. сделать свое собственное преобразование для каждое сообщение
  3. provider.send (сообщение) .addCallback (...) для каждого сообщения
  4. provider.flu sh ()
  5. проверка на отсутствие ошибок в обратном вызове send
  6. consumer.commit () - для максимального смещения

вопрос в том, как добиться подобного с spring-cloud-stream? мне известно о consumer.batch-mode = true и provider.sync = false, но я не уверен, как правильно связать надежную продукцию со смещенным коммитом

UPDATE вроде простое решение, которое я придумал (обратите внимание, что помимо начального требования мне нужна динамическая c маршрутизация с динамическим c топи c создание):

конфигурация

public class DynamicRouterCfg {

    @Autowired
    private BinderAwareChannelResolver outputResolver;

    @Autowired
    private DynamicRouterProducerListener dynamicRouterProducerListener;

    private final Lock lock = new ReentrantLock();

    @Bean
    public Consumer<Message<List<byte[]>>> dynamicRouter() {
        return msgs -> {
            lock.lock();
            try {
                dynamicRouterProducerListener.setBatchSize(msgs.getPayload().size());

                for (byte[] payload : msgs.getPayload()) {
                    doBusinessLogic(payload);
                    outputResolver.resolveDestination(resolveDestination(payload))
                            .send(MessageBuilder.withPayload(payload)
                            .build());
                }

                if (dynamicRouterProducerListener.waitForBatchSuccess()) {
                    Acknowledgment ack = (Acknowledgment) msgs.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT);
                    Objects.requireNonNull(ack).acknowledge();
                }

            } finally {
                lock.unlock();
            }
        };
    }

    private void doBusinessLogic(byte[] payload) {
        // placeholder for business transformation
    }

    private String resolveDestination(byte[] payload) {
        // some business logic for destination resolving
        return null;
    }

    private static class DynamicRouterProducerListener implements ProducerListener<Object, Object> {

        private volatile CountDownLatch batchLatch;

        public void setBatchSize(int batchSize) {
            this.batchLatch = new CountDownLatch(batchSize);
        }

        public boolean waitForBatchSuccess() {
            try {
                return batchLatch.await(10000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                return false;
            }
        }

        @Override
        public void onSuccess(ProducerRecord<Object, Object> producerRecord, RecordMetadata recordMetadata) {
            batchLatch.countDown();
        }

    }
}

application.yml

spring:
  cloud:
    function:
      definition: dynamicRouter
    stream:
      bindings:
        dynamicRouter-in-0:
          destination: consumed.topic
          group: test.group
          consumer:
            batch-mode: true
            concurrency: 1
            header-mode: none
            use-native-decoding: true
      kafka:
        binder:
          brokers: broker:9092
          auto-create-topics: true
          required-acks: all
        bindings:
          router-in-0:
            consumer:
              auto-rebalance-enabled: false
              auto-commit-offset: false
              auto-commit-on-error: false
              configuration.fetch.max.bytes: 5024000
              configuration.isolation.level: read_committed
        default:
          producer:
            sync: false
            batch-timeout: 10 # as i see this one would be converted to linger.ms
            compression: gzip
            configuration:
              max.in.flight.requests.per.connection: 1
              max.request.size: 5024000
            # i need to create topics with custom configuration
            topic.replication-factor: 2
            topic.properties:
              cleanup.policy: compact
              min.cleanable.dirty.ratio: 0.1

какие недостатки я вижу здесь:

  1. использование устаревшего BinderAwareChannelResolver: не знаю, как я могу использовать spring.cloud.stream.sendto.destination в моем случае
  2. после каждого пакета будет ожидаться избыточный $ {batch-timeout}
  3. не уверен насчет гарантий метода producer.onSuccess (если возможны "дублированные" успешные вызовы), особенно когда сообщения от одного входящего пакета являются перенаправлено на разные темы
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...