Поставщик привязки не работает с весенним облачным потоком кролика - PullRequest
0 голосов
/ 08 января 2020

У нас есть источник, подобный следующему, и мы используем связку кролика весеннего облачного потока 3.0.1.RELEASE.

@Component
public class Handlers {

  private EmitterProcessor<String> sourceGenerator = EmitterProcessor.create();

  public void emitData(String str){
    sourceGenerator.onNext(str);
  }

  @Bean
  public Supplier<Flux<String>> generate() {
    return () -> sourceGenerator;
  }

  @Bean
  public Function<String, String> process() {
    return str -> str.toUpperCase();
  }


}

application.yml

spring:
  profiles: dev
  cloud:
    stream:
      function:
        definition: generate;process
        bindings:
          generate-out-0: source1
          process-in-0: source1
          process-out-0: processed

        bindingServiceProperties:
          defaultBinder: local_rabbit

      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: / 

Во время вызова emitData Метод, мы не видим данных в очереди RabbitMQ. Мы также заметили, что потребительская привязка работает. То, что мы проверяли с помощью прямой отправки сообщений в очередь, связанную с потребителем, через RabbitMQ Admin. Но привязка поставщика не работает.

Кроме того, мы заметили, что Supplier без Flux работает нормально с той же конфигурацией application.yml. Нам здесь не хватает какой-либо конфигурации?

Даже тестовый пример с TestChannelBinderConfiguration работает следующим образом.

@Slf4j
@TestPropertySource(
        properties = {"spring.cloud.function.definition = generate|process"}
)
public class HandlersTest extends AbstractTest {
  @Autowired
  private OutputDestination outputDestination;

  @Test
  public void testGeneratorAndProcessor() {
      final String testStr = "test"; 
      handlers.emitData(testStr);

      Object eventObj;
      final Message<byte[]> message = outputDestination.receive(1000);

      assertNotNull(message, "processing timeout");
      eventObj = message.getPayload();

      assertEquals(new String((byte[]) eventObj), testStr.toUpperCase());
  }
}

Ответы [ 3 ]

0 голосов
/ 09 января 2020

Хотя я ценю, что вы публикуете проект, к сожалению, ваша история продолжает меняться, и я все еще не уверен, чего вы хотите достичь sh. Итак, это мой последний ответ, но я постараюсь быть как можно более подробным и информативным, поэтому вот что я вижу из вашего проекта.

  1. Ваша конфигурация неверна. Свойство definition для функций должно spring.cloud.function.definition

. , .

spring:
  cloud:
    function:
       definition: generate;process;sink

. , .

Поскольку вы используете ; Я предполагаю, что вы хотите, чтобы все 3 функции были связаны независимо (без композиции функций), как описано в разделе множественное связывание .

spring.cloud.stream.function.bindings - это свойство, которое позволяет сопоставить сгенерированное имя привязки с именем настраиваемой привязки, как описано в Имена привязок функций . Это не имеет ничего общего с названиями фактических мест назначения. Для этого у нас есть свойство destination, которое также рассматривается в указанном разделе (например, --spring.cloud.stream.bindings.generate-out-0.destination = source1). Однако, если свойство destination не используется, имя привязки и имя назначения предполагаются одинаковыми. Однако для получателя также требуется имя группы, и если оно не указано, оно генерирует его. Итак, в зависимости от вашей конфигурации ваш generate-out-0 поставщик обязан source1 exchange :

enter image description here

Ввод функции process-in-in с другой стороны связан с source1.anonymous... очередью :

enter image description here

И как Я уже говорил ранее, что не существует привязки RabbitMQ между source1 exchange и source1.anonymous... queue , поэтому сообщения, отправляемые на source1 exchange , просто отбрасываются , При создании такой привязки (например, через консоль Rabbit MQ) сообщения доходят до потребителя.

При этом такой дизайн очень неэффективен. Почему вы хотите отправить и получить от того же получателя, находясь в том же пространстве процесса (JVM)? Зачем ругать сеть, когда можно просто перейти по ссылке? Поэтому, по крайней мере, изменив definition на spring.cloud.function.definition = generate | process | sink`. Лучшим решением было бы просто написать свой код в самом поставщике

public void emitData(String str) {
    String uppercased = str.toUpperCase();
    sourceGenerator.onNext(uppercased);
    System.out.println("Emitted: " + str);
}

и покончить с этим. В любом случае, я настоятельно рекомендую вам go ознакомиться с нашим руководством пользователя, в частности с разделом Основные понятия и с разделом Модель программирования , так как я считаю, что вы неправильно поняли некоторые основные понятия, которые, как я считаю, внести свой вклад в несоответствия как в вашем посте, так и в ваших вопросах.

0 голосов
/ 09 января 2020

Мы сделали некоторые изменения в коде. Но проблема все еще здесь. Поток реализации поставщика не работает. Поставщик без флюса работает нормально:


    @Bean
    public Supplier<Flux<String>> generate_flux() {
        return () -> sourceGenerator;
    }

    @Bean
    public Supplier<Message<?>> generate_non_flux() {
        return MessageBuilder
           .withPayload("Non flux emitter: " + LocalDateTime.now().toString())::build;
    }

Полный источник в том же месте

Также мы изменили application.yml, как вы предложили и мы провели несколько экспериментов. Спасибо за объяснение о значении темы. Но мы также проверили и можем сказать, что RabbitMQ автоматически связывает выходы и потребителей с тем же назначением и любыми указанными именами групп. Он работает как для явно указанных групп, так и для случайно сгенерированных. Речь идет не о параллельной обработке, а о способности RabbitMQ связать ее.

Оба generate_flux и generate_non_flux подключены к одному и тому же выходному месту назначения:

      bindings:
        generate_flux-out-0:
          destination: source
        generate_non_flux-out-0:
          destination: source

Теперь вывод приложения:

Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801
Flux emitted: 2020-01-09T13:38:51.721094
Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801
Flux emitted: 2020-01-09T13:38:52.725961
Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801
Flux emitted: 2020-01-09T13:38:53.727054
Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801
Flux emitted: 2020-01-09T13:38:54.727898
Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801
Consumed: NON FLUX EMITTER: 2020-01-09T13:38:49.761801

Есть обработанные сообщения с NON FLUX, но нет потоковых.

Итак, не потоковый излучатель работает нормально, но мы не можем использовать его для излучать по запросу. Внедрение Flux для поставщика не работает. С этого мы начали и не вносили никаких изменений в описание задачи.

Говоря о разделении кода на поставщика, процессор и приемник, мы говорим о разных типах машин. supplier - это устаревший код, который генерирует данные. processor - это часть рабочего процесса, потребляющая память, и мы хотим сохранить ее на отдельном наборе виртуальных машин с возможностью масштабирования в Kubernetes. sink в нашем случае это конкретная c машина, которая хранит данные в БД. В то же время из-за унаследованного кода мы хотим иметь общий код приложения в целом и не разбивать его на отдельные приложения, такие как Apache Beam-ориентированные.

0 голосов
/ 08 января 2020

Когда вы говорите we are not seeing data in RabbitMQ queue. . .. О какой очереди вы говорите? При использовании AMQP сообщения отправляются на exchanges, и если такой обмен не привязан ни к какому queue, сообщение отбрасывается, поэтому мой вопрос. Вы действительно связали generate-out-0 обмен с очередью?

В любом случае, я только что проверил это, и все работает как ожидалось. Вот полный код.

@SpringBootApplication
public class SimpleStreamApplication {

    public static void main(String[] args) throws Exception {
        ApplicationContext context = SpringApplication.run(SimpleStreamApplication.class);
        SimpleStreamApplication app = context.getBean(SimpleStreamApplication.class);
        app.emitData("Hello");
    }

    private EmitterProcessor<String> sourceGenerator = EmitterProcessor.create();

    public void emitData(String str) {
        sourceGenerator.onNext(str);
    }

    @Bean
    public Supplier<Flux<String>> generate() {
        return () -> sourceGenerator;
    }
}
...