Мне нужно перейти на библиотеку Kinesis до версии 2.2.11, поэтому я следовал руководству: https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html
Мне нужно запустить несколько экземпляров моего потребительского приложения, поэтому каждый из них необходимо иметь уникальное имя приложения, чтобы иметь отдельную таблицу аренды в DynamoDb. При инициализации потребителя Kinesis запускает DynamoDBLeaseRefresher.createLeaseTableIfNotExists, который проверяет, нужно ли создать новую таблицу для этого имени приложения, и создает ее, если ее не удается найти. Итак, выполняются 2 операции:
- DescribeTable - возвращает информацию о таблице или генерирует ResourceNotFoundExecption,
- при необходимости - CreateTable.
Проблема для me использует метод DescribeTable. Когда я ищу существующую таблицу, она без проблем возвращает ее. Но когда я ищу несуществующую таблицу, она выдает ResourceNotFoundExecption -> пока все хорошо. К сожалению, затем он упаковывается и теперь:
java .util.concurrent.CompletionException: software.amazon.awssdk.core.exception.SdkClientException: невозможно выполнить HTTP-запрос: software.amazon. awssdk.awscore.exception.AwsServiceException $ Builder.extendedRequestId (Ljava / lang / String;) Lsoftware / amazon / awssdk / awscore / exception / AwsServiceException $ Builder;
и приложение, ожидающее ResourceNotFoundException, получает другое вместо этого и вылетает. Обернутое сообщение об исключении немного вводит в заблуждение: «Невозможно выполнить HTTP-запрос», так как запрос был выполнен и вернул правильное сообщение: «Ресурс не найден».
Забавно то, что иногда это срабатывает, исключение делает не обертываются, операция CreateTable выполняется, и потребитель запускается правильно.
Я сделал обходной путь для этого, где я просто создаю таблицу перед инициализацией LeaseCoordinator, поэтому он всегда получает существующую таблицу .
вот мой код:
public KinesisStreamReaderService(String streamName, String applicationName, String regionName) {
KinesisAsyncClient kinesisClient = KinesisAsyncClient.builder()
.credentialsProvider(EnvironmentVariableCredentialsProvider.create())
.region(Region.of(connectionProperties.getRegion()))
.httpClientBuilder(createHttpClientBuilder())
.build();
DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(Region.of(regionName)).build();
CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(Region.of(regionName)).build();
// if(!dynamoDbTableExists(dynamoClient, applicationName)) {
// createDynamoDbTable(dynamoClient, applicationName);
// }
ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient,
dynamoClient, cloudWatchClient, workerId(), KinesisReaderProcessor::new);
configsBuilder.retrievalConfig().initialPositionInStreamExtended(
InitialPositionInStreamExtended.newInitialPosition(
InitialPositionInStream.LATEST));
scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient))
);
}
private void createDynamoDbTable(DynamoDbAsyncClient dynamoClient, String applicationName) {
log.info("Creating new lease table: {}", applicationName);
CompletableFuture<CreateTableResponse> createTableFuture = dynamoClient
.createTable(CreateTableRequest.builder()
.provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(10L).writeCapacityUnits(10L).build())
.tableName(applicationName)
.keySchema(KeySchemaElement.builder().attributeName("leaseKey").keyType(KeyType.HASH).build())
.attributeDefinitions(AttributeDefinition.builder().attributeName("leaseKey").attributeType(
ScalarAttributeType.S).build())
.build());
try {
CreateTableResponse createTableResponse = createTableFuture.get();
log.debug("Created new lease table: {}", createTableResponse.tableDescription().tableName());
} catch (InterruptedException | ExecutionException e) {
throw new DataStreamException(e.getMessage(), e);
}
}
private boolean dynamoDbTableExists(DynamoDbAsyncClient dynamoClient, String tableName) {
CompletableFuture<DescribeTableResponse> describeTableResponseCompletableFutureNew = dynamoClient
.describeTable(DescribeTableRequest.builder()
.tableName(tableName).build());
try {
DescribeTableResponse describeTableResponseNew = describeTableResponseCompletableFutureNew
.get();
return nonNull(describeTableResponseNew);
} catch (InterruptedException | ExecutionException e) {
log.info(e.getMessage(), e);
}
return false;
}
private static String workerId() {
String workerId;
try {
workerId = format("%s_%s", getLocalHost().getCanonicalHostName(), randomUUID().toString());
} catch (UnknownHostException e) {
workerId = randomUUID().toString();
}
return workerId;
}
@Override
public void read(Consumer<String> consumer) {
this.consumer = consumer;
scheduler.run();
}
private class KinesisReaderProcessor implements ShardRecordProcessor {
private String shardId;
@Override
public void initialize(InitializationInput initializationInput) {
this.shardId = initializationInput.shardId();
log.info("Initializing record processor for shard: {}", shardId);
}
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
log.debug("Checking shard {} for new records", shardId);
List<KinesisClientRecord> records = processRecordsInput.records();
if (!records.isEmpty()) {
log.debug("Processing {} records from kinesis stream shard {}", records.size(), shardId);
records.forEach(record -> {
String json = UTF_8.decode(record.data()).toString();
log.info(json);
consumer.accept(json);
});
}
}
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
log.info("Record processor has lost lease, terminating");
}
@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
try {
shardEndedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
log.error(e.getMessage(), e);
}
}
@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
try {
shutdownRequestedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
log.error(e.getMessage(), e);
}
}
}
}
Мне не хватает какой-то конфигурации для планировщика или чего-то такого? Почему иногда это работает? Спасибо
Изменить: проблема в том, что этот блок кода в DynamoDBLeaseRefresher.tableStatus () вызывается, чтобы проверить, существует ли таблица:
DescribeTableResponse result;
try {
try {
result =
(DescribeTableResponse)FutureUtils.resolveOrCancelFuture(this.dynamoDBClient.describeTable(request), this.dynamoDbRequestTimeout);
} catch (ExecutionException var5) {
throw exceptionManager.apply(var5.getCause());
} catch (InterruptedException var6) {
throw new DependencyException(var6);
}
} catch (ResourceNotFoundException var7) {
log.debug("Got ResourceNotFoundException for table {} in leaseTableExists, returning false.", this.table);
return null;
}
, и в моем случае он должен получить ResourceNotFoundException, если таблица не найдена, но, как я уже сказал, исключение переносится в CompletionException, прежде чем достигнет соответствующего блока catch и будет перехвачено здесь кодом:
catch (ExecutionException var5) {
throw exceptionManager.apply(var5.getCause());
Это происходит 20 раз в l oop при попытке инициализировать LeaseCoordinator, а затем просто прекращает попытки инициализировать соединение. (Как упоминалось выше, иногда это срабатывает, но это делает его еще более странным для меня). В моем обходном пути для инициализации требуется всего 1 попытка