Записи не расходуются при добавлении контрольной точки - PullRequest
0 голосов
/ 25 июня 2018

У меня следующая конфигурация для KinesisMessageDrivenChannelAdapter, когда я удаляю dynamoDbMetaDataStore в качестве контрольной точки, сообщения принимаются правильно, но когда я добавляю их обратно, записи всегда пусты.Я отладил код и KinesisMessageDrivenChannelAdapter.processTask() строка 776 (версия 2.0.0.M2) возвращает пустые записи.

ОБНОВЛЕНИЕ:

public DynamoDbMetaDataStore dynamoDbMetaDataStore() {
    String url = consumerClientProperties.getDynamoDB().getUrl();
    final AmazonDynamoDBAsync amazonDynamoDB = AmazonDynamoDBAsyncClientBuilder.standard()
        .withEndpointConfiguration(new EndpointConfiguration(
            url,
            Regions.fromName(awsRegion).getName()))
        .withClientConfiguration(new ClientConfiguration()
            .withMaxErrorRetry(consumerClientProperties.getDynamoDB().getRetries())
            .withConnectionTimeout(consumerClientProperties.getDynamoDB().getConnectionTimeout())).build();
    DynamoDbMetaDataStore dynamoDbMetaDataStore = new DynamoDbMetaDataStore(amazonDynamoDB, "consumer-test");
    return dynamoDbMetaDataStore;
  }

  public KinesisMessageDrivenChannelAdapter kinesisInboundChannel(
      AmazonKinesis amazonKinesis, String[] streamNames) {
    KinesisMessageDrivenChannelAdapter adapter =
        new KinesisMessageDrivenChannelAdapter(amazonKinesis, streamNames);
    adapter.setConverter(null);
    adapter.setOutputChannel(kinesisReceiveChannel());
    adapter.setCheckpointStore(dynamoDbMetaDataStore());
    adapter.setConsumerGroup(consumerClientProperties.getName());
    adapter.setCheckpointMode(CheckpointMode.manual);
    adapter.setListenerMode(ListenerMode.record);
    adapter.setStartTimeout(10000);
    adapter.setDescribeStreamRetries(1);
    adapter.setConcurrency(10);
    return adapter;
  }

Спасибо

1 Ответ

0 голосов
/ 25 июня 2018

Я рекомендую вам протестировать ваше решение с самой последней 2.0.0.BUILD-SNAPSHOT.

. Уже есть такая опция, как:

/**
 * Specify a {@link LockRegistry} for an exclusive access to provided streams.
 * This is not used when shards-based configuration is provided.
 * @param lockRegistry the {@link LockRegistry} to use.
 * @since 2.0
 */
public void setLockRegistry(LockRegistry lockRegistry) {

, где вам необходимо ввести DynamoDbLockRegistry дляЛучшее управление контрольными точками.

Для этой цели вам также необходимо добавить эту зависимость:

compile("com.amazonaws:dynamodb-lock-client:1.0.0")

В действительности, в этой M2 могут быть некоторые проблемы с фильтрацией ...

...