Получение исключения по кинезу при попытке поставить записи - PullRequest
0 голосов
/ 03 февраля 2020

Я следую кодам, предоставленным AWS, чтобы записать некоторые данные в кинезис из RDD в виде искры следующим образом:

int colname=0;

for (String item: items)
{
    String key=String.valueOf(colname);
    colname++;

    ByteBuffer data = ByteBuffer.wrap(item.getBytes("UTF-8"));
    // doesn't block       

    System.out.println(String.format("Putting the item=[%s] with key=[%s]",item,key));
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(data);
    putRecordsRequestEntry.setPartitionKey(key);
    putRecordsRequestEntryList.add(putRecordsRequestEntry);


}


}

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest);

Части кода, которые я использую для авторизации, указаны ниже:

ClientConfiguration config=new ClientConfiguration().withMaxErrorRetry(30)
                        .withConnectionTimeout(100*1000).withSocketTimeout(100*1000);


BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials("****", "****");
AmazonKinesis kinesisClient = AmazonKinesisClientBuilder.standard()//.withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials))
               .withRegion(Regions.US_EAST_1).withClientConfiguration(config).build();

Так как я работаю на EMR, я не использую AWS учетные данные, однако я получаю исключение в своей искре:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 35 in stage 0.0 failed 4 times, most recent failure: Lost task 35.3 in stage 0.0 (TID 53, ip-172-31-27-87.ec2.internal, executor 9): com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation error detected: Value '[PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=9 cap=9], explicitHashKey=null, partitionKey=0), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=6 cap=6], explicitHashKey=null,com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation error detected: Value '[PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=9 cap=9], explicitHashKey=null, partitionKey=0), partitionKey=1), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=8 cap=8], explicitHashKey=null, partitionKey=2), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], explicitHashKey=null, partitionKey=3), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=11 cap=11], explicitHashKey=null, partitionKey=4), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=11 cap=11], explicitHashKey=null, partitionKey=5), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=6 cap=6], explicitHashKey=null, partitionKey=6), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=13 cap=13], explicitHashKey=null, partitionKey=7), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=11 cap=11], explicitHashKey=null, partitionKey=8),
...