Google PubSub asyn c ограничение скорости не работает должным образом - PullRequest
0 голосов
/ 28 марта 2020

Мы используем PubSub в prod и видим проблему, заключающуюся в том, что существует больше виртуальных машин, обрабатывающих сообщения PubSub, которые мы ожидаем получить.

Я провел простые тесты с использованием PubSub в одночасье, и похоже, что не так гладко, как мы ожидали с механизмом ограничения скорости.

Вот тест:

  1. Publi sh некоторое количество сообщений в topi c с Pull Подписка. В эксперименте было около 2,7 тыс. Сообщений (началось примерно в 9 часов вечера)
  2. Настройка одного асинхронного c клиента с использованием соединения StreamingPull и FlowControl, установленного на 2.
  3. Имитация этой обработки каждое входящее сообщение занимает 5 секунд, перемещая выполнение в таймер и подтверждая сообщение только после его окончания.

Ожидаемые результаты: Сообщения от PubSub потребляются с одинаковой скоростью, получая 2 сообщения с время каждые 5 секунд. Ожидается небольшой тайм-аут между получением сообщения и новым сообщением, полученным из-за всех затрат сети и обработки.

Фактический результат: PubSub начинает регулировать или что-то в этом роде с огромным таймаутом. В это время сообщение не приходит. Время ожидания зависит от количества непрочитанных сообщений в подписке.

Из документов FlowControl .

PubSub subscription unacked message count* это не ясно. 1026 *

Вот код потребителя (клиента):

var concurrentFlowsNumber = config.getLong(CONFIG_NUMBER_OF_THREADS);
    var flowSettings = FlowControlSettings.newBuilder()
      .setMaxOutstandingElementCount(concurrentFlowsNumber)
      .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
      .build();

    var subscriber = Subscriber.newBuilder(subscriptionName, receiver)
      .setCredentialsProvider(() -> serviceAccountCredentials)
      .setFlowControlSettings(flowSettings)
      .build();

    subscriber.addListener(
      new Subscriber.Listener() {
        @Override
        public void failed(ApiService.State from, Throwable failure) {
          logger.error(failure);
        }
      },
      MoreExecutors.directExecutor());

    var apiService = subscriber.startAsync();
    apiService.addListener(new ApiService.Listener() {
      @Override
      public void running() {
         logger.info("Pubsub started");
      }

      @Override
      public void failed(ApiService.State from, Throwable failure) {
        logger.error("Pubsub failed on step: {}", from);
      }
    }, Runnable::run);

И обработчик сообщений:

private static void handlePubSubMessage(PubsubMessage message, AckReplyConsumer consumer) {
    new Timer().schedule(new TimerTask() {
      @Override
      public void run() {
               consumer.ack();
      }
    }, (long) 3000 + rand.nextInt(5000));
  }

Итак, кто-нибудь знает, как сделать клиенты (многие vms) потребляют сообщения с одновременными ограничениями обработки (до 4 одновременных сообщений) без перерыва на время ожидания?

Ps Эти вопросы похожи, но не одинаковы: Управление потоком данных в pubsub Google pubsub Dynami c ограничение скорости Облако pubsub медленная скорость опроса

1 Ответ

0 голосов
/ 06 апреля 2020

Поскольку у вас есть накопившийся журнал невыполненных работ, вы можете столкнуться с этой проблемой: https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages

Ваши недоставленные сообщения будут помещаться в буфер между службой Pub / Sub и клиентской библиотекой , Сообщения могут застрять в буфере одного клиента или быть доставлены тому же клиенту, если ackDeadline был превышен.

Вы можете поэкспериментировать с использованием синхронного режима, как предложено.

...