I'm following the sample code provided by AWS to write some data to Kinesis from an RDD in Spark, as follows:
int colname=0;
for (String item : items)
{
    String key = String.valueOf(colname);
    colname++;
    ByteBuffer data = ByteBuffer.wrap(item.getBytes("UTF-8"));
    // doesn't block
    System.out.println(String.format("Putting the item=[%s] with key=[%s]", item, key));
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(data);
    putRecordsRequestEntry.setPartitionKey(key);
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}
}
putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest);
The parts of the code I use for authorization are shown below:
ClientConfiguration config = new ClientConfiguration()
    .withMaxErrorRetry(30)
    .withConnectionTimeout(100 * 1000)
    .withSocketTimeout(100 * 1000);
BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials("****", "****");
AmazonKinesis kinesisClient = AmazonKinesisClientBuilder.standard()
    //.withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials))
    .withRegion(Regions.US_EAST_1).withClientConfiguration(config).build();
Since I'm running on EMR, I'm not passing AWS credentials explicitly. However, I get the following exception in my Spark job:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 35 in stage 0.0 failed 4 times, most recent failure: Lost task 35.3 in stage 0.0 (TID 53, ip-172-31-27-87.ec2.internal, executor 9): com.amazonaws.services.kinesis.model.AmazonKinesisException: 1 validation error detected: Value '[PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=9 cap=9], explicitHashKey=null, partitionKey=0), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=6 cap=6], explicitHashKey=null, partitionKey=1), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=8 cap=8], explicitHashKey=null, partitionKey=2), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], explicitHashKey=null, partitionKey=3), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=11 cap=11], explicitHashKey=null, partitionKey=4), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=11 cap=11], explicitHashKey=null, partitionKey=5), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=6 cap=6], explicitHashKey=null, partitionKey=6), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=13 cap=13], explicitHashKey=null, partitionKey=7), PutRecordsRequestEntry(data=java.nio.HeapByteBuffer[pos=0 lim=11 cap=11], explicitHashKey=null, partitionKey=8),
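The validation message above is cut off, so the constraint that actually failed is not visible. One thing that may be worth ruling out, offered as an assumption rather than a diagnosis: PutRecords accepts at most 500 entries per request, and the loop above adds every item to a single list. A minimal sketch of splitting a list into request-sized batches follows. `BatchSplitter` and `partition` are hypothetical names introduced for illustration, and the Kinesis calls appear only in a comment so the block runs without the AWS SDK.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitter {
    // Documented PutRecords limit: at most 500 entries per request.
    public static final int MAX_RECORDS_PER_REQUEST = 500;

    // Splits entries into consecutive batches of at most maxSize elements,
    // preserving the original order.
    public static <T> List<List<T>> partition(List<T> entries, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < entries.size(); start += maxSize) {
            int end = Math.min(start + maxSize, entries.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(entries.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Stand-in for putRecordsRequestEntryList; integers keep the demo SDK-free.
        List<Integer> entries = new ArrayList<>();
        for (int i = 0; i < 1234; i++) {
            entries.add(i);
        }
        for (List<Integer> batch : partition(entries, MAX_RECORDS_PER_REQUEST)) {
            // In the Spark job this is where one request per batch would be sent, e.g.
            // putRecordsRequest.setRecords(batch); kinesisClient.putRecords(putRecordsRequest);
            System.out.println("would send a batch of " + batch.size() + " records");
        }
    }
}
```

If the per-partition item count never exceeds 500, this is not the cause, and the full text of the validation error would be needed to narrow it down.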