ActiveMQ Отказоустойчивый транспорт через Spring Stomp Broker Relay - PullRequest
0 голосов
/ 22 ноября 2018

Удалось ли кому-либо использовать отказоустойчивый транспорт с использованием активного / резервного Active MQ (если точнее Amazon MQ) при настройке ретранслятора Stomp Broker Websocket?

Я создал следующий TcpClient, но он реализует алгоритм циклического перебора по заданномуконечные точки, которые вызывают некоторые ошибки при выборе фактически резервного экземпляра:

Ошибка транспорта: сбой epoll_ctl (..): Нет такого файла или каталога

Вот мой текущийреализация:

public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {
    private final Environment environment = new Environment(new SynchronousDispatcherConfigReader());
    private final List<URI> endpoints;
    private final boolean isSsl;

    public StompTcpFactory(List<String> endpoints) {
        this.endpoints = endpoints.stream()
                .map(e -> contains(e, "://") ? e : "fake://" + e)
                .map(URI::create)
                .collect(toList());
        isSsl = this.endpoints.stream().anyMatch(StompTcpFactory::isSsl);

        boolean anyNotSsl = this.endpoints.stream().anyMatch(not(StompTcpFactory::isSsl));
        if (isSsl && anyNotSsl)
            throw new IllegalArgumentException("Cannot configure STOMP to use SSL and regular connections at the same time: " + endpoints);
    }

    @Override
    public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
        return tcpClientSpec
                .env(environment)
                .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
                .ssl(isSsl ? new SslOptions() : null)
                .connect(new InetSocketAddressSupplier(endpoints));
    }

    private static boolean isSsl(URI endpoint) {
        return containsIgnoreCase(endpoint.getScheme(), "ssl");
    }

    private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
        @Override
        public ReactorConfiguration read() {
            return new ReactorConfiguration(emptyList(), "sync", new Properties());
        }
    }

}

со следующим поставщиком:

public class InetSocketAddressSupplier implements Supplier<InetSocketAddress> {

    private static final AtomicInteger counter = new AtomicInteger(0);

    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final List<URI> endpoints;

    public InetSocketAddressSupplier(List<URI> endpoints) {
        this.endpoints = endpoints;
    }

    @Override
    public InetSocketAddress get() {
        int endpointIndex = counter.getAndUpdate(i -> ++i % endpoints.size());
        URI endpoint = endpoints.get(endpointIndex);

        logger.info("\n\nConnecting to broker[{}]: {}:{}\n\n", endpointIndex, endpoint.getHost(), endpoint.getPort());
        return new InetSocketAddress(endpoint.getHost(), endpoint.getPort());
    }

}

А вот конфигурация брокера:

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
    String environment = configService.getString(ENVIRONMENT);

    String user = configService.getString(WEBSOCKET_EXTERNAL_BROKER_USER);
    String password = configService.getString(WEBSOCKET_EXTERNAL_BROKER_PASSWORD);
    registry
            .enableStompBrokerRelay("/queue", "/topic")
            .setRelayHost(UNUSED_RELAY_HOST)
            .setClientLogin(user)
            .setClientPasscode(password)
            .setSystemLogin(user)
            .setSystemPasscode(password)
            .setUserDestinationBroadcast(format("/topic/%s-unresolved-user-destination", environment))
            .setUserRegistryBroadcast(format("/topic/%s-simp-user-registry", environment))
            .setTcpClient(createTcpClient());

    registry.setApplicationDestinationPrefixes(format("/%s-websocket-app", environment));
}


private TcpOperations<byte[]> createTcpClient() {
    List<String> endpoints = asList(configService.getStringArray(WEBSOCKET_EXTERNAL_BROKER_ENDPOINTS));
    logger.info("Configuring websocket brokers to: {}", endpoints);
    return new Reactor2TcpClient<>(new StompTcpFactory(endpoints));
}
...