Spring Aws Kinesis Сообщения не расходуются по порядку - PullRequest
0 голосов
/ 02 апреля 2019

Я отправляю 100 сообщений в поток с 1 шрадом.

spring:
  cloud:
    stream:
      bindings:
        myOutBound:
          destination: my-stream
          contentType: application/json

Я отправляю сообщения в цикле для целей тестирования

@EnableBinding(MyBinder.class)
public class MyProcessor {

  @Autowired
  private MyBinder myBinder;

  public void processRollup() {
    List<MyObject> myObjects =  IntStream.range(1, 100)
        .mapToObj(Integer::valueOf)
        .map(s-> new MyObject(s))
        .collect(toList());
    myObjects.forEach(messagePayload ->{
      System.out.println(messagePayload.getId());
      myBinder.myOutBound()
          .send(MessageBuilder.withPayload(messagePayload)
              .build());
        }
    );
  }

}

Я потребляю сообщения, как показано ниже

spring:
  cloud:
    stream:
      bindings:
        RollUpInboundStream:
          group: my-consumer-group
          destination: my-stream
          content-type: application/json

Потребление сообщений не упорядочено.

Я что-то упустил?

1 Ответ

1 голос
/ 03 апреля 2019

Есть несколько вещей, которые следует учитывать. Прежде всего, производитель в Binder основан на режиме KinesisMessageHandler с async по умолчанию:

messageHandler.setSync(producerProperties.getExtension().isSync());

Таким образом, даже если вам кажется, что вы отправляете эти сообщения в правильном порядке одно за другим, это не означает, что они достигают потока в AWS в том же порядке.

Также нет никакой гарантии, что они в любом случае рассчитываются на AWS в том же порядке, даже если вы отправляете их в режиме синхронизации.

Смотрите здесь: Amazon Kinesis и гарантированный заказ

Также вы можете получить гарантию заказа в том же шарде с помощью явного sequenceNumber:

Чтобы гарантировать строго возрастающее упорядочение, пишите последовательно в шард и используйте параметр SequenceNumberForOrdering.

https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html

К сожалению, в данный момент Kinesis Binder не поддерживает эту опцию, но мы можем преодолеть ее с помощью явного AwsHeaders.SEQUENCE_NUMBER, установленного в сообщении, перед отправкой его в output пункт назначения Binder:

String sequenceNumber = messageHeaders.get(AwsHeaders.SEQUENCE_NUMBER, String.class);
    if (!StringUtils.hasText(sequenceNumber) && this.sequenceNumberExpression != null) {
        sequenceNumber = this.sequenceNumberExpression.getValue(getEvaluationContext(), message, String.class);
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...