Я работаю с потоками данных Amazon Kinesis. Мой поток Kinesis состоит только из одного осколка.
Я пытаюсь прочитать данные (записи) из потока после записи некоторых данных (записей) в тот же поток. Мои записи простые JSON.
Я могу видеть через консоль Amazon показания и записи.
Когда я пытаюсь напечатать содержимое записи с помощью «record.getData ()», я получаю эту ошибку:
java.nio.HeapByteBuffer[pos=4 lim=4 cap=4]
20:35:59.118 [RecordProcessor-0000] WARN com.kinesisconsumer.AmazonKinesisApplicationSampleRecordProcessor - Caught throwable while processing record UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49593662497507120518174908605360552573875197411355262978, getData()=java.nio.HeapByteBuffer[pos=4 lim=4 cap=4], getPartitionKey()=12345]
java.lang.StringIndexOutOfBoundsException: String index out of range: -9
at java.lang.String.substring(String.java:1931)
at com.kinesisconsumer.AmazonKinesisApplicationSampleRecordProcessor.processSingleRecord(AmazonKinesisApplicationSampleRecordProcessor.java:112)
at com.kinesisconsumer.AmazonKinesisApplicationSampleRecordProcessor.processRecordsWithRetries(AmazonKinesisApplicationSampleRecordProcessor.java:75)
at com.kinesisconsumer.AmazonKinesisApplicationSampleRecordProcessor.processRecords(AmazonKinesisApplicationSampleRecordProcessor.java:53)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.V1ToV2RecordProcessorAdapter.processRecords(V1ToV2RecordProcessorAdapter.java:42)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.callProcessRecords(ProcessTask.java:221)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:176)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Вот мой код:
public class AmazonKinesisApplicationRecordProcessorFactory implements IRecordProcessorFactory {
/**
* {@inheritDoc}
*/
@Override
public IRecordProcessor createProcessor() {
return new AmazonKinesisApplicationSampleRecordProcessor();
}
}
public final class AmazonKinesisApplicationSample {
public static final String SAMPLE_APPLICATION_STREAM_NAME = "LimorKinesis";
private static final String SAMPLE_APPLICATION_NAME = "SampleKinesisApplication";
// Initial position in the stream when the application starts up for the first time.
// Position can be one of LATEST (most recent data) or TRIM_HORIZON (oldest available data)
private static final InitialPositionInStream SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM =
InitialPositionInStream.LATEST;
private static ProfileCredentialsProvider credentialsProvider;
private static void init() {
// Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
/*
* The ProfileCredentialsProvider will return your [default]
* credential profile by reading from the credentials file located at
* (~/.aws/credentials).
*/
credentialsProvider = new ProfileCredentialsProvider();
try {
credentialsProvider.getCredentials();
} catch (Exception e) {
throw new AmazonClientException("Cannot load the credentials from the credential profiles file. "
+ "Please make sure that your credentials file is at the correct "
+ "location (~/.aws/credentials), and is in valid format.", e);
}
}
public static void main(String[] args) throws Exception {
init();
if (args.length == 1 && "delete-resources".equals(args[0])) {
deleteResources();
return;
}
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
KinesisClientLibConfiguration kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(SAMPLE_APPLICATION_NAME,
SAMPLE_APPLICATION_STREAM_NAME,
credentialsProvider,
workerId);
kinesisClientLibConfiguration.withInitialPositionInStream(SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM);
kinesisClientLibConfiguration.withRegionName("us-west-2");//todo : added region west-2
IRecordProcessorFactory recordProcessorFactory = new AmazonKinesisApplicationRecordProcessorFactory();
Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);
System.out.printf("Running %s to process stream %s as worker %s...\n",
SAMPLE_APPLICATION_NAME,
SAMPLE_APPLICATION_STREAM_NAME,
workerId);
int exitCode = 0;
try {
worker.run();
} catch (Throwable t) {
System.err.println("Caught throwable while processing data.");
t.printStackTrace();
exitCode = 1;
}
System.exit(exitCode);
}
public static void deleteResources() {
// Delete the stream
AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion("us-west-2")
.build();
System.out.printf("Deleting the Amazon Kinesis stream used by the sample. Stream Name = %s.\n",
SAMPLE_APPLICATION_STREAM_NAME);
try {
kinesis.deleteStream(SAMPLE_APPLICATION_STREAM_NAME);
} catch (ResourceNotFoundException ex) {
// The stream doesn't exist.
}
// Delete the table
AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion("us-west-2")
.build();
System.out.printf("Deleting the Amazon DynamoDB table used by the Amazon Kinesis Client Library. Table Name = %s.\n",
SAMPLE_APPLICATION_NAME);
try {
dynamoDB.deleteTable(SAMPLE_APPLICATION_NAME);
} catch (com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException ex) {
// The table doesn't exist.
}
}
}
public class AmazonKinesisApplicationSampleRecordProcessor implements IRecordProcessor {
private static final Log LOG = LogFactory.getLog(AmazonKinesisApplicationSampleRecordProcessor.class);
private String kinesisShardId;
// Backoff and retry settings
private static final long BACKOFF_TIME_IN_MILLIS = 3000L;
private static final int NUM_RETRIES = 10;
// Checkpoint about once a minute
private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
private long nextCheckpointTimeInMillis;
private final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
/**
* {@inheritDoc}
*/
@Override
public void initialize(String shardId) {
LOG.info("Initializing record processor for shard: " + shardId);
this.kinesisShardId = shardId;
}
/**
* {@inheritDoc}
*/
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
LOG.info("Processing " + records.size() + " records from " + kinesisShardId);
// Process records and perform all exception handling.
processRecordsWithRetries(records);
// Checkpoint once every checkpoint interval.
if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
checkpoint(checkpointer);
nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
}
}
/**
* Process records performing retries as needed. Skip "poison pill" records.
*
* @param records Data records to be processed.
*/
private void processRecordsWithRetries(List<Record> records) {
for (Record record : records) {
boolean processedSuccessfully = false;
for (int i = 0; i < NUM_RETRIES; i++) {
try {
//
// Logic to process record goes here.
//
processSingleRecord(record);
processedSuccessfully = true;
break;
} catch (Throwable t) {
LOG.warn("Caught throwable while processing record " + record, t);
}
// backoff if we encounter an exception.
try {
Thread.sleep(BACKOFF_TIME_IN_MILLIS);
} catch (InterruptedException e) {
LOG.debug("Interrupted sleep", e);
}
}
if (!processedSuccessfully) {
LOG.error("Couldn't process record " + record + ". Skipping the record.");
}
}
}
/**
* Process a single record.
*
* @param record The record to be processed.
*/
private void processSingleRecord(Record record) {
System.out.println(record.getData());
String data = null;
try {
// For this app, we interpret the payload as UTF-8 chars.
data = decoder.decode(record.getData()).toString();
// Assume this record came from AmazonKinesisSample and log its age.
long recordCreateTime = new Long(data.substring("testData-".length()));
long ageOfRecordInMillis = System.currentTimeMillis() - recordCreateTime;
LOG.info(record.getSequenceNumber() + ", " + record.getPartitionKey() + ", " + data + ", Created "
+ ageOfRecordInMillis + " milliseconds ago.");
} catch (NumberFormatException e) {
LOG.info("Record does not match sample record format. Ignoring record with data; " + data);
} catch (CharacterCodingException e) {
LOG.error("Malformed data: " + data, e);
}
}
/**
* {@inheritDoc}
*/
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
LOG.info("Shutting down record processor for shard: " + kinesisShardId);
// Important to checkpoint after reaching end of shard, so we can start processing data from child shards.
if (reason == ShutdownReason.TERMINATE) {
checkpoint(checkpointer);
}
}
/** Checkpoint with retries.
* @param checkpointer
*/
private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
LOG.info("Checkpointing shard " + kinesisShardId);
for (int i = 0; i < NUM_RETRIES; i++) {
try {
checkpointer.checkpoint();
break;
} catch (ShutdownException se) {
// Ignore checkpoint if the processor instance has been shutdown (fail over).
LOG.info("Caught shutdown exception, skipping checkpoint.", se);
break;
} catch (ThrottlingException e) {
// Backoff and re-attempt checkpoint upon transient failures
if (i >= (NUM_RETRIES - 1)) {
LOG.error("Checkpoint failed after " + (i + 1) + "attempts.", e);
break;
} else {
LOG.info("Transient issue when checkpointing - attempt " + (i + 1) + " of "
+ NUM_RETRIES, e);
}
} catch (InvalidStateException e) {
// This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
LOG.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
break;
}
try {
Thread.sleep(BACKOFF_TIME_IN_MILLIS);
} catch (InterruptedException e) {
LOG.debug("Interrupted sleep", e);
}
}
}
}
public class AmazonKinesisRecordProducerSample {
private static AmazonKinesis kinesis;
private static void init() throws Exception {
/*
* The ProfileCredentialsProvider will return your [default]
* credential profile by reading from the credentials file located at
* (~/.aws/credentials).
*/
ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider();
try {
credentialsProvider.getCredentials();
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load the credentials from the credential profiles file. " +
"Please make sure that your credentials file is at the correct " +
"location (~/.aws/credentials), and is in valid format.",
e);
}
kinesis = AmazonKinesisClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion("us-west-2")
.build();
}
public static void main(String[] args) throws Exception {
init();
final String myStreamName = AmazonKinesisApplicationSample.SAMPLE_APPLICATION_STREAM_NAME;
final Integer myStreamSize = 1;
// Describe the stream and check if it exists.
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest().withStreamName(myStreamName);
try {
StreamDescription streamDescription = kinesis.describeStream(describeStreamRequest).getStreamDescription();
System.out.printf("Stream %s has a status of %s.\n", myStreamName, streamDescription.getStreamStatus());
if ("DELETING".equals(streamDescription.getStreamStatus())) {
System.out.println("Stream is being deleted. This sample will now exit.");
System.exit(0);
}
// Wait for the stream to become active if it is not yet ACTIVE.
if (!"ACTIVE".equals(streamDescription.getStreamStatus())) {
waitForStreamToBecomeAvailable(myStreamName);
}
} catch (ResourceNotFoundException ex) {
System.out.printf("Stream %s does not exist. Creating it now.\n", myStreamName);
// Create a stream. The number of shards determines the provisioned throughput.
CreateStreamRequest createStreamRequest = new CreateStreamRequest();
createStreamRequest.setStreamName(myStreamName);
createStreamRequest.setShardCount(myStreamSize);
kinesis.createStream(createStreamRequest);
// The stream is now being created. Wait for it to become active.
waitForStreamToBecomeAvailable(myStreamName);
}
// List all of my streams.
ListStreamsRequest listStreamsRequest = new ListStreamsRequest();
listStreamsRequest.setLimit(10);
ListStreamsResult listStreamsResult = kinesis.listStreams(listStreamsRequest);
List<String> streamNames = listStreamsResult.getStreamNames();
while (listStreamsResult.isHasMoreStreams()) {
if (streamNames.size() > 0) {
listStreamsRequest.setExclusiveStartStreamName(streamNames.get(streamNames.size() - 1));
}
listStreamsResult = kinesis.listStreams(listStreamsRequest);
streamNames.addAll(listStreamsResult.getStreamNames());
}
// Print all of my streams.
System.out.println("List of my streams: ");
for (int i = 0; i < streamNames.size(); i++) {
System.out.println("\t- " + streamNames.get(i));
}
System.out.printf("Putting records in stream : %s until this application is stopped...\n", myStreamName);
System.out.println("Press CTRL-C to stop.");
// Write records to the stream until this program is aborted.
while (true) {
long createTime = System.currentTimeMillis();
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(myStreamName);
putRecordRequest.setData(ByteBuffer.wrap(String.format("testData-%d", createTime).getBytes()));
putRecordRequest.setPartitionKey(String.format("partitionKey-%d", createTime));
PutRecordResult putRecordResult = kinesis.putRecord(putRecordRequest);
System.out.printf("Successfully put record, partition key : %s, ShardID : %s, SequenceNumber : %s.\n",
putRecordRequest.getPartitionKey(),
putRecordResult.getShardId(),
putRecordResult.getSequenceNumber());
}
}
private static void waitForStreamToBecomeAvailable(String myStreamName) throws InterruptedException {
System.out.printf("Waiting for %s to become ACTIVE...\n", myStreamName);
long startTime = System.currentTimeMillis();
long endTime = startTime + TimeUnit.MINUTES.toMillis(10);
while (System.currentTimeMillis() < endTime) {
Thread.sleep(TimeUnit.SECONDS.toMillis(20));
try {
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName(myStreamName);
// ask for no more than 10 shards at a time -- this is an optional parameter
describeStreamRequest.setLimit(10);
DescribeStreamResult describeStreamResponse = kinesis.describeStream(describeStreamRequest);
String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus();
System.out.printf("\t- current state: %s\n", streamStatus);
if ("ACTIVE".equals(streamStatus)) {
return;
}
} catch (ResourceNotFoundException ex) {
// ResourceNotFound means the stream doesn't exist yet,
// so ignore this error and just keep polling.
} catch (AmazonServiceException ase) {
throw ase;
}
}
throw new RuntimeException(String.format("Stream %s never became active", myStreamName));
}
}
Я использовал пример кода по этой ссылке:
https://github.com/aws/aws-sdk-java/tree/master/src/samples/AmazonKinesis