Получение EventHubException: соединение прервано, когда я создаю ДВА экземпляра EventHubClient - PullRequest
0 голосов
/ 29 апреля 2018

Это также не является дубликатом ошибки «Соединение прервано» при попытке отправить события в Azure Event Hub с помощью API-интерфейса java EventHubClient , поскольку проблема в этом вопросе была связана с прокси-сервером. Это НЕ проблема с моими настройками прокси, так как этот код работает для клиента-концентратора Singe Event

Мой вопрос: возможно ли иметь Java-приложение, которое отправляет сообщения ДВЕ или более Клиентам-концентраторам событий?

Я пытаюсь опубликовать / отправить данные о событиях в несколько экземпляров концентраторов событий Azure. Я подготовил ДВА пространства имен концентратора событий, и у каждого пространства имен есть свой концентратор событий. Я имею во всех ДВУХ центрах событий, которые имеют свои собственные Строки Соединения, Ключи SAS и Пространства имен и Имена. Поскольку каждое пространство имен концентратора событий может поддерживать только 20 единиц пропускной способности без ручного вмешательства (запрос на обслуживание), я пытаюсь выяснить, могу ли я отправить свои данные в концентраторы с несколькими событиями. Я вижу, что мой код отлично работает для SINGLE EventHubClient . В тот момент, когда мой код пытается создать второй EventHubClient, я получаю исключение для этого соединения.

Я использую пример кода, предоставленный на git-концентраторе Azure по адресу https://github.com/MicrosoftDocs/azure-docs/blob/master/articles/event-hubs/event-hubs-java-get-started-send.md

Я вижу следующее исключение:

Exception in thread "main" com.microsoft.azure.eventhubs.EventHubException: connection aborted
at com.microsoft.azure.eventhubs.impl.ExceptionUtil.toException(ExceptionUtil.java:59)
at com.microsoft.azure.eventhubs.impl.MessagingFactory.onConnectionError(MessagingFactory.java:249)
at com.microsoft.azure.eventhubs.impl.ConnectionHandler.onTransportError(ConnectionHandler.java:102)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:191)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:445)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Любые указатели / входы будут высоко оценены. Вот соответствующий код:

public class Sender {

private static final String EVENTHUB_NS1= "TT";
private static final String EVENTHUB1= "TT";
private static final String SAS_KEY_NAME1= "RootManageSharedAccessKey";
private static final String SAS_KEY_VAL1= "SECRET1";

private static final String EVENTHUB_NS2= "TT1";
private static final String EVENTHUB2= "TT1";
private static final String SAS_KEY_NAME2= "RootManageSharedAccessKey";
private static final String SAS_KEY_VAL2= "SECRET2";


private EventData getEventData(int eventDataPrefix) throws IOException, URISyntaxException {
    String msgData = "<=>"+eventDataPrefix + "<=>"+"TEST MESSAGE..";
    final Gson gson = new GsonBuilder().create();
    byte [] data =gson.toJson(msgData).getBytes(Charset.defaultCharset());
    EventData ed = EventData.create(data);
    return ed;
}
public static final int MAX_BATCH_SIZE=100;
private List<EventData> getBatchOfEvents() throws IOException, URISyntaxException {
    List<EventData> events = new ArrayList<>();
    for(int i = 0; i < MAX_BATCH_SIZE; i ++){
        events.add(getEventData(i));
    }
    return events;
}
private List<String> getConnectionStrings(){
    List<String> connStrings = new ArrayList<>();
    ConnectionStringBuilder csBldr1 = new ConnectionStringBuilder();
    csBldr1.setNamespaceName(EVENTHUB_NS1);
    csBldr1.setEventHubName(EVENTHUB1);
    csBldr1.setSasKeyName(SAS_KEY_NAME1);
    csBldr1.setSasKey(SAS_KEY_VAL1);

    ConnectionStringBuilder csbldr2 = new ConnectionStringBuilder();
    csbldr2.setNamespaceName(EVENTHUB_NS2);
    csbldr2.setEventHubName(EVENTHUB2);
    csbldr2.setSasKeyName(SAS_KEY_NAME2);
    csbldr2.setSasKey(SAS_KEY_VAL2);

    connStrings.add(csBldr1.toString());
    connStrings.add(csbldr2.toString());
    return connStrings;
}

private List<EventHubClient> getEHClients() throws IOException, EventHubException, ExecutionException, InterruptedException {

    List<EventHubClient> ehClients = new ArrayList<>();
    System.out.println("Starting getEhCLients..");

    for( String connStr: getConnectionStrings()){

        final ExecutorService executorService = Executors.newSingleThreadExecutor();
        //The second iteration of for loop gives the EventHubException
        EventHubClient client= EventHubClient.createSync(connStr, executorService);


        ehClients.add(client);
        System.out.println("EH CONNSTR::"+connStr);
    }

    return ehClients;
}


private void sendBatch( List<EventData> events) throws IOException, EventHubException, ExecutionException, InterruptedException {
    List<EventHubClient> ehClients = getEHClients();
    if( ehClients.size() <=0) {
        System.out.println("NO EH CLients.. to send..");
        return;
    }
    for(int i = 0; i < events.size();i++){
        EventData data = events.get(i);
        int ehClientIndex = i % ehClients.size();
        EventHubClient client = ehClients.get(ehClientIndex);
        client.sendSync(data);
        System.out.print("MsgSent:"+ehClientIndex);
    }
    System.out.println("\nDone");
}
public static void main(String[] args) throws IOException, URISyntaxException, EventHubException, ExecutionException, InterruptedException {
    Sender sender = new Sender();
    List<EventData> events = sender.getBatchOfEvents();
    sender.sendBatch(events);

}

}

1 Ответ

0 голосов
/ 11 октября 2018

Очень интересный вариант использования, я мог бы попытаться воспроизвести со своей стороны. Я предполагаю, что EventHubClient имеет некоторое статическое поле, которое будет совместно использоваться несколькими экземплярами, это может вызвать вашу проблему.

Кроме того, я хочу понять, зачем использовать два пространства имен, как примерно одно пространство имен с более высокой пропускной способностью. Одно пространство имен похоже на кластер. На самом деле я из Spring Cloud Azure и пытаюсь улучшить работу с Java на Azure. Не стесняйтесь попробовать наш связующий центр событий. https://github.com/Microsoft/spring-cloud-azure

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...