Это также не является дубликатом ошибки «Соединение прервано» при попытке отправить события в Azure Event Hub с помощью API-интерфейса java EventHubClient , поскольку проблема в этом вопросе была связана с прокси-сервером. Это НЕ проблема с моими настройками прокси, так как этот код работает для клиента-концентратора Singe Event
Мой вопрос: возможно ли иметь Java-приложение, которое отправляет сообщения ДВЕ или более Клиентам-концентраторам событий?
Я пытаюсь опубликовать / отправить данные о событиях в несколько экземпляров концентраторов событий Azure.
Я подготовил ДВА пространства имен концентратора событий, и у каждого пространства имен есть свой концентратор событий.
Я имею во всех ДВУХ центрах событий, которые имеют свои собственные Строки Соединения, Ключи SAS и Пространства имен и Имена.
Поскольку каждое пространство имен концентратора событий может поддерживать только 20 единиц пропускной способности без ручного вмешательства (запрос на обслуживание), я пытаюсь выяснить, могу ли я отправить свои данные в концентраторы с несколькими событиями.
Я вижу, что мой код отлично работает для SINGLE EventHubClient . В тот момент, когда мой код пытается создать второй EventHubClient, я получаю исключение для этого соединения.
Я использую пример кода, предоставленный на git-концентраторе Azure по адресу https://github.com/MicrosoftDocs/azure-docs/blob/master/articles/event-hubs/event-hubs-java-get-started-send.md
Я вижу следующее исключение:
Exception in thread "main" com.microsoft.azure.eventhubs.EventHubException: connection aborted
at com.microsoft.azure.eventhubs.impl.ExceptionUtil.toException(ExceptionUtil.java:59)
at com.microsoft.azure.eventhubs.impl.MessagingFactory.onConnectionError(MessagingFactory.java:249)
at com.microsoft.azure.eventhubs.impl.ConnectionHandler.onTransportError(ConnectionHandler.java:102)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:191)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:445)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Любые указатели / входы будут высоко оценены.
Вот соответствующий код:
public class Sender {
private static final String EVENTHUB_NS1= "TT";
private static final String EVENTHUB1= "TT";
private static final String SAS_KEY_NAME1= "RootManageSharedAccessKey";
private static final String SAS_KEY_VAL1= "SECRET1";
private static final String EVENTHUB_NS2= "TT1";
private static final String EVENTHUB2= "TT1";
private static final String SAS_KEY_NAME2= "RootManageSharedAccessKey";
private static final String SAS_KEY_VAL2= "SECRET2";
private EventData getEventData(int eventDataPrefix) throws IOException, URISyntaxException {
String msgData = "<=>"+eventDataPrefix + "<=>"+"TEST MESSAGE..";
final Gson gson = new GsonBuilder().create();
byte [] data =gson.toJson(msgData).getBytes(Charset.defaultCharset());
EventData ed = EventData.create(data);
return ed;
}
public static final int MAX_BATCH_SIZE=100;
private List<EventData> getBatchOfEvents() throws IOException, URISyntaxException {
List<EventData> events = new ArrayList<>();
for(int i = 0; i < MAX_BATCH_SIZE; i ++){
events.add(getEventData(i));
}
return events;
}
private List<String> getConnectionStrings(){
List<String> connStrings = new ArrayList<>();
ConnectionStringBuilder csBldr1 = new ConnectionStringBuilder();
csBldr1.setNamespaceName(EVENTHUB_NS1);
csBldr1.setEventHubName(EVENTHUB1);
csBldr1.setSasKeyName(SAS_KEY_NAME1);
csBldr1.setSasKey(SAS_KEY_VAL1);
ConnectionStringBuilder csbldr2 = new ConnectionStringBuilder();
csbldr2.setNamespaceName(EVENTHUB_NS2);
csbldr2.setEventHubName(EVENTHUB2);
csbldr2.setSasKeyName(SAS_KEY_NAME2);
csbldr2.setSasKey(SAS_KEY_VAL2);
connStrings.add(csBldr1.toString());
connStrings.add(csbldr2.toString());
return connStrings;
}
private List<EventHubClient> getEHClients() throws IOException, EventHubException, ExecutionException, InterruptedException {
List<EventHubClient> ehClients = new ArrayList<>();
System.out.println("Starting getEhCLients..");
for( String connStr: getConnectionStrings()){
final ExecutorService executorService = Executors.newSingleThreadExecutor();
//The second iteration of for loop gives the EventHubException
EventHubClient client= EventHubClient.createSync(connStr, executorService);
ehClients.add(client);
System.out.println("EH CONNSTR::"+connStr);
}
return ehClients;
}
private void sendBatch( List<EventData> events) throws IOException, EventHubException, ExecutionException, InterruptedException {
List<EventHubClient> ehClients = getEHClients();
if( ehClients.size() <=0) {
System.out.println("NO EH CLients.. to send..");
return;
}
for(int i = 0; i < events.size();i++){
EventData data = events.get(i);
int ehClientIndex = i % ehClients.size();
EventHubClient client = ehClients.get(ehClientIndex);
client.sendSync(data);
System.out.print("MsgSent:"+ehClientIndex);
}
System.out.println("\nDone");
}
public static void main(String[] args) throws IOException, URISyntaxException, EventHubException, ExecutionException, InterruptedException {
Sender sender = new Sender();
List<EventData> events = sender.getBatchOfEvents();
sender.sendBatch(events);
}
}