Не удается прочитать события из Azure Event Hub через службу приложений - PullRequest
0 голосов
/ 07 апреля 2020
Have written a service which reads events from events hub, in local system its working but when deployed as an App service to cloud not able to reads the events.
Below is the stack trace while reading events from Azure eventhub.

> 2020-04-07 09:42:59.021  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.BaseLinkHandler     : closeSession for
> clientName[cbs], linkName[cbs:sender], errorCondition[null],
> errorDescription[null] 2020-04-07 09:42:59.022  INFO 54245 ---
> [pool-4-thread-4] c.m.a.eventhubs.impl.BaseLinkHandler     :
> onLinkLocalClose clientName[cbs], linkName[cbs:receiver],
> errorCondition[null], errorDescription[null] 2020-04-07 09:42:59.022 
> INFO 54245 --- [pool-4-thread-4]
> c.m.azure.eventhubs.impl.SessionHandler  : onSessionLocalClose
> connectionId[cbs-session], entityName[MF_7a461b_1586238177759],
> condition[Error{condition=null, description='null', info=null}]
> 2020-04-07 09:42:59.061  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.BaseLinkHandler     : onLinkRemoteClose
> clientName[cbs], linkName[cbs:sender], errorCondition[null],
> errorDescription[null] 2020-04-07 09:42:59.061  INFO 54245 ---
> [pool-4-thread-4] c.m.a.eventhubs.impl.BaseLinkHandler     :
> processOnClose clientName[cbs], linkName[cbs:sender],
> errorCondition[null], errorDescription[null] 2020-04-07 09:42:59.061 
> INFO 54245 --- [pool-4-thread-4] c.m.a.eventhubs.impl.BaseLinkHandler 
> : onLinkRemoteClose clientName[cbs], linkName[cbs:receiver],
> errorCondition[null], errorDescription[null] 2020-04-07 09:42:59.061 
> INFO 54245 --- [pool-4-thread-4] c.m.a.eventhubs.impl.BaseLinkHandler 
> : processOnClose clientName[cbs], linkName[cbs:receiver],
> errorCondition[null], errorDescription[null] 2020-04-07 09:42:59.061 
> INFO 54245 --- [pool-4-thread-4] c.m.a.e.impl.RequestResponseOpener   
> : requestResponseChannel.onClose complete
> clientId[MF_7a461b_1586238177759], session[cbs-session], link[cbs],
> endpoint[$cbs] 2020-04-07 09:42:59.062  INFO 54245 ---
> [pool-4-thread-4] c.m.a.eventhubs.impl.MessagingFactory    :
> messagingFactory[MF_7a461b_1586238177759],
> hostName[products-dev.servicebus.windows.net], info[cbsChannel closed]
> 2020-04-07 09:42:59.062  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionRemoteClose
> hostname[products-dev.servicebus.windows.net:5671],
> connectionId[MF_7a461b_1586238177759], errorCondition[null],
> errorDescription[null] 2020-04-07 09:42:59.062  WARN 54245 ---
> [pool-4-thread-4] c.m.a.eventhubs.impl.MessagingFactory    :
> onConnectionError messagingFactory[MF_7a461b_1586238177759],
> hostname[products-dev.servicebus.windows.net], error[null] 2020-04-07
> 09:42:59.062  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.ConnectionHandler   : onTransportClosed
> hostname[products-dev.servicebus.windows.net:5671],
> connectionId[MF_7a461b_1586238177759], error[n/a] 2020-04-07
> 09:42:59.062  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.CustomIOHandler     : onTransportClosed
> name[MF_7a461b_1586238177759],
> hostname[products-dev.servicebus.windows.net:5671] 2020-04-07
> 09:42:59.062  INFO 54245 --- [pool-4-thread-4]
> c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionUnbound
> hostname[products-dev.servicebus.windows.net:5671],
> connectionId[MF_7a461b_1586238177759], state[CLOSED],
> remoteState[CLOSED] 2020-04-07 09:42:59.062  INFO 54245 ---
> [pool-4-thread-4] c.m.azure.eventhubs.impl.SessionHandler  :
> onSessionFinal connectionId[MF_7a461b_1586238177759],
> entityName[cbs-session], condition[null], description[null] 2020-04-07
> 09:42:59.062  INFO 54245 --- [pool-4-thread-4]
> c.m.azure.eventhubs.impl.SessionHandler  : onSessionFinal
> connectionId[MF_7a461b_1586238177759], entityName[products],
> condition[null], description[null] 2020-04-07 09:42:59.062  INFO 54245
> --- [pool-4-thread-4] c.m.a.eventhubs.impl.ConnectionHandler   : onConnectionFinal hostname[products-dev.servicebus.windows.net:5671],
> connectionId[MF_7a461b_1586238177759], errorCondition[null],
> errorDescription[null] 2020-04-07 09:42:59.062  WARN 54245 ---
> [pool-4-thread-4] c.m.a.eventhubs.impl.MessagingFactory    :
> messagingFactory[MF_7a461b_1586238177759],
> hostName[products-dev.servicebus.windows.net], message[stopping the
> reactor because thread was interrupted or the reactor has no more
> events to process.]


Code:

Регистрация класса обработчика событий с экземпляром EventProcessorHost запускает обработку событий. Экземпляр хоста получает аренду некоторых разделов концентратора событий, возможно, крадя их у других экземпляров хоста, и это сходится при равномерном распределении разделов по всем экземплярам хоста. Для каждого выделенного раздела экземпляр узла создает экземпляр предоставленного класса обработчика событий, затем получает события из этого раздела и передает их экземпляру обработчика событий.

В EventProcessorHost есть две системы уведомлений об ошибках. Уведомление об ошибках, связанных с конкретным разделом, таких как сбой получателя, доставляется экземпляру обработчика событий для этого раздела с помощью метода onError. Уведомления об ошибках, не привязанных к конкретному разделу, например об ошибках инициализации, доставляются в общий обработчик уведомлений, который указывается с помощью объекта EventProcessorOptions. От вас не требуется предоставлять такой обработчик уведомлений, но если вы этого не сделаете, вы можете не знать, что произошли определенные ошибки.

@ RestController publi c class ReceiveEventsController {

личное состояние c окончательное Logger logger = LoggerFactory.getLogger (ReceiveEventsController.class);

@Value("${spring.cloud.azure.eventhub.namespace}")
private String namespaceName;

@Value("${spring.cloud.azure.eventhub.name}")
private String eventHubName;

@Value("${spring.cloud.azure.eventhub.sas.key.name}")
private String sasKeyName;

@Value("${spring.cloud.azure.eventhub.sas.key.value}")
private String sasKey;

@Value("${spring.cloud.stream.bindings.input.group}")
private String consumerGroupName;

@Value("${spring.cloud.azure.eventhub.storage.connection.string}")
private String storageConnectionString;

@Value("${spring.cloud.azure.eventhub.checkpoint-container}")
private String storageContainerName;

@Value("${spring.cloud.azure.eventhub.storage.hostname.prefix}")
private String hostNamePrefix;

@PostMapping("/receive/events")
public String postMessage() throws EventHubException, IOException, InterruptedException, ExecutionException, URISyntaxException {

    URI uri = new URI("sb://products-dev.servicebus.windows.net");
    ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder()
            .setEndpoint(uri)
            .setEventHubName(eventHubName)
            .setSasKeyName(sasKeyName)
            .setSasKey(sasKey);
    EventProcessorHost host = EventProcessorHost.EventProcessorHostBuilder
            .newBuilder(EventProcessorHost.createHostName(hostNamePrefix), consumerGroupName)
            .useAzureStorageCheckpointLeaseManager(storageConnectionString, storageContainerName, null)
            .useEventHubConnectionString(eventHubConnectionString.toString(), eventHubName)
            .build();

    logger.info("Registering host named " + host.getHostName()+ "Endpoint " + eventHubConnectionString.getEndpoint());
    EventProcessorOptions options = new EventProcessorOptions();
    options.setExceptionNotification(new ErrorNotificationHandler());

    host.registerEventProcessor(EventProcessor.class, options)
    .whenComplete((unused, e) ->
    {
        if (e != null)
        {
            logger.info("Failure while registering: " + e.toString());
            if (e.getCause() != null)
            {
                logger.info("Inner exception: " + e.getCause().toString());
            }
        }
    })
    .thenAccept((unused) ->
    {
        logger.info("Press enter to stop.");
        try 
        {
            System.in.read();
        }
        catch (Exception e)
        {
            logger.info("Keyboard read failed: " + e.toString());
        }
    })
    .thenCompose((unused) ->
    {
        return host.unregisterEventProcessor();
    })
    .exceptionally((e) ->
    {
        logger.info("Failure while unregistering: " + e.toString());
        if (e.getCause() != null)
        {
            logger.info("Inner exception: " + e.getCause().toString());
        }
        return null;
    })
    .get();

    logger.info("End of PRODUCT");
    return "Event Received";
}

}

...