Я работаю над проектом, в котором записываю данные в Amazon Kinesis и читаю их оттуда.
Я начал экспериментировать с найденным примером кода от Amazon и
с помощью AWS CLI убедился, что продюсер работает как положено: каждая запись
содержит порядковый номер (1), затем пробел, а затем символы «a»,
которыми данные дополняются до 128 байт.
Но когда я пытаюсь прочитать эти данные, потребителю, по-видимому, не удаётся
извлечь порядковый номер, который находится в начале данных:
при разборе каждой записи перехватывается исключение и в лог пишется «Error parsing record» (ошибка разбора записи).
Буду благодарен за помощь в поиске причины, а также за советы о том, как правильно работать с Kinesis.
Вот соответствующий код:
* Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
* http://aws.amazon.com/asl/
* or in the "license" file accompanying this file. This file is distributed
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
package com.kinesisdataproducer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.kinesisdataconsumer.SampleConsumer;
import com.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.producer.Attempt;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
* The Kinesis Producer Library (KPL) excels at handling large numbers of small
* logical records by combining multiple logical records into a single Kinesis
* record.
* <p>
* In this sample we'll be putting a monotonically increasing sequence number in
* each logical record, and then padding the record to 128 bytes long. The
* consumer will then check that all records are received correctly by verifying
* that there are no gaps in the sequence numbers.
* <p>
* We will distribute the records evenly across all shards by using a random
* explicit hash key.
* <p>
* To prevent the consumer from being confused by data from multiple runs of the
* producer, each record also carries the time at which the producer started.
* The consumer will reset its state whenever it detects a new, larger
* timestamp. We will place the timestamp in the partition key. This does not
* affect the random distribution of records across shards since we've set an
* explicit hash key.
* @see SampleConsumer
* @author chaodeng
public class SampleProducer {
private static final Logger log = LoggerFactory.getLogger(SampleProducer.class);
private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(1);
* Timestamp we'll attach to every record
private static final String TIMESTAMP = Long.toString(System.currentTimeMillis());
* Change these to try larger or smaller records.
private static final int DATA_SIZE = 128;
* Put records for this number of seconds before exiting.
private static final int SECONDS_TO_RUN = 1;
* Put this number of records per second.
* Because multiple logical records are combined into each Kinesis record,
* even a single shard can handle several thousand records per second, even
* though there is a limit of 1000 Kinesis records per shard per second.
* If a shard gets throttled, the KPL will continue to retry records until
* either they succeed or reach a TTL set in the KPL's configuration, at
* which point the KPL will return failures for those records.
* @see {@link KinesisProducerConfiguration#setRecordTtl(long)}
private static final int RECORDS_PER_SECOND = 1;
* Change this to your stream name.
public static final String STREAM_NAME = "MyKinesis";
* Change this to the region you are using.
public static final String REGION = "us-west-2";
* Here'll walk through some of the config options and create an instance of
* KinesisProducer, which will be used to put records.
* @return KinesisProducer instance used to put records.
public static KinesisProducer getKinesisProducer() {
// There are many configurable parameters in the KPL. See the javadocs
// on each each set method for details.
KinesisProducerConfiguration config = new KinesisProducerConfiguration();
// You can also load config from file. A sample properties file is
// included in the project folder.
// KinesisProducerConfiguration config =
// KinesisProducerConfiguration.fromPropertiesFile("default_config.properties");
// If you're running in EC2 and want to use the same Kinesis region as
// the one your instance is in, you can simply leave out the region
// configuration; the KPL will retrieve it from EC2 metadata.
// You can pass credentials programmatically through the configuration,
// similar to the AWS SDK. DefaultAWSCredentialsProviderChain is used
// by default, so this configuration can be omitted if that is all
// that is needed.
config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
// The maxConnections parameter can be used to control the degree of
// parallelism when making HTTP requests. We're going to use only 1 here
// since our throughput is fairly low. Using a high number will cause a
// bunch of broken pipe errors to show up in the logs. This is due to
// idle connections being closed by the server. Setting this value too
// large may also cause request timeouts if you do not have enough
// bandwidth.
// Set a more generous timeout in case we're on a slow connection.
// RecordMaxBufferedTime controls how long records are allowed to wait
// in the KPL's buffers before being sent. Larger values increase
// aggregation and reduces the number of Kinesis records put, which can
// be helpful if you're getting throttled because of the records per
// second limit on a shard. The default value is set very low to
// minimize propagation delay, so we'll increase it here to get more
// aggregation.
// If you have built the native binary yourself, you can point the Java
// wrapper to it with the NativeExecutable option. If you want to pass
// environment variables to the executable, you can either use a wrapper
// shell script, or set them for the Java process, which will then pass
// them on to the child process.
// config.setNativeExecutable("my_directory/kinesis_producer");
// If you end up using the default configuration (a Configuration instance
// without any calls to set*), you can just leave the config argument
// out.
// Note that if you do pass a Configuration instance, mutating that
// instance after initializing KinesisProducer has no effect. We do not
// support dynamic re-configuration at the moment.
KinesisProducer producer = new KinesisProducer(config);
return producer;
public static void main(String[] args) throws Exception {
final KinesisProducer producer = getKinesisProducer();
// The monotonically increasing sequence number we will put in the data of each record
final AtomicLong sequenceNumber = new AtomicLong(0);
// The number of records that have finished (either successfully put, or failed)
final AtomicLong completed = new AtomicLong(0);
// KinesisProducer.addUserRecord is asynchronous. A callback can be used to receive the results.
final FutureCallback<UserRecordResult> callback = new FutureCallback<UserRecordResult>() {
public void onFailure(Throwable t) {
// We don't expect any failures during this sample. If it
// happens, we will log the first one and exit.
if (t instanceof UserRecordFailedException) {
Attempt last = Iterables.getLast(
((UserRecordFailedException) t).getResult().getAttempts());
"Record failed to put - %s : %s",
last.getErrorCode(), last.getErrorMessage()));
log.error("Exception during put", t);
public void onSuccess(UserRecordResult result) {
// The lines within run() are the essence of the KPL API.
final Runnable putOneRecord = new Runnable() {
public void run() {
ByteBuffer data = Utils.generateData(sequenceNumber.get(), DATA_SIZE);
// TIMESTAMP is our partition key
ListenableFuture<UserRecordResult> f =
producer.addUserRecord(STREAM_NAME, TIMESTAMP, Utils.randomExplicitHashKey(), data);
Futures.addCallback(f, callback);
// This gives us progress updates
EXECUTOR.scheduleAtFixedRate(new Runnable() {
public void run() {
long put = sequenceNumber.get();
double putPercent = 100.0 * put / total;
long done = completed.get();
double donePercent = 100.0 * done / total;
"Put %d of %d so far (%.2f %%), %d have completed (%.2f %%)",
put, total, putPercent, done, donePercent));
}, 1, 1, TimeUnit.SECONDS);
// Kick off the puts
"Starting puts... will run for %d seconds at %d records per second",
executeAtTargetRate(EXECUTOR, putOneRecord, sequenceNumber, SECONDS_TO_RUN, RECORDS_PER_SECOND);
// Wait for puts to finish. After this statement returns, we have
// finished all calls to putRecord, but the records may still be
// in-flight. We will additionally wait for all records to actually
// finish later.
EXECUTOR.awaitTermination(SECONDS_TO_RUN + 1, TimeUnit.SECONDS);
// If you need to shutdown your application, call flushSync() first to
// send any buffered records. This method will block until all records
// have finished (either success or fail). There are also asynchronous
// flush methods available.
// Records are also automatically flushed by the KPL after a while based
// on the time limit set with Configuration.setRecordMaxBufferedTime()
log.info("Waiting for remaining puts to finish...");
log.info("All records complete.");
// This kills the child process and shuts down the threads managing it.
* Executes a function N times per second for M seconds with a
* ScheduledExecutorService. The executor is shutdown at the end. This is
* more precise than simply using scheduleAtFixedRate.
* @param exec
* Executor
* @param task
* Task to perform
* @param counter
* Counter used to track how many times the task has been
* executed
* @param durationSeconds
* How many seconds to run for
* @param ratePerSecond
* How many times to execute task per second
private static void executeAtTargetRate(
final ScheduledExecutorService exec,
final Runnable task,
final AtomicLong counter,
final int durationSeconds,
final int ratePerSecond) {
exec.scheduleWithFixedDelay(new Runnable() {
final long startTime = System.nanoTime();
public void run() {
double secondsRun = (System.nanoTime() - startTime) / 1e9;
double targetCount = Math.min(durationSeconds, secondsRun) * ratePerSecond;
while (counter.get() < targetCount) {
try {
} catch (Exception e) {
log.error("Error running task", e);
if (secondsRun >= durationSeconds) {
}, 0, 1, TimeUnit.MILLISECONDS);
package com.kinesisdataconsumer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.kinesisdataproducer.SampleProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
* If you haven't looked at {@link SampleProducer}, do so first.
* <p>
* As mentioned in SampleProducer, we will check that all records are received
* correctly by the KCL by verifying that there are no gaps in the sequence
* numbers.
* <p>
* As the consumer runs, it will periodically log a message indicating the
* number of gaps it found in the sequence numbers. A gap is when the difference
* between two consecutive elements in the sorted list of seen sequence numbers
* is greater than 1.
* <p>
* Over time the number of gaps should converge to 0. You should also observe
* that the range of sequence numbers seen is equal to the number of records put
* by the SampleProducer.
* <p>
* If the stream contains data from multiple runs of SampleProducer, you should
* observe the SampleConsumer detecting this and resetting state to only count
* the latest run.
* <p>
* Note if you kill the SampleConsumer halfway and run it again, the number of
* gaps may never converge to 0. This is because checkpoints may have been made
* such that some records from the producer's latest run are not processed
* again. If you observe this, simply run the producer to completion again
* without terminating the consumer.
* <p>
* The consumer continues running until manually terminated, even if there are
* no more records to consume.
* @see SampleProducer
* @author chaodeng
public class SampleConsumer implements IRecordProcessorFactory {
private static final Logger log = LoggerFactory.getLogger(SampleConsumer.class);
// All records from a run of the producer have the same timestamp in their
// partition keys. Since this value increases for each run, we can use it
// determine which run is the latest and disregard data from earlier runs.
private final AtomicLong largestTimestamp = new AtomicLong(0);
// List of record sequence numbers we have seen so far.
private final List<Long> sequenceNumbers = new ArrayList<>();
// A mutex for largestTimestamp and sequenceNumbers. largestTimestamp is
// nevertheless an AtomicLong because we cannot capture non-final variables
// in the child class.
private final Object lock = new Object();
* One instance of RecordProcessor is created for every shard in the stream.
* All instances of RecordProcessor share state by capturing variables from
* the enclosing SampleConsumer instance. This is a simple way to combine
* the data from multiple shards.
private class RecordProcessor implements IRecordProcessor {
public void initialize(String shardId) {}
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
long timestamp = 0;
List<Long> seqNos = new ArrayList<>();
for (Record r : records) {//todo : delete the next line
System.out.println("debug check:" + StandardCharsets.UTF_8.decode(r.getData()).toString());
// Get the timestamp of this run from the partition key.
timestamp = Math.max(timestamp, Long.parseLong(r.getPartitionKey()));
// Extract the sequence number. It's encoded as a decimal
// string and placed at the beginning of the record data,
// followed by a space. The rest of the record data is padding
// that we will simply discard.
try {
byte[] b = new byte[r.getData().remaining()];
seqNos.add(Long.parseLong(new String(b, "UTF-8").split(" ")[0]));
} catch (Exception e) {
log.error("Error parsing record", e);
synchronized (lock) {
if (largestTimestamp.get() < timestamp) {
"Found new larger timestamp: %d (was %d), clearing state",
timestamp, largestTimestamp.get()));
// Only add to the shared list if our data is from the latest run.
if (largestTimestamp.get() == timestamp) {
try {
} catch (Exception e) {
log.error("Error while trying to checkpoint during ProcessRecords", e);
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
log.info("Shutting down, reason: " + reason);
try {
} catch (Exception e) {
log.error("Error while trying to checkpoint during Shutdown", e);
* Log a message indicating the current state.
public void logResults() {
synchronized (lock) {
if (largestTimestamp.get() == 0) {
if (sequenceNumbers.size() == 0) {
log.info("No sequence numbers found for current run.");
// The producer assigns sequence numbers starting from 1, so we
// start counting from one before that, i.e. 0.
long last = 0;
long gaps = 0;
for (long sn : sequenceNumbers) {
if (sn - last > 1) {
last = sn;
"Found %d gaps in the sequence numbers. Lowest seen so far is %d, highest is %d",
gaps, sequenceNumbers.get(0), sequenceNumbers.get(sequenceNumbers.size() - 1)));
public IRecordProcessor createProcessor() {
return this.new RecordProcessor();
public static void main(String[] args) {
KinesisClientLibConfiguration config =
new KinesisClientLibConfiguration(
new DefaultAWSCredentialsProviderChain(),
final SampleConsumer consumer = new SampleConsumer();
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
public void run() {
}, 10, 1, TimeUnit.SECONDS);
new Worker.Builder()
package com.utils;
import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Random;
/**
 * Helpers for building the sample's record payloads and explicit hash keys.
 */
public class Utils {
    private static final Random RANDOM = new Random();

    /**
     * @return A random unsigned 128-bit int converted to a decimal string.
     */
    public static String randomExplicitHashKey() {
        return new BigInteger(128, RANDOM).toString(10);
    }

    /**
     * Generates a blob containing a UTF-8 string. The string begins with the
     * sequence number in decimal notation, followed by a space, followed by
     * padding.
     *
     * @param sequenceNumber
     *            The sequence number to place at the beginning of the record
     *            data.
     * @param totalLen
     *            Total length of the data. After the sequence number, padding
     *            is added until this length is reached. If the sequence
     *            number and the space already reach this length, no padding
     *            is added and the result is not truncated.
     * @return ByteBuffer containing the blob
     */
    public static ByteBuffer generateData(long sequenceNumber, int totalLen) {
        StringBuilder sb = new StringBuilder();
        sb.append(sequenceNumber);
        sb.append(" ");
        // Every character written is ASCII, so the character count equals
        // the encoded byte count.
        while (sb.length() < totalLen) {
            sb.append('a');
        }
        return ByteBuffer.wrap(sb.toString().getBytes(StandardCharsets.UTF_8));
    }
}