GRP C Java load-balancing - метод запуска не вызывается из NameResolver - PullRequest
0 голосов
/ 16 июня 2020

Мы пытаемся реализовать балансировку нагрузки gRP C в Java с помощью Consul Service Discovery. Информация о версии: grp c - java v1.30.0

Проблема в том, что при запуске приложения метод запуска из нашего настраиваемого класса NameResolver не вызывается!

Вот наш код:

Вот настраиваемый класс NameResolver (метод запуска здесь не вызывается)

Я поставил точку останова в методе запуска для проверки, но это не так. вызывается!

package com.bht.saigonparking.common.loadbalance;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.logging.log4j.Level;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;

import com.bht.saigonparking.common.util.LoggingUtil;

import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import lombok.Getter;

/**
 *
 * @author bht
 */
@Getter
public final class SaigonParkingNameResolver extends NameResolver {

    private final URI consulURI;
    private final String serviceId;
    private final DiscoveryClient discoveryClient;

    private Listener listener;
    private List<ServiceInstance> serviceInstances;

    public SaigonParkingNameResolver(DiscoveryClient discoveryClient,
                                     URI consulURI,
                                     String serviceId,
                                     int pauseInSeconds) {

        this.consulURI = consulURI;
        this.serviceId = serviceId;
        this.discoveryClient = discoveryClient;

        /* run connection check timer */
        ConnectionCheckTimer connectionCheckTimer = new ConnectionCheckTimer(this, pauseInSeconds);
        connectionCheckTimer.runTimer();
    }

    @Override
    public String getServiceAuthority() {
        return consulURI.getAuthority();
    }

    @Override
    public void start(Listener2 listener) {
        this.listener = listener;
        loadServiceInstances();
    }

    @Override
    public void shutdown() {
        // implement shutdown...
    }

    void loadServiceInstances() {

        List<EquivalentAddressGroup> addressList = new ArrayList<>();
        serviceInstances = discoveryClient.getInstances(serviceId);

        if (serviceInstances == null || serviceInstances.isEmpty()) {
            LoggingUtil.log(Level.WARN, "loadServiceInstances", "Warning",
                    String.format("no serviceInstances of %s", serviceId));
            return;
        }

        serviceInstances.forEach(serviceInstance -> {
            String host = serviceInstance.getHost();
            int port = serviceInstance.getPort();

            LoggingUtil.log(Level.INFO, "loadServiceInstances", serviceId, String.format("%s:%d", host, port));

            List<SocketAddress> socketAddressList = new ArrayList<>();
            socketAddressList.add(new InetSocketAddress(host, port));
            addressList.add(new EquivalentAddressGroup(socketAddressList));
        });

        if (!addressList.isEmpty()) {
            listener.onAddresses(addressList, Attributes.EMPTY);
        }
    }
}

Вот настраиваемый класс NameResolverProvider

package com.bht.saigonparking.common.loadbalance;

import java.net.URI;

import org.springframework.cloud.client.discovery.DiscoveryClient;

import io.grpc.NameResolver;
import io.grpc.NameResolverProvider;
import lombok.AllArgsConstructor;

/**
 * @author bht
 */
@AllArgsConstructor
public final class SaigonParkingNameResolverProvider extends NameResolverProvider {

    private final String serviceId;
    private final DiscoveryClient discoveryClient;
    private final int pauseInSeconds;

    @Override
    protected boolean isAvailable() {
        return true;
    }

    @Override
    protected int priority() {
        return 5;
    }

    @Override
    public String getDefaultScheme() {
        return "consul";
    }

    @Override
    public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
        return new SaigonParkingNameResolver(discoveryClient, targetUri, serviceId, pauseInSeconds);
    }
}

Вот класс от клиента

package com.bht.saigonparking.service.auth.configuration;

import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import com.bht.saigonparking.api.grpc.user.UserServiceGrpc;
import com.bht.saigonparking.common.interceptor.SaigonParkingClientInterceptor;
import com.bht.saigonparking.common.loadbalance.SaigonParkingNameResolverProvider;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import lombok.AllArgsConstructor;

/**
 *
 * @author bht
 */
@Component
@AllArgsConstructor(onConstructor = @__(@Autowired))
public final class ChannelConfiguration {

    private final SaigonParkingClientInterceptor clientInterceptor;

    @Bean("userResolver")
    public SaigonParkingNameResolverProvider userServiceNameResolverProvider(@Value("${connection.user-service.id}") String serviceId,
                                                                             @Value("${connection.refresh-period-in-seconds}") int refreshPeriod,
                                                                             @Autowired DiscoveryClient discoveryClient) {
        return new SaigonParkingNameResolverProvider(serviceId, discoveryClient, refreshPeriod);
    }


    /**
     *
     * channel is the abstraction to connect to a service endpoint
     *
     * note for gRPC service stub:
     *      .newStub(channel)          --> nonblocking/asynchronous stub
     *      .newBlockingStub(channel)  --> blocking/synchronous stub
     */
    @Bean
    public ManagedChannel managedChannel(@Value("${spring.cloud.consul.host}") String host,
                                         @Value("${spring.cloud.consul.port}") int port,
                                         @Value("${connection.idle-timeout}") int timeout,
                                         @Value("${connection.max-inbound-message-size}") int maxInBoundMessageSize,
                                         @Value("${connection.max-inbound-metadata-size}") int maxInBoundMetadataSize,
                                         @Value("${connection.load-balancing-policy}") String loadBalancingPolicy,
                                         @Qualifier("userResolver") SaigonParkingNameResolverProvider nameResolverProvider) {
        return ManagedChannelBuilder
                .forTarget("consul://" + host + ":" + port)                     // build channel to server with server's address
                .keepAliveWithoutCalls(false)                                   // Close channel when client has already received response
                .idleTimeout(timeout, TimeUnit.MILLISECONDS)                    // 10000 milliseconds / 1000 = 10 seconds --> request time-out
                .maxInboundMetadataSize(maxInBoundMetadataSize * 1024 * 1024)   // 2KB * 1024 = 2MB --> max message header size
                .maxInboundMessageSize(maxInBoundMessageSize * 1024 * 1024)     // 10KB * 1024 = 10MB --> max message size to transfer together
                .defaultLoadBalancingPolicy(loadBalancingPolicy)                // set load balancing policy for channel
                .nameResolverFactory(nameResolverProvider)                      // using Consul service discovery for DNS querying
                .intercept(clientInterceptor)                                   // add internal credential authentication
                .usePlaintext()                                                 // use plain-text to communicate internally
                .build();                                                       // Build channel to communicate over gRPC
    }


    /* asynchronous user service stub */
    @Bean
    public UserServiceGrpc.UserServiceStub userServiceStub(@Autowired ManagedChannel channel) {
        return UserServiceGrpc.newStub(channel);
    }


    /* synchronous user service stub */
    @Bean
    public UserServiceGrpc.UserServiceBlockingStub userServiceBlockingStub(@Autowired ManagedChannel channel) {
        return UserServiceGrpc.newBlockingStub(channel);
    }
}

Что-то не так с нашим кодом? Мы с нетерпением ждем вашего ответа!

1 Ответ

0 голосов
/ 16 июня 2020

Мы думали, что start будет называться созданным каналом. Это неправильно!

Извините, поскольку мы неправильно поняли насчет gRP C балансировки нагрузки.

Теперь это называется запуском при новом вызове службы!

Спасибо! Команда Saigon Parking.

...