AWS Kinesis Настройка клиента для указания c шарда в потоке - PullRequest
0 голосов
/ 13 апреля 2020

Я использую приложение Grails 3, чтобы поместить запись в AWS поток кинезиса. PutRecordResult и сборка потребителя работают нормально. Теперь я хочу поместить запись в определенный c сегмент в моем потоке и создать потребителя только для этого фрагмента. Ниже приведен мой код для putRecords, в котором я использовал ключ ключа для получения указанного c шарда.

def pushRecordIntoKinesisStream (def kinesis Json) {

        AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        clientBuilder.setRegion("us-east-1");
        clientBuilder.setCredentials(new AWSStaticCredentialsProvider(credentials));
        AmazonKinesis kinesisClient = clientBuilder.build();
        com.amazonaws.services.kinesis.model.PutRecordRequest putRecordRequest = new com.amazonaws.services.kinesis.model.PutRecordRequest();
        putRecordRequest.setStreamName("MyStream");
        def dateTime=new Date().format("MM/dd/yyyy HH:mm:ss")
        putRecordRequest.setPartitionKey("myPartitionKey");
        putRecordRequest.withData(ByteBuffer.wrap(kinesisJson?.toString()?.getBytes()));
        com.amazonaws.services.kinesis.model.PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
    } 

Это приведет к записи в указанный c осколок каждый раз. Допустим, shardId это "shardMyId"

Для построения клиента вот мой код:

// Set region
        Region region = AwsClientUtil.buildRegion(config, config[SERVICE_NAME])

    // Create client
    AWSCredentialsProvider credentials = new DefaultAWSCredentialsProviderChain()
    ClientConfiguration configuration = AwsClientUtil.buildClientConfiguration(config, config[SERVICE_NAME])
    kinesis = new AmazonKinesisClient(credentials, configuration)
            .withRegion(region)

    workerId = "${InetAddress.localHost.canonicalHostName}:${UUID.randomUUID()}"
    log.debug("Using Kinesis worker id: ${workerId}")

    // Configure a Kinesis client for each stream
    String clientAppName = "myStream" 
    String clientStreamName = "myStream"
    kclConfig = new KinesisClientLibConfiguration(clientAppName, clientStreamName, credentials, workerId)
            .withCommonClientConfig(configuration)
            .withRegionName(region.name)
            .withInitialPositionInStream(InitialPositionInStream.LATEST)
            .withIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis)

    // Create a new worker for each stream
    Worker worker = new Worker(recordProcessorFactory, kclConfig)
    try {
        log.debug("Starting Kinesis worker for ${clientStreamName}")
        executor = Executors.newSingleThreadExecutor()
        executor.execute(worker)
    } catch (Throwable t) {
        log.error "Caught throwable while processing Kinesis data.", t
    }

Есть ли способ настроить мой клиент так, чтобы он слушал только спецификацию c осколок?

...