Мы используем PubSub в prod и видим проблему, заключающуюся в том, что существует больше виртуальных машин, обрабатывающих сообщения PubSub, которые мы ожидаем получить.
Я провел простые тесты с использованием PubSub в одночасье, и похоже, что не так гладко, как мы ожидали с механизмом ограничения скорости.
Вот тест:
- Publi sh некоторое количество сообщений в topi c с Pull Подписка. В эксперименте было около 2,7 тыс. Сообщений (началось примерно в 9 часов вечера)
- Настройка одного асинхронного c клиента с использованием соединения StreamingPull и FlowControl, установленного на 2.
- Имитация этой обработки каждое входящее сообщение занимает 5 секунд, перемещая выполнение в таймер и подтверждая сообщение только после его окончания.
Ожидаемые результаты: Сообщения от PubSub потребляются с одинаковой скоростью, получая 2 сообщения с время каждые 5 секунд. Ожидается небольшой тайм-аут между получением сообщения и новым сообщением, полученным из-за всех затрат сети и обработки.
Фактический результат: PubSub начинает регулировать или что-то в этом роде с огромным таймаутом. В это время сообщение не приходит. Время ожидания зависит от количества непрочитанных сообщений в подписке.
Из документов FlowControl .
* это не ясно. 1026 *
Вот код потребителя (клиента):
var concurrentFlowsNumber = config.getLong(CONFIG_NUMBER_OF_THREADS);
var flowSettings = FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(concurrentFlowsNumber)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.build();
var subscriber = Subscriber.newBuilder(subscriptionName, receiver)
.setCredentialsProvider(() -> serviceAccountCredentials)
.setFlowControlSettings(flowSettings)
.build();
subscriber.addListener(
new Subscriber.Listener() {
@Override
public void failed(ApiService.State from, Throwable failure) {
logger.error(failure);
}
},
MoreExecutors.directExecutor());
var apiService = subscriber.startAsync();
apiService.addListener(new ApiService.Listener() {
@Override
public void running() {
logger.info("Pubsub started");
}
@Override
public void failed(ApiService.State from, Throwable failure) {
logger.error("Pubsub failed on step: {}", from);
}
}, Runnable::run);
И обработчик сообщений:
private static void handlePubSubMessage(PubsubMessage message, AckReplyConsumer consumer) {
new Timer().schedule(new TimerTask() {
@Override
public void run() {
consumer.ack();
}
}, (long) 3000 + rand.nextInt(5000));
}
Итак, кто-нибудь знает, как сделать клиенты (многие vms) потребляют сообщения с одновременными ограничениями обработки (до 4 одновременных сообщений) без перерыва на время ожидания?
Ps Эти вопросы похожи, но не одинаковы: Управление потоком данных в pubsub Google pubsub Dynami c ограничение скорости Облако pubsub медленная скорость опроса