Azure eventhub EventHubConsumerAsyncClient не удалось запустить в не веб-приложении Springboot - PullRequest
0 голосов
/ 29 апреля 2020

Я создаю Azure потребителя концентратора событий с использованием Springboot, и он работает с веб-конфигурацией.

Я пытаюсь достичь того же результата с помощью не-веб-конфигурации, как показано в следующих фрагментах, и получаю результат, как показано на консоли.

Зависимости:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <dependency>
        <groupId>com.azure</groupId>
        <artifactId>azure-messaging-eventhubs</artifactId>
        <version>5.0.3</version>
    </dependency>
</dependencies>

AzureEventhubConsumerApplication. java

@EnableAsync
@SpringBootApplication
public class AzureEventhubConsumerApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder(AzureEventhubConsumerApplication.class).web(WebApplicationType.NONE).run(args);
    }
}

EventProcessorHostService. java

public class EventProcessorHostService {

    @Autowired
    EventhubProperties ehProps;

    @Autowired
    TaskExecutor taskexecutor;

    @PostConstruct
    public void run() throws ExecutionException, InterruptedException {
        EventHubConsumerAsyncClient client = new EventHubClientBuilder()
                .connectionString(ehProps.getConnectionString(), ehProps.getEventHubName())
                .consumerGroup(ehProps.getStorage().getConsumerGroupName()).buildAsyncConsumerClient();

        client.receive(true).subscribe(event -> {
            PartitionContext context = event.getPartitionContext();
            EventData eData = event.getData();
            System.out.printf("Event %s is from partition %s%n.", eData.getSequenceNumber(), context.getPartitionId());

        });

    }
}

Консоль

 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.6.RELEASE)

2020-04-29 18:38:52.905  INFO 2248 --- [  restartedMain] c.e.c.A.AzureEventhubConsumerApplication : Starting AzureEventhubConsumerApplication on Nikhils-MacBook-Pro.local with PID 2248 (/AzureEventhubConsumer/target/classes started by nikhil in /Projects/Code/AzureEventhubConsumer)
2020-04-29 18:38:52.908  INFO 2248 --- [  restartedMain] c.e.c.A.AzureEventhubConsumerApplication : No active profile set, falling back to default profiles: default
2020-04-29 18:38:52.957  INFO 2248 --- [  restartedMain] .e.DevToolsPropertyDefaultsPostProcessor : Devtools property defaults active! Set 'spring.devtools.add-properties' to 'false' to disable
2020-04-29 18:38:53.467  INFO 2248 --- [  restartedMain] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-04-29 18:38:53.541  INFO 2248 --- [  restartedMain] c.a.m.eventhubs.EventHubClientBuilder    : connectionId[MF_84ae03_1588165733540]: Emitting a single connection.
2020-04-29 18:38:53.623  INFO 2248 --- [  restartedMain] c.a.m.e.i.EventHubConnectionProcessor    : connectionId[sbeventhumdemo.servicebus.windows.net] entityPath[spring-event-hub]: Setting next AMQP channel.
2020-04-29 18:38:53.642  INFO 2248 --- [  restartedMain] c.a.c.a.i.ReactorConnection              : connectionId[MF_84ae03_1588165733540]: Creating and starting connection to sbeventhumdemo.servicebus.windows.net:5671
2020-04-29 18:38:53.662  INFO 2248 --- [  restartedMain] c.a.c.a.implementation.ReactorExecutor   : connectionId[MF_84ae03_1588165733540], message[Starting reactor.]
2020-04-29 18:38:53.684  INFO 2248 --- [       single-1] c.a.c.a.i.handler.ConnectionHandler      : onConnectionInit hostname[sbeventhumdemo.servicebus.windows.net], connectionId[MF_84ae03_1588165733540]
2020-04-29 18:38:53.685  INFO 2248 --- [       single-1] c.a.c.a.i.handler.ReactorHandler         : connectionId[MF_84ae03_1588165733540] reactor.onReactorInit
2020-04-29 18:38:53.687  INFO 2248 --- [       single-1] c.a.c.a.i.handler.ConnectionHandler      : onConnectionLocalOpen hostname[sbeventhumdemo.servicebus.windows.net:5671], connectionId[MF_84ae03_1588165733540], errorCondition[null], errorDescription[null]
2020-04-29 18:38:53.694  INFO 2248 --- [  restartedMain] c.a.c.a.i.ReactorConnection              : Emitting new response channel. connectionId: MF_84ae03_1588165733540. entityPath: $management. linkName: mgmt.
2020-04-29 18:38:53.694  INFO 2248 --- [  restartedMain] a.i.RequestResponseChannel<mgmt-session> : connectionId[MF_84ae03_1588165733540] entityPath[$management]: Setting next AMQP channel.
2020-04-29 18:38:53.696  INFO 2248 --- [  restartedMain] c.a.m.e.i.ManagementChannel              : Management endpoint state: UNINITIALIZED
2020-04-29 18:38:53.772  INFO 2248 --- [       single-1] c.a.c.a.i.handler.ConnectionHandler      : onConnectionBound hostname[sbeventhumdemo.servicebus.windows.net], connectionId[MF_84ae03_1588165733540]
2020-04-29 18:38:53.871  INFO 2248 --- [       single-1] c.a.c.a.i.handler.ReceiveLinkHandler     : onLinkLocalOpen connectionId[MF_84ae03_1588165733540], linkName[mgmt:receiver], localSource[Source{address='$management', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
2020-04-29 18:38:53.934  INFO 2248 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2020-04-29 18:38:53.961  INFO 2248 --- [  restartedMain] c.e.c.A.AzureEventhubConsumerApplication : Started AzureEventhubConsumerApplication in 1.338 seconds (JVM running for 1.897)
2020-04-29 18:38:53.971  INFO 2248 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'

Ответы [ 2 ]

3 голосов
/ 29 апреля 2020

.subscribe() - это неблокирующий вызов. Судя по вашим журналам, приложение закрывается, прежде чем оно сможет инициализировать соединение с концентраторами событий и начать выборку данных. Если вы разрешите своему приложению работать дольше или добавите Thread.sleep после операции subscribe, вы должны увидеть некоторые данные.

В качестве примечания: EventProcessorClient лучше подходит для производства при использовании всех события из Event Hub. Может загружать баланс и отслеживать обработанные события.

private Disposable subscription;
private EventHubConsumerAsyncClient client;

@PostConstruct
public void run() throws ExecutionException, InterruptedException {
    client = new EventHubClientBuilder()
            .connectionString(ehProps.getConnectionString(), ehProps.getEventHubName())
            .consumerGroup(ehProps.getStorage().getConsumerGroupName()).buildAsyncConsumerClient();

    subscription = client.receive(true).subscribe(event -> {
        PartitionContext context = event.getPartitionContext();
        EventData eData = event.getData();
        System.out.printf("Event %s is from partition %s%n.", eData.getSequenceNumber(), context.getPartitionId());
    });
}

@PreDestroy
public void destroy() {
    if (subscription != null) {
        subscription.dispose();
    }
    if (client != null) {
        client.close();
    }
}
0 голосов
/ 29 апреля 2020

Выполнение Azure операции через экземпляр TaskExecutor.

Изменение в EventProcessorHostService. java

@Component
public class EventProcessorHostService {

    @Autowired
    TaskExecutor taskexecutor;

    @Autowired
    EventhubProperties ehProps;

    private Disposable subscription;
    private EventHubConsumerAsyncClient client;

    @PostConstruct
    public void run() throws ExecutionException, InterruptedException {

        taskexecutor.execute(() -> {
            client = new EventHubClientBuilder()
                    .connectionString(ehProps.getConnectionString(), ehProps.getEventHubName())
                    .consumerGroup(ehProps.getStorage().getConsumerGroupName()).buildAsyncConsumerClient();
            subscription = client.receive(true).subscribe(new EventProcessor());
        });

    }

    @PreDestroy
    public void destroy() {
        if (subscription != null) {
            subscription.dispose();
        }

        if (client != null) {
            client.close();
        }

    }
}

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...