Получать события от AMQP с Axon 4 - PullRequest
2 голосов
/ 20 апреля 2019

Я пытаюсь отправлять сообщения через rabbitmq в систему с весенней загрузкой axon4.Сообщение получено, но никакие события не инициированы.Я очень уверен, что мне не хватает важной части, но до сих пор я не смог понять это.

Здесь соответствующая часть моего приложения .yml

axon:
    amqp:
        exchange: axon.fanout
        transaction-mode: publisher_ack
    # adding the following lines changed nothing
    eventhandling:
        processors:
            amqpEvents:
                source: in.queue
                mode: subscribing
spring:
    rabbitmq:
        username: rabbit
        password: rabbit

ОтДокументы, которые я обнаружил, что я должен создать bean-компонент SpringAMQPMessageSource:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class AxonConfig {

    @Bean
    SpringAMQPMessageSource inputMessageSource(final AMQPMessageConverter messageConverter) {
        return new SpringAMQPMessageSource(messageConverter) {
            @RabbitListener(queues = "in.queue")
            @Override
            public void onMessage(final Message message, final Channel channel) {
                log.debug("received external message: {}, channel: {}", message, channel);
                super.onMessage(message, channel);
            }
        };
    }

}

Если я отправляю сообщение в очередь из административной панели rabbitmq, я вижу журнал:

AxonConfig : received external message: (Body:'[B@13f7aeef(byte[167])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=in.queue, deliveryTag=2, consumerTag=amq.ctag-xi34jwHHA__xjENSteX5Dw, consumerQueue=in.queue]), channel: Cached Rabbit Channel: AMQChannel(amqp://rabbit@127.0.0.1:5672/,1), conn: Proxy@11703cc8 Shared Rabbit Connection: SimpleConnection@581cb879 [delegate=amqp://rabbit@127.0.0.1:5672/, localPort= 58614]

Здесь агрегат, который должен получать события:

import lombok.extern.slf4j.Slf4j;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.spring.stereotype.Aggregate;
import pm.mbo.easyway.api.app.order.commands.ConfirmOrderCommand;
import pm.mbo.easyway.api.app.order.commands.PlaceOrderCommand;
import pm.mbo.easyway.api.app.order.commands.ShipOrderCommand;
import pm.mbo.easyway.api.app.order.events.OrderConfirmedEvent;
import pm.mbo.easyway.api.app.order.events.OrderPlacedEvent;
import pm.mbo.easyway.api.app.order.events.OrderShippedEvent;

import static org.axonframework.modelling.command.AggregateLifecycle.apply;

@ProcessingGroup("amqpEvents")
@Slf4j
@Aggregate
public class OrderAggregate {

    @AggregateIdentifier
    private String orderId;
    private boolean orderConfirmed;

    @CommandHandler
    public OrderAggregate(final PlaceOrderCommand command) {
        log.debug("command: {}", command);
        apply(new OrderPlacedEvent(command.getOrderId(), command.getProduct()));
    }

    @CommandHandler
    public void handle(final ConfirmOrderCommand command) {
        log.debug("command: {}", command);
        apply(new OrderConfirmedEvent(orderId));
    }

    @CommandHandler
    public void handle(final ShipOrderCommand command) {
        log.debug("command: {}", command);
        if (!orderConfirmed) {
            throw new IllegalStateException("Cannot ship an order which has not been confirmed yet.");
        }
        apply(new OrderShippedEvent(orderId));
    }

    @EventSourcingHandler
    public void on(final OrderPlacedEvent event) {
        log.debug("event: {}", event);
        this.orderId = event.getOrderId();
        orderConfirmed = false;
    }

    @EventSourcingHandler
    public void on(final OrderConfirmedEvent event) {
        log.debug("event: {}", event);
        orderConfirmed = true;
    }

    @EventSourcingHandler
    public void on(final OrderShippedEvent event) {
        log.debug("event: {}", event);
        orderConfirmed = true;
    }

    protected OrderAggregate() {
    }

}

Таким образом, проблема заключается в том, что сообщения принимаются системой, но никакие события не инициируются.Содержание сообщений, кажется, не имеет значения.Что бы я ни отправлял в очередь, я получаю только сообщение журнала от моего метода onMessage.

JavaDoc из SpringAMQPMessageSource говорит следующее:

/**
 * MessageListener implementation that deserializes incoming messages and forwards them to one or more event processors.
 * <p>
 * The SpringAMQPMessageSource must be registered with a Spring MessageListenerContainer and forwards each message
 * to all subscribed processors.
 * <p>
 * Note that the Processors must be subscribed before the MessageListenerContainer is started. Otherwise, messages will
 * be consumed from the AMQP Queue without any processor processing them.
 *
 * @author Allard Buijze
 * @since 3.0
 */

Но до сих пор я не мог выяснить, где иликак его зарегистрировать.

Записи axon.eventhandling в моей конфигурации и @ProcessingGroup ("amqpEvents") в моем Агрегате уже из тестирования.Но наличие или отсутствие этих записей не имеет никакого значения.Также пробовал без режима = подписка.

Точные версии: Spring Boot 2.1.4, Axon 4.1.1, axon-amqp-spring-boot-autoconfigure 4.1

Любая помощь или советы высоко ценится.


Обновление 23.04.19:

Я пытался написать свой собственный класс следующим образом:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.messaging.SubscribableMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

@Slf4j
@Component
public class RabbitMQSpringAMQPMessageSource implements ChannelAwareMessageListener, SubscribableMessageSource<EventMessage<?>> {

    private final List<Consumer<List<? extends EventMessage<?>>>> eventProcessors = new CopyOnWriteArrayList<>();
    private final AMQPMessageConverter messageConverter;

    @Autowired
    public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }

    @Override
    public Registration subscribe(final Consumer<List<? extends EventMessage<?>>> messageProcessor) {
        eventProcessors.add(messageProcessor);
        log.debug("subscribe to: {}", messageProcessor);
        return () -> eventProcessors.remove(messageProcessor);
    }

    @RabbitListener(queues = "${application.queues.in}")
    @Override
    public void onMessage(final Message message, final Channel channel) {
        log.debug("received external message: {}, channel: {}", message, channel);
        log.debug("eventProcessors: {}", eventProcessors);
        if (!eventProcessors.isEmpty()) {
            messageConverter.readAMQPMessage(message.getBody(), message.getMessageProperties().getHeaders())
                            .ifPresent(event -> eventProcessors.forEach(
                                ep -> ep.accept(Collections.singletonList(event))
                            ));
        }
    }

}

Результат тот же, и журнал теперь подтверждаетчто обработчики событий просто пусты.

eventProcessors: []

Итак, вопрос в том, как правильно зарегистрировать обработчики событий.Есть ли способ, как сделать это правильно с пружиной?

Обновление 2:

Также не повезло с этим:

@Slf4j
@Component("rabbitMQSpringAMQPMessageSource")
public class RabbitMQSpringAMQPMessageSource extends SpringAMQPMessageSource {

    @Autowired
    public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
        super(messageConverter);
    }

    @RabbitListener(queues = "${application.queues.in}")
    @Override
    public void onMessage(final Message message, final Channel channel) {

        try {
            final var eventProcessorsField = this.getClass().getSuperclass().getDeclaredField("eventProcessors");
            eventProcessorsField.setAccessible(true);
            final var eventProcessors = (List<Consumer<List<? extends EventMessage<?>>>>) eventProcessorsField.get(this);
            log.debug("eventProcessors: {}", eventProcessors);
        } catch (NoSuchFieldException | IllegalAccessException e) {
            e.printStackTrace();
        }

        log.debug("received message: message={}, channel={}", message, channel);
        super.onMessage(message, channel);
    }

}
axon:
    eventhandling:
        processors:
            amqpEvents:
                source: rabbitMQSpringAMQPMessageSource
                mode: SUBSCRIBING

Регистрация программно в дополнение квыше также не помогло:

    @Autowired
    void configure(EventProcessingModule epm,
                   RabbitMQSpringAMQPMessageSource rabbitMessageSource) {
        epm.registerSubscribingEventProcessor("rabbitMQSpringAMQPMessageSource", c -> rabbitMessageSource);
        epm.assignProcessingGroup("amqpEvents", "rabbitMQSpringAMQPMessageSource");// this line also made no difference
    }

Конечно, @ProcessingGroup ("amqpEvents") в моем классе, который содержит аннотированные методы @EventSourcingHandler.


Обновление 25.4.19:

см. Принятый ответ от Алларда.Большое спасибо за указание на мою ошибку: я упустил тот факт, что EventSourcingHandler не получает сообщения извне.Это для прогнозов.Не для распространения агрегатов! ups Вот конфиг / классы, которые теперь получают события от rabbitmq:

axon:
    eventhandling:
        processors:
            amqpEvents:
                source: rabbitMQSpringAMQPMessageSource
                mode: SUBSCRIBING
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.spring.SpringAMQPMessageSource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component("rabbitMQSpringAMQPMessageSource")
public class RabbitMQSpringAMQPMessageSource extends SpringAMQPMessageSource {

    @Autowired
    public RabbitMQSpringAMQPMessageSource(final AMQPMessageConverter messageConverter) {
        super(messageConverter);
    }

    @RabbitListener(queues = "${application.queues.in}")
    @Override
    public void onMessage(final Message message, final Channel channel) {
        log.debug("received message: message={}, channel={}", message, channel);
        super.onMessage(message, channel);
    }

}
import lombok.extern.slf4j.Slf4j;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.queryhandling.QueryHandler;
import org.springframework.stereotype.Service;
import pm.mbo.easyway.api.app.order.events.OrderConfirmedEvent;
import pm.mbo.easyway.api.app.order.events.OrderPlacedEvent;
import pm.mbo.easyway.api.app.order.events.OrderShippedEvent;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@ProcessingGroup("amqpEvents")
@Service
public class OrderedProductsEventHandler {

    private final Map<String, OrderedProduct> orderedProducts = new HashMap<>();

    @EventHandler
    public void on(OrderPlacedEvent event) {
        log.debug("event: {}", event);
        String orderId = event.getOrderId();
        orderedProducts.put(orderId, new OrderedProduct(orderId, event.getProduct()));
    }

    @EventHandler
    public void on(OrderConfirmedEvent event) {
        log.debug("event: {}", event);
        orderedProducts.computeIfPresent(event.getOrderId(), (orderId, orderedProduct) -> {
            orderedProduct.setOrderConfirmed();
            return orderedProduct;
        });
    }

    @EventHandler
    public void on(OrderShippedEvent event) {
        log.debug("event: {}", event);
        orderedProducts.computeIfPresent(event.getOrderId(), (orderId, orderedProduct) -> {
            orderedProduct.setOrderShipped();
            return orderedProduct;
        });
    }

    @QueryHandler
    public List<OrderedProduct> handle(FindAllOrderedProductsQuery query) {
        log.debug("query: {}", query);
        return new ArrayList<>(orderedProducts.values());
    }

}

Я удалил @ProcessingGroup из моего Агрегата, конечно.

Мои журналы:

RabbitMQSpringAMQPMessageSource : received message: ... 
OrderedProductsEventHandler : event: OrderShippedEvent...

1 Ответ

2 голосов
/ 24 апреля 2019

В Аксоне Агрегаты не получают события извне.Обработчики событий внутри агрегатов (точнее, EventSourcingHandlers) обрабатывают только события, опубликованные этим же агрегатным экземпляром, чтобы он мог восстановить свое предыдущее состояние.

Это, например, только внешние обработчики событийте, которые обновляют прогнозы, будут получать события из внешних источников.

Чтобы это работало, ваш application.yml должен упоминать имя компонента в качестве источника процессора, а не имя очереди.Итак, в вашем первом примере:

    eventhandling:
        processors:
            amqpEvents:
                source: in.queue
                mode: subscribing

Должен стать:

    eventhandling:
        processors:
            amqpEvents:
                source: inputMessageSource
                mode: subscribing

Но, опять же, это работает только для обработчиков событий, определенных для компонентов, а не для Агрегатов.

...