Мы пытаемся реализовать балансировку нагрузки gRPC в Java с помощью Consul Service Discovery. Информация о версии: grpc-java v1.30.0
Проблема в том, что при запуске приложения метод start нашего собственного класса NameResolver не вызывается!
Вот наш код:
Вот наш собственный класс NameResolver (метод start здесь не вызывается):
Я поставил точку останова в методе start для проверки, но он не вызывается!
package com.bht.saigonparking.common.loadbalance;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.Level;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import com.bht.saigonparking.common.util.LoggingUtil;
import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import io.grpc.Status;
import lombok.Getter;
/**
*
* @author bht
*/
@Getter
public final class SaigonParkingNameResolver extends NameResolver {
private final URI consulURI;
private final String serviceId;
private final DiscoveryClient discoveryClient;
private Listener listener;
private List<ServiceInstance> serviceInstances;
public SaigonParkingNameResolver(DiscoveryClient discoveryClient,
URI consulURI,
String serviceId,
int pauseInSeconds) {
this.consulURI = consulURI;
this.serviceId = serviceId;
this.discoveryClient = discoveryClient;
/* run connection check timer */
ConnectionCheckTimer connectionCheckTimer = new ConnectionCheckTimer(this, pauseInSeconds);
connectionCheckTimer.runTimer();
}
@Override
public String getServiceAuthority() {
return consulURI.getAuthority();
}
@Override
public void start(Listener2 listener) {
this.listener = listener;
loadServiceInstances();
}
@Override
public void shutdown() {
// implement shutdown...
}
void loadServiceInstances() {
List<EquivalentAddressGroup> addressList = new ArrayList<>();
serviceInstances = discoveryClient.getInstances(serviceId);
if (serviceInstances == null || serviceInstances.isEmpty()) {
LoggingUtil.log(Level.WARN, "loadServiceInstances", "Warning",
String.format("no serviceInstances of %s", serviceId));
return;
}
serviceInstances.forEach(serviceInstance -> {
String host = serviceInstance.getHost();
int port = serviceInstance.getPort();
LoggingUtil.log(Level.INFO, "loadServiceInstances", serviceId, String.format("%s:%d", host, port));
List<SocketAddress> socketAddressList = new ArrayList<>();
socketAddressList.add(new InetSocketAddress(host, port));
addressList.add(new EquivalentAddressGroup(socketAddressList));
});
if (!addressList.isEmpty()) {
listener.onAddresses(addressList, Attributes.EMPTY);
}
}
}
Вот наш собственный класс NameResolverProvider:
package com.bht.saigonparking.common.loadbalance;
import java.net.URI;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import io.grpc.NameResolver;
import io.grpc.NameResolverProvider;
import lombok.AllArgsConstructor;
/**
 * {@link NameResolverProvider} that creates {@link SaigonParkingNameResolver}
 * instances for targets using the {@code consul} scheme.
 *
 * <p>The all-args constructor generated by Lombok takes, in order:
 * {@code serviceId}, {@code discoveryClient}, {@code pauseInSeconds}.
 *
 * @author bht
 */
@AllArgsConstructor
public final class SaigonParkingNameResolverProvider extends NameResolverProvider {

    /** Identifier of the service every created resolver looks up. */
    private final String serviceId;

    /** Discovery client handed to every created resolver. */
    private final DiscoveryClient discoveryClient;

    /** Timer pause, in seconds, handed to every created resolver. */
    private final int pauseInSeconds;

    /** Always reports this provider as usable. */
    @Override
    protected boolean isAvailable() {
        return true;
    }

    /** Returns the fixed priority {@code 5} for this provider. */
    @Override
    protected int priority() {
        return 5;
    }

    /** Returns the URI scheme handled by this provider: {@code "consul"}. */
    @Override
    public String getDefaultScheme() {
        return "consul";
    }

    /**
     * Creates a resolver for the given target, or declines it.
     *
     * <p>The gRPC factory contract asks implementations to return {@code null}
     * for a URI they cannot resolve and to decide based on the scheme alone,
     * so any target whose scheme is not {@code consul} is declined rather than
     * being resolved through service discovery.
     *
     * @param targetUri target URI of the channel
     * @param args      resolver arguments supplied by gRPC (unused)
     * @return a new {@link SaigonParkingNameResolver}, or {@code null} when the
     *         scheme of {@code targetUri} is not {@code consul}
     */
    @Override
    public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
        if (targetUri == null || !getDefaultScheme().equalsIgnoreCase(targetUri.getScheme())) {
            return null;
        }
        return new SaigonParkingNameResolver(discoveryClient, targetUri, serviceId, pauseInSeconds);
    }
}
Вот класс конфигурации на стороне клиента:
package com.bht.saigonparking.service.auth.configuration;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import com.bht.saigonparking.api.grpc.user.UserServiceGrpc;
import com.bht.saigonparking.common.interceptor.SaigonParkingClientInterceptor;
import com.bht.saigonparking.common.loadbalance.SaigonParkingNameResolverProvider;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import lombok.AllArgsConstructor;
/**
 * Declares the gRPC client beans used to call the user service: the Consul-backed
 * name resolver provider, the shared {@link ManagedChannel} and the two stubs
 * built on top of it.
 *
 * @author bht
 */
@Component
@AllArgsConstructor(onConstructor = @__(@Autowired))
public final class ChannelConfiguration {

    /** Multiplier applied to the configured inbound size limits. */
    private static final int BYTES_PER_MEBIBYTE = 1024 * 1024;

    private final SaigonParkingClientInterceptor clientInterceptor;

    /**
     * Creates the name resolver provider for the user service.
     *
     * @param serviceId       value of {@code connection.user-service.id}
     * @param refreshPeriod   value of {@code connection.refresh-period-in-seconds}
     * @param discoveryClient discovery client handed to the provider
     * @return the provider, registered under the bean name {@code userResolver}
     */
    @Bean("userResolver")
    public SaigonParkingNameResolverProvider userServiceNameResolverProvider(@Value("${connection.user-service.id}") String serviceId,
                                                                             @Value("${connection.refresh-period-in-seconds}") int refreshPeriod,
                                                                             @Autowired DiscoveryClient discoveryClient) {
        return new SaigonParkingNameResolverProvider(serviceId, discoveryClient, refreshPeriod);
    }

    /**
     * Builds the channel, the abstraction used to connect to a service endpoint.
     *
     * <p>Note for gRPC service stubs:
     * {@code .newStub(channel)} gives a nonblocking/asynchronous stub,
     * {@code .newBlockingStub(channel)} gives a blocking/synchronous stub.
     *
     * <p>Per the grpc-java documentation a channel starts in idle mode and leaves it
     * on the first RPC; the name resolver is only started at that point, not when
     * this bean is created.
     *
     * @param host                   value of {@code spring.cloud.consul.host}
     * @param port                   value of {@code spring.cloud.consul.port}
     * @param timeout                idle timeout, passed to gRPC as milliseconds
     * @param maxInBoundMessageSize  inbound message limit; multiplied by 1024 * 1024 to get bytes
     * @param maxInBoundMetadataSize inbound metadata limit; multiplied by 1024 * 1024 to get bytes
     * @param loadBalancingPolicy    name of the default load-balancing policy
     * @param nameResolverProvider   the {@code userResolver} bean
     * @return the built channel
     * @throws ArithmeticException if a configured size limit overflows {@code int}
     *                             once converted to bytes
     */
    @Bean
    public ManagedChannel managedChannel(@Value("${spring.cloud.consul.host}") String host,
                                         @Value("${spring.cloud.consul.port}") int port,
                                         @Value("${connection.idle-timeout}") int timeout,
                                         @Value("${connection.max-inbound-message-size}") int maxInBoundMessageSize,
                                         @Value("${connection.max-inbound-metadata-size}") int maxInBoundMetadataSize,
                                         @Value("${connection.load-balancing-policy}") String loadBalancingPolicy,
                                         @Qualifier("userResolver") SaigonParkingNameResolverProvider nameResolverProvider) {
        return ManagedChannelBuilder
                .forTarget("consul://" + host + ":" + port)                     // "consul" scheme selects the custom name resolver
                .keepAliveWithoutCalls(false)                                   // no keepalive pings while there are no outstanding RPCs
                .idleTimeout(timeout, TimeUnit.MILLISECONDS)                    // time without RPCs before the channel goes back to idle mode
                .maxInboundMetadataSize(toBytes(maxInBoundMetadataSize))        // max size of received metadata (headers), in bytes
                .maxInboundMessageSize(toBytes(maxInBoundMessageSize))          // max size of a single received message, in bytes
                .defaultLoadBalancingPolicy(loadBalancingPolicy)                // set load balancing policy for channel
                .nameResolverFactory(nameResolverProvider)                      // resolve addresses through Consul service discovery
                .intercept(clientInterceptor)                                   // add internal credential authentication
                .usePlaintext()                                                 // use plain-text to communicate internally
                .build();                                                       // build channel to communicate over gRPC
    }

    /* asynchronous user service stub */
    @Bean
    public UserServiceGrpc.UserServiceStub userServiceStub(@Autowired ManagedChannel channel) {
        return UserServiceGrpc.newStub(channel);
    }

    /* synchronous user service stub */
    @Bean
    public UserServiceGrpc.UserServiceBlockingStub userServiceBlockingStub(@Autowired ManagedChannel channel) {
        return UserServiceGrpc.newBlockingStub(channel);
    }

    /** Converts a configured size to bytes, failing loudly instead of wrapping around on overflow. */
    private static int toBytes(int configuredSize) {
        return Math.multiplyExact(configuredSize, BYTES_PER_MEBIBYTE);
    }
}
Что-то не так с нашим кодом? Мы с нетерпением ждем вашего ответа!