Вопросы по TCP-серверу в Vert.x - PullRequest
0 голосов
/ 20 марта 2020

Моя система - это высокоскоростная потоковая система в том же узле и той же JVM, я хочу использовать NetServer для получения сообщений и использовать шину событий для пересылки сообщений в другой Verticle в том же Vertx.

Как мы знаем, в Vert.x NetServer вызывается событием l oop thread. Я столкнулся с проблемой. Предположим, что потребитель шины событий работает медленно, а NetServer слишком быстр для отправки данных на шину событий. Если буфер потребителя заполнен, шина событий отбрасывает и теряет данные. Поэтому я думаю, есть ли какой-либо способ управления скоростью потока в NetServer? Или шина событий оказывает обратное давление на NetServer, чтобы замедлить отправку на шину событий.

Я ценю ваши идеи и комментарии.

1 Ответ

0 голосов
/ 21 марта 2020

Из того, что я видел, Vert.x EventBus не отбрасывает сообщения для локальных потребителей.

Хотя в документации говорится, что DEFAULT_ACCEPT_BACKLOG установлено на 1024, на самом деле это -1:

https://github.com/eclipse-vertx/vert.x/blob/1ab9884cb29adea75b378c3fa7359813f3de68f9/src/main/java/io/vertx/core/net/NetServerOptions.java#L45

И я не вижу, чтобы он использовался где-либо для ограничения размера EventBus.

Вы также можете использовать этот тест, чтобы убедиться, что все сообщения фактически приходят:

public class BackpressureExample {

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx(new VertxOptions().setEventBusOptions(new EventBusOptions().setAcceptBacklog(1)));

        vertx.deployVerticle(new SlowConsumer(), new DeploymentOptions().setWorker(true), h -> {
            for (int i = 0; i < 10_000; i++) {
                vertx.eventBus().send("address", UUID.randomUUID().toString());
            }
            System.out.println("Done producing");
        });
    }

    private static class SlowConsumer extends AbstractVerticle {

        @Override
        public void start() {
            AtomicInteger counter = new AtomicInteger();
            this.vertx.eventBus().consumer("address", h -> {
                System.out.println(counter.incrementAndGet());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

Вы можете посмотреть, как сообщения доставляются местным потребителям здесь: https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java#L359

...