Cannot get a record from ServiceDiscovery in a Vert.x cluster
0 votes
/ March 4, 2019

I have a Vert.x cluster on my local machine. There are two verticles:

public class AccountVerticle extends AbstractVerticle {

    private ServiceDiscovery discovery;

    private static final Logger logger = LoggerFactory.getLogger(AccountVerticle.class);

    @Override
    public void start(Future<Void> fut) {
        Vertx vertx = Vertx.vertx();
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        router.get("/api/account").handler(this::get);

        discovery = ServiceDiscovery.create(vertx);
        createHttpServer(router).subscribe((server, err) -> {
            if (err != null) {
                logger.error("Error while starting server", err);
            } else {
                logger.info("Server has been successfully started");
                discovery.rxPublish(HttpEndpoint.createRecord("accounts", "localhost", 8080, "/api"))
                        .subscribe((rec,error) -> {
                            if (error != null) {
                                logger.error("Error while starting record", error);
                            } else {
                                logger.info("Record has been successfully published");
                                discovery.rxGetRecords(record -> true)
                                        .subscribe((records, throwable) -> {
                                            logger.info(records.get(0).getLocation());
                                        });
                            }
                        });
            }
        });
    }

    private Single<HttpServer> createHttpServer(Router router) {
        return vertx
                .createHttpServer()
                .requestHandler(router)
                .rxListen(8080);
    }

    private void get(RoutingContext rc) {
        Single.just(new JsonObject("{\"account\":\"test\"}")).subscribe(ActionHelper.ok(rc));
    }
}


public class CustomerVerticle extends AbstractVerticle {

    private ServiceDiscovery discovery;

    private static final Logger logger = LoggerFactory.getLogger(CustomerVerticle.class);

    @Override
    public void start(Future<Void> fut) {
        Vertx vertx = Vertx.vertx();
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        router.post("/api/customers/:id/accounts").handler(this::updateCustomerAccount);

        discovery = ServiceDiscovery.create(vertx);
        createHttpServer(router).subscribe((server, err) -> {
            if (err != null) {
                logger.error("Error while starting server", err);
            } else {
                logger.info("Server has been successfully started");
                discovery.rxGetRecords(record -> true)
                            .subscribe((records, throwable) -> {
                                logger.info(records);
                            });
            }
        });
    }

    private Single<HttpServer> createHttpServer(Router router) {
        return vertx
                .createHttpServer()
                .requestHandler(router)
                .rxListen(8090);
    }

    private void updateCustomerAccount(RoutingContext rc) {

        HttpEndpoint.rxGetWebClient(discovery, record -> {
                        return record.getName().equals("accounts");
                    })
                    .flatMap(httpClient -> {
                        return httpClient.get("/api")
                                    .as(BodyCodec.string())
                                    .rxSend();
                    }).subscribe((response, err) -> {
                        logger.info(response);
                        Single.just(new JsonObject("{\"customer\":\"test\"}")).subscribe(ActionHelper.ok(rc));
                    });
    }
}

AccountVerticle publishes an HttpEndpoint record to ServiceDiscovery. I can see this record through discovery.rxGetRecords in AccountVerticle. But when I try to get the records inside CustomerVerticle, I get nothing.

AccountVerticle logs:

Connected to the target VM, address: '127.0.0.1:37153', transport: 'socket'
Mar 03, 2019 11:57:02 PM io.vertx.core.impl.launcher.commands.RunCommand
INFO: Starting clustering...
Mar 03, 2019 11:57:02 PM io.vertx.core.impl.launcher.commands.RunCommand
INFO: No cluster-host specified so using address 172.18.0.1
Mar 03, 2019 11:57:03 PM com.hazelcast.instance.AddressPicker
INFO: [LOCAL] [dev] [3.10.5] Prefer IPv4 stack is true.
Mar 03, 2019 11:57:03 PM com.hazelcast.instance.AddressPicker
INFO: [LOCAL] [dev] [3.10.5] Picked [192.168.0.105]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true
Mar 03, 2019 11:57:03 PM com.hazelcast.system
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Hazelcast 3.10.5 (20180913 - 6ffa2ee) starting at [192.168.0.105]:5701
Mar 03, 2019 11:57:03 PM com.hazelcast.system
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
Mar 03, 2019 11:57:03 PM com.hazelcast.system
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Configured Hazelcast Serialization version: 1
Mar 03, 2019 11:57:03 PM com.hazelcast.instance.Node
INFO: [192.168.0.105]:5701 [dev] [3.10.5] A non-empty group password is configured for the Hazelcast member. Starting with Hazelcast version 3.8.2, members with the same group name, but with different group passwords (that do not use authentication) form a cluster. The group password configuration will be removed completely in a future release.
Mar 03, 2019 11:57:04 PM com.hazelcast.spi.impl.operationservice.impl.BackpressureRegulator
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Backpressure is disabled
Mar 03, 2019 11:57:04 PM com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandlerSupplier
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Running with 2 response threads
Mar 03, 2019 11:57:06 PM com.hazelcast.instance.Node
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Creating MulticastJoiner
Mar 03, 2019 11:57:06 PM com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Starting 4 partition threads and 3 generic threads (1 dedicated for priority tasks)
Mar 03, 2019 11:57:06 PM com.hazelcast.internal.diagnostics.Diagnostics
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
Mar 03, 2019 11:57:06 PM com.hazelcast.core.LifecycleService
INFO: [192.168.0.105]:5701 [dev] [3.10.5] [192.168.0.105]:5701 is STARTING
Mar 03, 2019 11:57:09 PM com.hazelcast.system
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Cluster version set to 3.10
Mar 03, 2019 11:57:09 PM com.hazelcast.internal.cluster.ClusterService
INFO: [192.168.0.105]:5701 [dev] [3.10.5] 

Members {size:1, ver:1} [
    Member [192.168.0.105]:5701 - 2a3f5096-5a47-4ade-9e0c-ce1d1d3b4e0f this
]

Mar 03, 2019 11:57:09 PM com.hazelcast.core.LifecycleService
INFO: [192.168.0.105]:5701 [dev] [3.10.5] [192.168.0.105]:5701 is STARTED
Mar 03, 2019 11:57:10 PM com.hazelcast.internal.partition.impl.PartitionStateManager
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Initializing cluster partition table arrangement...
Mar 03, 2019 11:57:10 PM io.vertx.core.impl.VertxImpl
WARNING: You're already on a Vert.x context, are you sure you want to create a new Vertx instance?
Mar 03, 2019 11:57:10 PM ua.home.accounts.AccountVerticle
INFO: Server has been successfully started
Mar 03, 2019 11:57:10 PM ua.home.accounts.AccountVerticle
INFO: Record has been successfully published
Mar 03, 2019 11:57:10 PM ua.home.accounts.AccountVerticle
INFO: {"endpoint":"http://localhost:8080/api","host":"localhost","port":8080,"root":"/api","ssl":false}

CustomerVerticle logs:

Connected to the target VM, address: '127.0.0.1:36463', transport: 'socket'
Mar 04, 2019 12:04:41 AM io.vertx.core.impl.launcher.commands.RunCommand
INFO: Starting clustering...
Mar 04, 2019 12:04:41 AM io.vertx.core.impl.launcher.commands.RunCommand
INFO: No cluster-host specified so using address 172.18.0.1
Mar 04, 2019 12:04:43 AM com.hazelcast.instance.AddressPicker
INFO: [LOCAL] [dev] [3.10.5] Prefer IPv4 stack is true.
Mar 04, 2019 12:04:43 AM com.hazelcast.instance.AddressPicker
INFO: [LOCAL] [dev] [3.10.5] Picked [192.168.0.105]:5702, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5702], bind any local is true
Mar 04, 2019 12:04:43 AM com.hazelcast.system
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Hazelcast 3.10.5 (20180913 - 6ffa2ee) starting at [192.168.0.105]:5702
Mar 04, 2019 12:04:43 AM com.hazelcast.system
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
Mar 04, 2019 12:04:43 AM com.hazelcast.system
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Configured Hazelcast Serialization version: 1
Mar 04, 2019 12:04:43 AM com.hazelcast.instance.Node
INFO: [192.168.0.105]:5702 [dev] [3.10.5] A non-empty group password is configured for the Hazelcast member. Starting with Hazelcast version 3.8.2, members with the same group name, but with different group passwords (that do not use authentication) form a cluster. The group password configuration will be removed completely in a future release.
Mar 04, 2019 12:04:43 AM com.hazelcast.spi.impl.operationservice.impl.BackpressureRegulator
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Backpressure is disabled
Mar 04, 2019 12:04:43 AM com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandlerSupplier
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Running with 2 response threads
Mar 04, 2019 12:04:44 AM com.hazelcast.instance.Node
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Creating MulticastJoiner
Mar 04, 2019 12:04:45 AM com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Starting 4 partition threads and 3 generic threads (1 dedicated for priority tasks)
Mar 04, 2019 12:04:45 AM com.hazelcast.internal.diagnostics.Diagnostics
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
Mar 04, 2019 12:04:45 AM com.hazelcast.core.LifecycleService
INFO: [192.168.0.105]:5702 [dev] [3.10.5] [192.168.0.105]:5702 is STARTING
Mar 04, 2019 12:04:45 AM com.hazelcast.internal.cluster.impl.MulticastJoiner
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Trying to join to discovered node: [192.168.0.105]:5701
Mar 04, 2019 12:04:45 AM com.hazelcast.nio.tcp.TcpIpConnector
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Connecting to /192.168.0.105:5701, timeout: 0, bind-any: true
Mar 04, 2019 12:04:45 AM com.hazelcast.nio.tcp.TcpIpConnectionManager
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Established socket connection between /192.168.0.105:58085 and /192.168.0.105:5701
Mar 04, 2019 12:04:51 AM com.hazelcast.system
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Cluster version set to 3.10
Mar 04, 2019 12:04:51 AM com.hazelcast.internal.cluster.ClusterService
INFO: [192.168.0.105]:5702 [dev] [3.10.5] 

Members {size:2, ver:2} [
    Member [192.168.0.105]:5701 - 2a3f5096-5a47-4ade-9e0c-ce1d1d3b4e0f
    Member [192.168.0.105]:5702 - 8f030de2-dc19-4f02-96db-abcb53114bad this
]

Mar 04, 2019 12:04:52 AM com.hazelcast.core.LifecycleService
INFO: [192.168.0.105]:5702 [dev] [3.10.5] [192.168.0.105]:5702 is STARTED
Mar 04, 2019 12:04:53 AM io.vertx.core.impl.VertxImpl
WARNING: You're already on a Vert.x context, are you sure you want to create a new Vertx instance?
Mar 04, 2019 12:04:53 AM ua.home.customers.CustomerVerticle
INFO: Server has been successfully started
Mar 04, 2019 12:04:53 AM ua.home.customers.CustomerVerticle
INFO: []

This is the Java project: https://github.com/b3lowster/samples_for_blog/tree/master/rx_sd_java_vertx

UPDATE: Can I publish a record to service discovery and use that record from another verticle in the Hazelcast cluster?

1 Answer

0 votes
/ March 22, 2019

I found a hint. It is not enough to just run the Verticle class with the -cluster argument. You also need to initialize HazelcastClusterManager.

Have a look at my example below:

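@Override
public void start(Future<Void> future) throws Exception {
    // Load the default Hazelcast configuration and set a custom group name.
    Config hazelcastConfig = ConfigUtil.loadConfig();
    hazelcastConfig.getGroupConfig()
            .setName("tsv-cluster");
    // Pass the configuration to Vert.x through an explicit cluster manager.
    ClusterManager mgr = new HazelcastClusterManager(hazelcastConfig);
    VertxOptions options = new VertxOptions().setClusterManager(mgr);
    // Create a clustered Vertx instance; it is handed to the callback once it has joined.
    Vertx.rxClusteredVertx(options).subscribe(vertx -> {
        // TODO
    });
}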
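
To illustrate why the Vertx instance behind ServiceDiscovery matters, here is a toy model in plain Java. It is not Vert.x or Hazelcast code, and every class name in it (DiscoveryModel, ToyCluster, ToyNode) is hypothetical. It rests on one assumption about the default discovery backend: records are kept in a map that is cluster-wide only when the Vertx instance itself is clustered, and local otherwise. Under that assumption, the question's verticles, which call Vertx.vertx() inside start() (note the "are you sure you want to create a new Vertx instance?" warning in both logs), would each be talking to their own private map.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model only: none of these classes are Vert.x or Hazelcast APIs. */
public class DiscoveryModel {

    /** Stands in for the cluster-wide map that clustered nodes share. */
    static final class ToyCluster {
        final Map<String, String> sharedRecords = new ConcurrentHashMap<>();
    }

    /** Stands in for a Vertx instance plus the discovery created from it. */
    static final class ToyNode {
        private final Map<String, String> records;

        private ToyNode(Map<String, String> records) {
            this.records = records;
        }

        /** A node that joined the cluster: records live in the shared map. */
        static ToyNode clustered(ToyCluster cluster) {
            return new ToyNode(cluster.sharedRecords);
        }

        /** A standalone node: records live in a private, local map. */
        static ToyNode standalone() {
            return new ToyNode(new ConcurrentHashMap<>());
        }

        void publish(String name, String location) {
            records.put(name, location);
        }

        /** Names of all records this node can see, sorted for stable output. */
        List<String> recordNames() {
            List<String> names = new ArrayList<>(records.keySet());
            Collections.sort(names);
            return names;
        }
    }

    public static void main(String[] args) {
        // Standalone case: each node only sees what it published itself.
        ToyNode accountsLocal = ToyNode.standalone();
        ToyNode customersLocal = ToyNode.standalone();
        accountsLocal.publish("accounts", "http://localhost:8080/api");
        System.out.println(accountsLocal.recordNames());  // [accounts]
        System.out.println(customersLocal.recordNames()); // []

        // Clustered case: both nodes are backed by the same shared map.
        ToyCluster cluster = new ToyCluster();
        ToyNode accounts = ToyNode.clustered(cluster);
        ToyNode customers = ToyNode.clustered(cluster);
        accounts.publish("accounts", "http://localhost:8080/api");
        System.out.println(customers.recordNames());      // [accounts]
    }
}
```

The standalone case mirrors the logs in the question: the publisher sees its own record while the other node gets an empty list. The clustered case corresponds to creating the discovery from a clustered Vertx instance, as in the snippet above.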
...