Я работаю с Amazon Kinesis Data Streams. Мой поток Kinesis состоит всего из одного шарда (shard).
Я пытаюсь прочитать записи из потока после того, как записал несколько записей в этот же поток. Мои записи — простые JSON-документы.
В консоли Amazon я вижу, что операции чтения и записи в поток выполняются.
Когда я пытаюсь вывести содержимое записи с помощью `record.getData()`, я получаю следующий вывод и ошибку:
java.nio.HeapByteBuffer[pos=4 lim=4 cap=4]
20:35:59.118 [RecordProcessor-0000] WARN com.kinesisconsumer.AmazonKinesisApplicationSampleRecordProcessor - Caught throwable while processing record UserRecord [subSequenceNumber=0, explicitHashKey=null, aggregated=false, getSequenceNumber()=49593662497507120518174908605360552573875197411355262978, getData()=java.nio.HeapByteBuffer[pos=4 lim=4 cap=4], getPartitionKey()=12345]
java.lang.StringIndexOutOfBoundsException: String index out of range: -9
at java.lang.String.substring(String.java:1931)
at com.kinesisconsumer.AmazonKinesisApplicationSampleRecordProcessor.processSingleRecord(AmazonKinesisApplicationSampleRecordProcessor.java:112)
at com.kinesisconsumer.AmazonKinesisApplicationSampleRecordProcessor.processRecordsWithRetries(AmazonKinesisApplicationSampleRecordProcessor.java:75)
at com.kinesisconsumer.AmazonKinesisApplicationSampleRecordProcessor.processRecords(AmazonKinesisApplicationSampleRecordProcessor.java:53)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.V1ToV2RecordProcessorAdapter.processRecords(V1ToV2RecordProcessorAdapter.java:42)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.callProcessRecords(ProcessTask.java:221)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:176)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Вот мой код:
// Factory that supplies record processors: every call to createProcessor() returns a new
// AmazonKinesisApplicationSampleRecordProcessor instance.
// NOTE(review): the Javadoc delimiters and the closing braces of this class are not present
// in this excerpt.
public class AmazonKinesisApplicationRecordProcessorFactory implements IRecordProcessorFactory {
* {@inheritDoc}
public IRecordProcessor createProcessor() {
return new AmazonKinesisApplicationSampleRecordProcessor();
// Consumer-side sample application: declares the stream name, the application name, the
// initial stream position and the credentials provider shared by the static methods below.
public final class AmazonKinesisApplicationSample {
// Stream name; public so that AmazonKinesisRecordProducerSample can write to the same stream.
public static final String SAMPLE_APPLICATION_STREAM_NAME = "LimorKinesis";
// Application name passed to the KinesisClientLibConfiguration constructor in main().
private static final String SAMPLE_APPLICATION_NAME = "SampleKinesisApplication";
// Initial position in the stream when the application starts up for the first time.
// Position can be one of LATEST (most recent data) or TRIM_HORIZON (oldest available data)
// NOTE(review): the initializer (right-hand side) of the constant below is missing from
// this excerpt.
private static final InitialPositionInStream SAMPLE_APPLICATION_INITIAL_POSITION_IN_STREAM =
// Assigned in init().
private static ProfileCredentialsProvider credentialsProvider;
// Sets the JVM DNS cache TTL to 60 seconds and creates the ProfileCredentialsProvider.
// Any Exception raised in the try block is rethrown as an AmazonClientException that keeps
// the original exception as its cause.
// NOTE(review): the statement inside the try block is missing from this excerpt.
private static void init() {
// Ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
java.security.Security.setProperty("networkaddress.cache.ttl", "60");
* The ProfileCredentialsProvider will return your [default]
* credential profile by reading from the credentials file located at
* (~/.aws/credentials).
credentialsProvider = new ProfileCredentialsProvider();
try {
} catch (Exception e) {
throw new AmazonClientException("Cannot load the credentials from the credential profiles file. "
+ "Please make sure that your credentials file is at the correct "
+ "location (~/.aws/credentials), and is in valid format.", e);
// Entry point of the consumer: builds a worker id, creates the KCL configuration, and
// constructs a Worker around AmazonKinesisApplicationRecordProcessorFactory.
// NOTE(review): several statements are missing from this excerpt (the body of the
// "delete-resources" branch, the remaining constructor and printf arguments, and the
// body of the try block).
public static void main(String[] args) throws Exception {
// A single "delete-resources" argument selects an alternative branch (body not visible).
if (args.length == 1 && "delete-resources".equals(args[0])) {
// Worker id is the canonical host name plus a random UUID.
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
KinesisClientLibConfiguration kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(SAMPLE_APPLICATION_NAME,
// NOTE(review): the return value of withRegionName is discarded; this only takes effect if
// the method mutates the configuration in place - confirm against the KCL version in use.
kinesisClientLibConfiguration.withRegionName("us-west-2");//todo : added region west-2
IRecordProcessorFactory recordProcessorFactory = new AmazonKinesisApplicationRecordProcessorFactory();
Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);
System.out.printf("Running %s to process stream %s as worker %s...\n",
// Exit code is set to 1 when anything is thrown from the try block.
int exitCode = 0;
try {
} catch (Throwable t) {
System.err.println("Caught throwable while processing data.");
exitCode = 1;
// Cleanup helper: builds a Kinesis client and a DynamoDB client and announces the deletion
// of the sample stream and of the KCL table. A "not found" exception from either service is
// caught and ignored, so that a missing resource does not abort the cleanup.
// NOTE(review): the rest of both builder chains, the printf arguments and the delete calls
// inside the try blocks are missing from this excerpt.
public static void deleteResources() {
// Delete the stream
AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
System.out.printf("Deleting the Amazon Kinesis stream used by the sample. Stream Name = %s.\n",
try {
} catch (ResourceNotFoundException ex) {
// The stream doesn't exist.
// Delete the table
AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.standard()
System.out.printf("Deleting the Amazon DynamoDB table used by the Amazon Kinesis Client Library. Table Name = %s.\n",
try {
// Fully qualified because the Kinesis ResourceNotFoundException is caught by its simple
// name above.
} catch (com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException ex) {
// The table doesn't exist.
// Record processor of the sample: holds the shard id, the retry/backoff settings, the
// checkpoint schedule and the UTF-8 decoder used by the methods below.
public class AmazonKinesisApplicationSampleRecordProcessor implements IRecordProcessor {
private static final Log LOG = LogFactory.getLog(AmazonKinesisApplicationSampleRecordProcessor.class);
// Set in initialize(); used in log messages.
private String kinesisShardId;
// Backoff and retry settings
private static final long BACKOFF_TIME_IN_MILLIS = 3000L;
private static final int NUM_RETRIES = 10;
// Checkpoint about once a minute
private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
// Wall-clock time (epoch millis) after which processRecords schedules the next checkpoint.
private long nextCheckpointTimeInMillis;
// Decoder used by processSingleRecord to turn record payloads into strings.
private final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
// Logs the shard id and stores it in kinesisShardId for use in later log messages.
* {@inheritDoc}
public void initialize(String shardId) {
LOG.info("Initializing record processor for shard: " + shardId);
this.kinesisShardId = shardId;
// Logs the batch size and, once the current time has passed nextCheckpointTimeInMillis,
// moves the next checkpoint time CHECKPOINT_INTERVAL_MILLIS into the future.
// NOTE(review): the calls that actually process the records and perform the checkpoint are
// missing from this excerpt; the stack trace in the post shows processRecords calling
// processRecordsWithRetries.
* {@inheritDoc}
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
LOG.info("Processing " + records.size() + " records from " + kinesisShardId);
// Process records and perform all exception handling.
// Checkpoint once every checkpoint interval.
if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
// Iterates over the batch and gives every record up to NUM_RETRIES attempts; a record that
// never succeeds is logged at error level and skipped.
// NOTE(review): the call inside the first try block and the sleep inside the second one are
// missing from this excerpt; the stack trace in the post shows this method calling
// processSingleRecord.
// NOTE(review): every retry receives the same Record object, hence the same ByteBuffer from
// record.getData(); the console output in the post (pos=4 lim=4) suggests the buffer had
// already been consumed by an earlier attempt - confirm.
* Process records performing retries as needed. Skip "poison pill" records.
* @param records Data records to be processed.
private void processRecordsWithRetries(List<Record> records) {
for (Record record : records) {
boolean processedSuccessfully = false;
for (int i = 0; i < NUM_RETRIES; i++) {
try {
// Logic to process record goes here.
processedSuccessfully = true;
// Catches everything, including unchecked exceptions thrown by the record handler.
} catch (Throwable t) {
LOG.warn("Caught throwable while processing record " + record, t);
// backoff if we encounter an exception.
try {
// NOTE(review): the interrupt is only logged; the thread's interrupt flag is not restored.
} catch (InterruptedException e) {
LOG.debug("Interrupted sleep", e);
if (!processedSuccessfully) {
LOG.error("Couldn't process record " + record + ". Skipping the record.");
* Process a single record.
* @param record The record to be processed.
private void processSingleRecord(Record record) {
String data = null;
try {
// For this app, we interpret the payload as UTF-8 chars.
data = decoder.decode(record.getData()).toString();
// Assume this record came from AmazonKinesisSample and log its age.
long recordCreateTime = new Long(data.substring("testData-".length()));
long ageOfRecordInMillis = System.currentTimeMillis() - recordCreateTime;
LOG.info(record.getSequenceNumber() + ", " + record.getPartitionKey() + ", " + data + ", Created "
+ ageOfRecordInMillis + " milliseconds ago.");
} catch (NumberFormatException e) {
LOG.info("Record does not match sample record format. Ignoring record with data; " + data);
} catch (CharacterCodingException e) {
LOG.error("Malformed data: " + data, e);
// Logs the shutdown of the processor for kinesisShardId and takes an extra step when the
// reason is TERMINATE.
// NOTE(review): the body of the TERMINATE branch is missing from this excerpt; per the
// comment below it is expected to checkpoint.
* {@inheritDoc}
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
LOG.info("Shutting down record processor for shard: " + kinesisShardId);
// Important to checkpoint after reaching end of shard, so we can start processing data from child shards.
if (reason == ShutdownReason.TERMINATE) {
// Attempts a checkpoint up to NUM_RETRIES times. ShutdownException and InvalidStateException
// are logged; ThrottlingException is treated as transient and logged at error level only on
// the last attempt.
// NOTE(review): the checkpoint call, the rest of the else branch, any break statements and
// the backoff sleep are missing from this excerpt, and the Javadoc opened on the next line
// is never closed here.
// NOTE(review): the error message below concatenates the attempt count directly with
// "attempts." (no separating space).
// NOTE(review): the InterruptedException handler only logs; the interrupt flag is not restored.
/** Checkpoint with retries.
* @param checkpointer
private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
LOG.info("Checkpointing shard " + kinesisShardId);
for (int i = 0; i < NUM_RETRIES; i++) {
try {
} catch (ShutdownException se) {
// Ignore checkpoint if the processor instance has been shutdown (fail over).
LOG.info("Caught shutdown exception, skipping checkpoint.", se);
} catch (ThrottlingException e) {
// Backoff and re-attempt checkpoint upon transient failures
if (i >= (NUM_RETRIES - 1)) {
LOG.error("Checkpoint failed after " + (i + 1) + "attempts.", e);
} else {
LOG.info("Transient issue when checkpointing - attempt " + (i + 1) + " of "
} catch (InvalidStateException e) {
// This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
LOG.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e);
try {
} catch (InterruptedException e) {
LOG.debug("Interrupted sleep", e);
// Producer-side sample: writes records to the stream named by
// AmazonKinesisApplicationSample.SAMPLE_APPLICATION_STREAM_NAME.
public class AmazonKinesisRecordProducerSample {
// Shared Kinesis client; assigned in init() and used by main() and
// waitForStreamToBecomeAvailable().
private static AmazonKinesis kinesis;
// Creates a ProfileCredentialsProvider and assigns the static Kinesis client. Any Exception
// raised in the try block is rethrown as an AmazonClientException.
// NOTE(review): the statement inside the try block, the last constructor argument of the
// exception and the rest of the client builder chain are missing from this excerpt.
private static void init() throws Exception {
* The ProfileCredentialsProvider will return your [default]
* credential profile by reading from the credentials file located at
* (~/.aws/credentials).
ProfileCredentialsProvider credentialsProvider = new ProfileCredentialsProvider();
try {
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load the credentials from the credential profiles file. " +
"Please make sure that your credentials file is at the correct " +
"location (~/.aws/credentials), and is in valid format.",
kinesis = AmazonKinesisClientBuilder.standard()
// Entry point of the producer: checks whether the stream exists (creating it when
// describeStream reports ResourceNotFoundException), lists all streams, then puts
// "testData-<currentTimeMillis>" records into the stream in an endless loop.
// NOTE(review): many statements are missing from this excerpt (the init() call, exits and
// waits inside the status checks, the create-stream setup, the accumulation of paged stream
// names, the stream name on the put request, and the final printf arguments).
public static void main(String[] args) throws Exception {
final String myStreamName = AmazonKinesisApplicationSample.SAMPLE_APPLICATION_STREAM_NAME;
// Shard count; not referenced by any line visible in this excerpt.
final Integer myStreamSize = 1;
// Describe the stream and check if it exists.
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest().withStreamName(myStreamName);
try {
StreamDescription streamDescription = kinesis.describeStream(describeStreamRequest).getStreamDescription();
System.out.printf("Stream %s has a status of %s.\n", myStreamName, streamDescription.getStreamStatus());
if ("DELETING".equals(streamDescription.getStreamStatus())) {
System.out.println("Stream is being deleted. This sample will now exit.");
// Wait for the stream to become active if it is not yet ACTIVE.
if (!"ACTIVE".equals(streamDescription.getStreamStatus())) {
} catch (ResourceNotFoundException ex) {
System.out.printf("Stream %s does not exist. Creating it now.\n", myStreamName);
// Create a stream. The number of shards determines the provisioned throughput.
CreateStreamRequest createStreamRequest = new CreateStreamRequest();
// The stream is now being created. Wait for it to become active.
// List all of my streams.
ListStreamsRequest listStreamsRequest = new ListStreamsRequest();
ListStreamsResult listStreamsResult = kinesis.listStreams(listStreamsRequest);
List<String> streamNames = listStreamsResult.getStreamNames();
// Page through the listing, restarting after the last stream name seen so far.
while (listStreamsResult.isHasMoreStreams()) {
if (streamNames.size() > 0) {
listStreamsRequest.setExclusiveStartStreamName(streamNames.get(streamNames.size() - 1));
listStreamsResult = kinesis.listStreams(listStreamsRequest);
// Print all of my streams.
System.out.println("List of my streams: ");
for (int i = 0; i < streamNames.size(); i++) {
System.out.println("\t- " + streamNames.get(i));
System.out.printf("Putting records in stream : %s until this application is stopped...\n", myStreamName);
System.out.println("Press CTRL-C to stop.");
// Write records to the stream until this program is aborted.
while (true) {
long createTime = System.currentTimeMillis();
PutRecordRequest putRecordRequest = new PutRecordRequest();
// Payload format read back by the consumer's processSingleRecord: "testData-" + epoch millis.
// NOTE(review): getBytes() without a charset uses the platform default, while the consumer
// declares a UTF-8 decoder; identical for this ASCII payload, but worth making explicit.
putRecordRequest.setData(ByteBuffer.wrap(String.format("testData-%d", createTime).getBytes()));
putRecordRequest.setPartitionKey(String.format("partitionKey-%d", createTime));
PutRecordResult putRecordResult = kinesis.putRecord(putRecordRequest);
System.out.printf("Successfully put record, partition key : %s, ShardID : %s, SequenceNumber : %s.\n",
// Polls describeStream for up to 10 minutes, printing each observed status, and throws a
// RuntimeException if the deadline passes without the method having returned.
// NOTE(review): the request setup (stream name, shard limit), the body of the ACTIVE branch
// and any sleep between polls are missing from this excerpt.
private static void waitForStreamToBecomeAvailable(String myStreamName) throws InterruptedException {
System.out.printf("Waiting for %s to become ACTIVE...\n", myStreamName);
long startTime = System.currentTimeMillis();
// Give up 10 minutes after the start.
long endTime = startTime + TimeUnit.MINUTES.toMillis(10);
while (System.currentTimeMillis() < endTime) {
try {
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
// ask for no more than 10 shards at a time -- this is an optional parameter
DescribeStreamResult describeStreamResponse = kinesis.describeStream(describeStreamRequest);
String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus();
System.out.printf("\t- current state: %s\n", streamStatus);
if ("ACTIVE".equals(streamStatus)) {
} catch (ResourceNotFoundException ex) {
// ResourceNotFound means the stream doesn't exist yet,
// so ignore this error and just keep polling.
// Any other service error is rethrown unchanged.
} catch (AmazonServiceException ase) {
throw ase;
throw new RuntimeException(String.format("Stream %s never became active", myStreamName));
Я использовал пример кода по этой ссылке: