Я использую приложение Grails 3, чтобы поместить запись в AWS поток кинезиса. PutRecordResult и сборка потребителя работают нормально. Теперь я хочу поместить запись в определенный c сегмент в моем потоке и создать потребителя только для этого фрагмента. Ниже приведен мой код для putRecords, в котором я использовал ключ ключа для получения указанного c шарда.
def pushRecordIntoKinesisStream (def kinesis Json) {
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
clientBuilder.setRegion("us-east-1");
clientBuilder.setCredentials(new AWSStaticCredentialsProvider(credentials));
AmazonKinesis kinesisClient = clientBuilder.build();
com.amazonaws.services.kinesis.model.PutRecordRequest putRecordRequest = new com.amazonaws.services.kinesis.model.PutRecordRequest();
putRecordRequest.setStreamName("MyStream");
def dateTime=new Date().format("MM/dd/yyyy HH:mm:ss")
putRecordRequest.setPartitionKey("myPartitionKey");
putRecordRequest.withData(ByteBuffer.wrap(kinesisJson?.toString()?.getBytes()));
com.amazonaws.services.kinesis.model.PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
}
Это приведет к записи в указанный c осколок каждый раз. Допустим, shardId это "shardMyId"
Для построения клиента вот мой код:
// Set region
Region region = AwsClientUtil.buildRegion(config, config[SERVICE_NAME])
// Create client
AWSCredentialsProvider credentials = new DefaultAWSCredentialsProviderChain()
ClientConfiguration configuration = AwsClientUtil.buildClientConfiguration(config, config[SERVICE_NAME])
kinesis = new AmazonKinesisClient(credentials, configuration)
.withRegion(region)
workerId = "${InetAddress.localHost.canonicalHostName}:${UUID.randomUUID()}"
log.debug("Using Kinesis worker id: ${workerId}")
// Configure a Kinesis client for each stream
String clientAppName = "myStream"
String clientStreamName = "myStream"
kclConfig = new KinesisClientLibConfiguration(clientAppName, clientStreamName, credentials, workerId)
.withCommonClientConfig(configuration)
.withRegionName(region.name)
.withInitialPositionInStream(InitialPositionInStream.LATEST)
.withIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis)
// Create a new worker for each stream
Worker worker = new Worker(recordProcessorFactory, kclConfig)
try {
log.debug("Starting Kinesis worker for ${clientStreamName}")
executor = Executors.newSingleThreadExecutor()
executor.execute(worker)
} catch (Throwable t) {
log.error "Caught throwable while processing Kinesis data.", t
}
Есть ли способ настроить мой клиент так, чтобы он слушал только спецификацию c осколок?