Абстрагирование Spring Cloud Stream Producer и Consumer code - PullRequest
0 голосов
/ 20 апреля 2020

У меня есть служба, которая производит и потребляет сообщения из разных каналов Spring Cloud Stream (связана с темами EventHub / Kafka). Есть несколько таких служб, которые настроены аналогично.

Конфигурация выглядит следующим образом

 public interface MessageStreams {
      String WORKSPACE = "workspace";
      String UPLOADNOTIFICATION = "uploadnotification";
      String BLOBNOTIFICATION = "blobnotification";
      String INGESTIONSTATUS = "ingestionstatusproducer";

      @Input(WORKSPACE)
      SubscribableChannel workspaceChannel();

      @Output(UPLOADNOTIFICATION)
      MessageChannel uploadNotificationChannel();

      @Input(BLOBNOTIFICATION)
      SubscribableChannel blobNotificationChannel();

      @Output(INGESTIONSTATUS)
      MessageChannel ingestionStatusChannel();
    }


    @EnableBinding(MessageStreams.class)
    public class EventHubStreamsConfiguration {
    }

Код производителя / издателя выглядит следующим образом

    @Service
    @Slf4j
    public class IngestionStatusEventPublisher {
      private final MessageStreams messageStreams;

      public IngestionStatusEventPublisher(MessageStreams messageStreams) {
        this.messageStreams = messageStreams;
      }

      public void sendIngestionStatusEvent() {
        log.info("Sending ingestion status event");
        System.out.println("Sending ingestion status event");
        MessageChannel messageChannel = messageStreams.ingestionStatusChannel();
        boolean messageSent = messageChannel.send(MessageBuilder
            .withPayload(IngestionStatusMessage.builder()
                .correlationId("some-correlation-id")
                .status("done")
                .source("some-source")
                .eventTime(OffsetDateTime.now())
                .build())
            .setHeader("tenant-id", "some-tenant")
            .build());
        log.info("Ingestion status event sent successfully {}", messageSent);
      }
    }

Аналогично у меня есть несколько других издателей, которые публикуют sh в различных концентраторах событий. Обратите внимание, что для каждого опубликованного сообщения устанавливается заголовок идентификатора клиента. Это что-то определенное c для моего мультитенантного приложения, чтобы отслеживать контекст арендатора. Также обратите внимание, что я получаю канал для публикации при отправке сообщения.

Мой код получателя выглядит следующим образом:

    @Component
    @Slf4j
    public class IngestionStatusEventHandler {
      private AtomicInteger eventCount = new AtomicInteger();

      @StreamListener(TestMessageStreams.INGESTIONSTATUS)
      public void handleEvent(@Payload IngestionStatusMessage message, @Header(name = "tenant-id") String tenantId) throws Exception {
        log.info("New ingestion status event received: {} in Consumer: {}", message, Thread.currentThread().getName());

        // set the tenant context as thread local from the header.

      }

Опять же, у меня есть несколько таких потребителей, а также есть контекст арендатора, который устанавливается для каждого потребителя на основе входящего заголовка идентификатора клиента, который отправлено издателем.

Мои вопросы

Как избавиться от кода основной платы, задав заголовок идентификатора арендатора в Publisher и настроив контекст арендатора в приемнике, абстрагировав его в библиотеку, которая может быть включена во все различные службы, которые у меня есть.

Кроме того, существует ли способ динамической идентификации канала на основе типа публикуемого сообщения. для ex IngestionStatusMessage.class в данном сценарии

1 Ответ

1 голос
/ 20 апреля 2020

Чтобы установить и tenant-id заголовок в общем коде и избежать его копирования / вставки в каждом микросервисе, вы можете использовать ChannelInterceptor и сделать его глобальным с параметром @GlobalChannelInterceptor и его patterns.

См. Дополнительную информацию в Spring Integration: https://docs.spring.io/spring-integration/docs/5.3.0.BUILD-SNAPSHOT/reference/html/core.html#channel -интерцепторы

https://docs.spring.io/spring-integration/docs/5.3.0.BUILD-SNAPSHOT/reference/html/overview.html#configuration -Включаемая интеграция

Вы можете Не выбирайте канал по типу полезной нагрузки, потому что тип полезной нагрузки действительно определяется из сигнатуры метода @StreamListener.

Можно попытаться получить общий @Router с ожиданием Message<?> и затем вернуть конкретное имя канала для маршрутизации в соответствии с этим контекстом сообщения запроса.

См. https://docs.spring.io/spring-integration/docs/5.3.0.BUILD-SNAPSHOT/reference/html/message-routing.html#messaging -routing-chapter

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...