Kinesis 2.2.11 java не может создать потребителя - PullRequest
1 голос
/ 06 августа 2020

Мне нужно перейти на библиотеку Kinesis до версии 2.2.11, поэтому я следовал руководству: https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html

Мне нужно запустить несколько экземпляров моего потребительского приложения, поэтому каждый из них необходимо иметь уникальное имя приложения, чтобы иметь отдельную таблицу аренды в DynamoDb. При инициализации потребителя Kinesis запускает DynamoDBLeaseRefresher.createLeaseTableIfNotExists, который проверяет, нужно ли создать новую таблицу для этого имени приложения, и создает ее, если ее не удается найти. Итак, выполняются 2 операции:

  1. DescribeTable - возвращает информацию о таблице или генерирует ResourceNotFoundExecption,
  2. при необходимости - CreateTable.

Проблема для me использует метод DescribeTable. Когда я ищу существующую таблицу, она без проблем возвращает ее. Но когда я ищу несуществующую таблицу, она выдает ResourceNotFoundExecption -> пока все хорошо. К сожалению, затем он упаковывается и теперь:

java .util.concurrent.CompletionException: software.amazon.awssdk.core.exception.SdkClientException: невозможно выполнить HTTP-запрос: software.amazon. awssdk.awscore.exception.AwsServiceException $ Builder.extendedRequestId (Ljava / lang / String;) Lsoftware / amazon / awssdk / awscore / exception / AwsServiceException $ Builder;

и приложение, ожидающее ResourceNotFoundException, получает другое вместо этого и вылетает. Обернутое сообщение об исключении немного вводит в заблуждение: «Невозможно выполнить HTTP-запрос», так как запрос был выполнен и вернул правильное сообщение: «Ресурс не найден».

Забавно то, что иногда это срабатывает, исключение делает не обертываются, операция CreateTable выполняется, и потребитель запускается правильно.

Я сделал обходной путь для этого, где я просто создаю таблицу перед инициализацией LeaseCoordinator, поэтому он всегда получает существующую таблицу .

вот мой код:

public KinesisStreamReaderService(String streamName, String applicationName, String regionName) {

KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder()
      .credentialsProvider(EnvironmentVariableCredentialsProvider.create())
      .region(Region.of(connectionProperties.getRegion()))
      .httpClientBuilder(createHttpClientBuilder())
      .build();

DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(Region.of(regionName)).build();
    CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(Region.of(regionName)).build();

  //  if(!dynamoDbTableExists(dynamoClient, applicationName)) {
  //    createDynamoDbTable(dynamoClient, applicationName);
  //  }

    ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient,
      dynamoClient, cloudWatchClient, workerId(), KinesisReaderProcessor::new);
    configsBuilder.retrievalConfig().initialPositionInStreamExtended(
      InitialPositionInStreamExtended.newInitialPosition(
        InitialPositionInStream.LATEST));

    scheduler = new Scheduler(
      configsBuilder.checkpointConfig(),
      configsBuilder.coordinatorConfig(),
      configsBuilder.leaseManagementConfig(),
      configsBuilder.lifecycleConfig(),
      configsBuilder.metricsConfig(),
      configsBuilder.processorConfig(),
      configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
    );
  }

  private void createDynamoDbTable(DynamoDbAsyncClient dynamoClient, String applicationName) {
    log.info("Creating new lease table: {}", applicationName);
    CompletableFuture<CreateTableResponse> createTableFuture = dynamoClient
      .createTable(CreateTableRequest.builder()
        .provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(10L).writeCapacityUnits(10L).build())
        .tableName(applicationName)
        .keySchema(KeySchemaElement.builder().attributeName("leaseKey").keyType(KeyType.HASH).build())
        .attributeDefinitions(AttributeDefinition.builder().attributeName("leaseKey").attributeType(
          ScalarAttributeType.S).build())
        .build());
    try {
      CreateTableResponse createTableResponse = createTableFuture.get();
      log.debug("Created new lease table: {}", createTableResponse.tableDescription().tableName());
    } catch (InterruptedException | ExecutionException e) {
      throw new DataStreamException(e.getMessage(), e);
    }
  }

  private boolean dynamoDbTableExists(DynamoDbAsyncClient dynamoClient, String tableName) {
    CompletableFuture<DescribeTableResponse> describeTableResponseCompletableFutureNew = dynamoClient
      .describeTable(DescribeTableRequest.builder()
        .tableName(tableName).build());
    try {
      DescribeTableResponse describeTableResponseNew = describeTableResponseCompletableFutureNew
        .get();
      return nonNull(describeTableResponseNew);
    } catch (InterruptedException | ExecutionException e) {
      log.info(e.getMessage(), e);
    }
    return false;
  }

  private static String workerId() {
    String workerId;
    try {
      workerId = format("%s_%s", getLocalHost().getCanonicalHostName(), randomUUID().toString());
    } catch (UnknownHostException e) {
      workerId = randomUUID().toString();
    }
    return workerId;
  }

  @Override
  public void read(Consumer<String> consumer) {
    this.consumer = consumer;
    scheduler.run();
  }

  private class KinesisReaderProcessor implements ShardRecordProcessor {

    private String shardId;

    @Override
    public void initialize(InitializationInput initializationInput) {
      this.shardId = initializationInput.shardId();
      log.info("Initializing record processor for shard: {}", shardId);
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
      log.debug("Checking shard {} for new records", shardId);
      List<KinesisClientRecord> records = processRecordsInput.records();
      if (!records.isEmpty()) {
        log.debug("Processing {} records from kinesis stream shard {}", records.size(), shardId);
        records.forEach(record -> {
          String json = UTF_8.decode(record.data()).toString();
          log.info(json);
          consumer.accept(json);
        });
      }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
      log.info("Record processor has lost lease, terminating");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
      try {
        shardEndedInput.checkpointer().checkpoint();
      } catch (ShutdownException | InvalidStateException e) {
        log.error(e.getMessage(), e);
      }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
      try {
        shutdownRequestedInput.checkpointer().checkpoint();
      } catch (ShutdownException | InvalidStateException e) {
        log.error(e.getMessage(), e);
      }
    }

  }

}

Мне не хватает какой-то конфигурации для планировщика или чего-то такого? Почему иногда это работает? Спасибо

Изменить: проблема в том, что этот блок кода в DynamoDBLeaseRefresher.tableStatus () вызывается, чтобы проверить, существует ли таблица:

DescribeTableResponse result;
    try {
      try {
        result = 
 (DescribeTableResponse)FutureUtils.resolveOrCancelFuture(this.dynamoDBClient.describeTable(request), this.dynamoDbRequestTimeout);
      } catch (ExecutionException var5) {
        throw exceptionManager.apply(var5.getCause());
      } catch (InterruptedException var6) {
        throw new DependencyException(var6);
      }
    } catch (ResourceNotFoundException var7) {
      log.debug("Got ResourceNotFoundException for table {} in leaseTableExists, returning false.", this.table);
      return null;
    }

, и в моем случае он должен получить ResourceNotFoundException, если таблица не найдена, но, как я уже сказал, исключение переносится в CompletionException, прежде чем достигнет соответствующего блока catch и будет перехвачено здесь кодом:

catch (ExecutionException var5) {
        throw exceptionManager.apply(var5.getCause());

Это происходит 20 раз в l oop при попытке инициализировать LeaseCoordinator, а затем просто прекращает попытки инициализировать соединение. (Как упоминалось выше, иногда это срабатывает, но это делает его еще более странным для меня). В моем обходном пути для инициализации требуется всего 1 попытка

1 Ответ

0 голосов
/ 11 августа 2020

Вам не нужно создавать таблицу аренды вручную - DynamoDBLeaseCoordinator создаст ее, если она не существует при инициализации, и подождет, пока она не появится:


    @Override
    public void initialize() throws ProvisionedThroughputException, DependencyException, IllegalStateException {
        final boolean newTableCreated =
                leaseRefresher.createLeaseTableIfNotExists(initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity);
        if (newTableCreated) {
            log.info("Created new lease table for coordinator with initial read capacity of {} and write capacity of {}.",
                    initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity);
        }
        // Need to wait for table in active state.
        final long secondsBetweenPolls = 10L;
        final long timeoutSeconds = 600L;
        final boolean isTableActive = leaseRefresher.waitUntilLeaseTableExists(secondsBetweenPolls, timeoutSeconds);
        if (!isTableActive) {
            throw new DependencyException(new IllegalStateException("Creating table timeout"));
        }
    }

Проблема в вашем случае, я думаю, в том, что он в конечном итоге создается, и вам, вероятно, следует периодически проверять, пока не появится таблица - например, DynamoDBLeaseCoordinator#initialize().

...