Я новичок в Amazon Kinesis.Я запускаю AWS Kinesis KPL, который я немного изменил, но по какой-то причине, которую я не понимаю, у меня постоянно появляются такие ошибки:
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@2b546384 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@13969fbe[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 86]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:573)
at main.com.Producer.produceData(Producer.java:182)
at main.com.Main.main(Main.java:20)
Я не знаю, связано ли это или нет, ноэто сработало, когда я запускал продюсера и потребителя из одного и того же проекта один за другим, но я действительно хочу, чтобы они были параллельными.
Main:
package main.com;
import javafx.animation.PauseTransition;
import javafx.util.Duration;
import java.util.ArrayList;
public class Main {
public static void main(String args[]) {
String transactionFileName = "src/main/resources/transactionFailureSystem.json";
//parse all the transactions from 'transactions.json'
TransactionsFileParser transactionsFileParser = new TransactionsFileParser(transactionFileName);
ArrayList<Transaction> transactions = transactionsFileParser.parseFile();
Producer producer = new Producer();
for(Transaction transaction:transactions){
try {
producer.produceData(transaction);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Производитель:
package main.com;
import main.com.Transaction;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.producer.Attempt;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
public class Producer {
private static final Logger log = LoggerFactory.getLogger(Producer.class);
private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(1);
/**
* Timestamp we'll attach to every record
*/
private static final String TIMESTAMP = Long.toString(System.currentTimeMillis());
/**
* Change these to try larger or smaller records.
*/
private static final int DATA_SIZE = 2000;
/**
* Put records for this number of seconds before exiting.
*/
private static final int SECONDS_TO_RUN = 1;
/**
* Put this number of records per second.
*
* Because multiple logical records are combined into each Kinesis record,
* even a single shard can handle several thousand records per second, even
* though there is a limit of 1000 Kinesis records per shard per second.
*
* If a shard gets throttled, the KPL will continue to retry records until
* either they succeed or reach a TTL set in the KPL's configuration, at
* which point the KPL will return failures for those records.
*
* @see {@link KinesisProducerConfiguration#setRecordTtl(long)}
*/
private static final int RECORDS_PER_SECOND = 1;
/**
* Change this to your stream name.
*/
public static final String STREAM_NAME = "LimorKinesis";
/**
* Change this to the region you are using.
*/
public static final String REGION = "us-west-2";
public Producer(){}
/**
* Here'll walk through some of the config options and create an instance of
* KinesisProducer, which will be used to put records.
*
* @return KinesisProducer instance used to put records.
*/
public static KinesisProducer getKinesisProducer() {
// There are many configurable parameters in the KPL. See the javadocs
// on each each set method for details.
KinesisProducerConfiguration config = new KinesisProducerConfiguration();
// You can also load config from file. A sample properties file is
// included in the project folder.
// KinesisProducerConfiguration config =
// KinesisProducerConfiguration.fromPropertiesFile("default_config.properties");
// If you're running in EC2 and want to use the same Kinesis region as
// the one your instance is in, you can simply leave out the region
// configuration; the KPL will retrieve it from EC2 metadata.
config.setRegion(REGION);
// You can pass credentials programmatically through the configuration,
// similar to the AWS SDK. DefaultAWSCredentialsProviderChain is used
// by default, so this configuration can be omitted if that is all
// that is needed.
config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
// The maxConnections parameter can be used to control the degree of
// parallelism when making HTTP requests. We're going to use only 1 here
// since our throughput is fairly low. Using a high number will cause a
// bunch of broken pipe errors to show up in the logs. This is due to
// idle connections being closed by the server. Setting this value too
// large may also cause request timeouts if you do not have enough
// bandwidth.
config.setMaxConnections(1);
// Set a more generous timeout in case we're on a slow connection.
config.setRequestTimeout(60000);
// RecordMaxBufferedTime controls how long records are allowed to wait
// in the KPL's buffers before being sent. Larger values increase
// aggregation and reduces the number of Kinesis records put, which can
// be helpful if you're getting throttled because of the records per
// second limit on a shard. The default value is set very low to
// minimize propagation delay, so we'll increase it here to get more
// aggregation.
config.setRecordMaxBufferedTime(15000);
// If you have built the native binary yourself, you can point the Java
// wrapper to it with the NativeExecutable option. If you want to pass
// environment variables to the executable, you can either use a wrapper
// shell script, or set them for the Java process, which will then pass
// them on to the child process.
// config.setNativeExecutable("my_directory/kinesis_producer");
// If you end up using the default configuration (a Configuration instance
// without any calls to set*), you can just leave the config argument
// out.
//
// Note that if you do pass a Configuration instance, mutating that
// instance after initializing KinesisProducer has no effect. We do not
// support dynamic re-configuration at the moment.
KinesisProducer producer = new KinesisProducer(config);
return producer;
}
public void produceData(Transaction transaction) throws Exception {
final KinesisProducer producer = getKinesisProducer();
// The monotonically increasing sequence number we will put in the data of each record
final AtomicLong sequenceNumber = new AtomicLong(0);
// The number of records that have finished (either successfully put, or failed)
final AtomicLong completed = new AtomicLong(0);
// KinesisProducer.addUserRecord is asynchronous. A callback can be used to receive the results.
final FutureCallback<UserRecordResult> callback = new FutureCallback<UserRecordResult>() {
@Override
public void onFailure(Throwable t) {
// We don't expect any failures during this sample. If it
// happens, we will log the first one and exit.
if (t instanceof UserRecordFailedException) {
Attempt last = Iterables.getLast(
((UserRecordFailedException) t).getResult().getAttempts());
log.error(String.format(
"Record failed to put - %s : %s",
last.getErrorCode(), last.getErrorMessage()));
}
log.error("Exception during put", t);
System.exit(1);
}
@Override
public void onSuccess(UserRecordResult result) {
completed.getAndIncrement();
}
};
// The lines within run() are the essence of the KPL API.
final Runnable putOneRecord = new Runnable() {
@Override
public void run() {
ByteBuffer data = Utils.generateData(sequenceNumber.get(), transaction);
// TIMESTAMP is our partition key
ListenableFuture<UserRecordResult> f =
producer.addUserRecord(STREAM_NAME, TIMESTAMP, Utils.randomExplicitHashKey(), data);
Futures.addCallback(f, callback);
}
};
// This gives us progress updates
EXECUTOR.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
long put = sequenceNumber.get();
long total = RECORDS_PER_SECOND * SECONDS_TO_RUN;
double putPercent = 100.0 * put / total;
long done = completed.get();
double donePercent = 100.0 * done / total;
log.info(String.format(
"Put %d of %d so far (%.2f %%), %d have completed (%.2f %%)",
put, total, putPercent, done, donePercent));
}
}, 1, 1, TimeUnit.SECONDS);
// Kick off the puts
log.info(String.format(
"Starting puts... will run for %d seconds at %d records per second",
SECONDS_TO_RUN, RECORDS_PER_SECOND));
executeAtTargetRate(EXECUTOR, putOneRecord, sequenceNumber, SECONDS_TO_RUN, RECORDS_PER_SECOND);
// Wait for puts to finish. After this statement returns, we have
// finished all calls to putRecord, but the records may still be
// in-flight. We will additionally wait for all records to actually
// finish later.
EXECUTOR.awaitTermination(SECONDS_TO_RUN + 1, TimeUnit.SECONDS);
// If you need to shutdown your application, call flushSync() first to
// send any buffered records. This method will block until all records
// have finished (either success or fail). There are also asynchronous
// flush methods available.
//
// Records are also automatically flushed by the KPL after a while based
// on the time limit set with Configuration.setRecordMaxBufferedTime()
log.info("Waiting for remaining puts to finish...");
producer.flushSync();
log.info("All records complete.");
// This kills the child process and shuts down the threads managing it.
producer.destroy();
log.info("Finished.");
}
/**
* Executes a function N times per second for M seconds with a
* ScheduledExecutorService. The executor is shutdown at the end. This is
* more precise than simply using scheduleAtFixedRate.
*
* @param exec
* Executor
* @param task
* Task to perform
* @param counter
* Counter used to track how many times the task has been
* executed
* @param durationSeconds
* How many seconds to run for
* @param ratePerSecond
* How many times to execute task per second
*/
private static void executeAtTargetRate(
final ScheduledExecutorService exec,
final Runnable task,
final AtomicLong counter,
final int durationSeconds,
final int ratePerSecond) {
exec.scheduleWithFixedDelay(new Runnable() {
final long startTime = System.nanoTime();
@Override
public void run() {
double secondsRun = (System.nanoTime() - startTime) / 1e9;
double targetCount = Math.min(durationSeconds, secondsRun) * ratePerSecond;
while (counter.get() < targetCount) {
counter.getAndIncrement();
try {
task.run();
} catch (Exception e) {
log.error("Error running task", e);
System.exit(1);
}
}
if (secondsRun >= durationSeconds) {
exec.shutdown();
}
}
}, 0, 1, TimeUnit.MILLISECONDS);
}
}
Транзакция:
package main.com;
import org.json.JSONObject;
import java.util.UUID;
public class Transaction {
private final UUID uuid;
private final String sender;
private final String timeReceived;
private final String receiver;
private final String timeSent;
private final String description;
private final String sequenceNumber;
/**
* This class represents a transaction between two micro-services in our system.
* @param uuid A unique identifier that can be traced all the way through from the initial request and
* through all subsequent processing. (Also known as correlation identifier).
* @param sender The micro-service that sent the transaction.
* @param timeReceived The time in which the recipient micro-service have received the transaction.
* @param receiver The micro-service that received the transaction.
* @param timeSent The time in which the sender micro-service have sent the transaction.
* @param description A detailed description of the transaction.
*/
public Transaction (UUID uuid, String sender, String timeReceived, String receiver, String timeSent,
String description, String sequenceNumber ){
this.uuid = uuid;
this.sender = sender;
this.timeReceived = timeReceived;
this.receiver = receiver;
this.timeSent = timeSent;
this.description = description;
this.sequenceNumber = sequenceNumber;
}
public UUID getUuid() {
return uuid;
}
public String getSender() {
return sender;
}
public String getTimeReceived() {
return timeReceived;
}
public String getReceiver() {
return receiver;
}
public String getTimeSent() {
return timeSent;
}
public String getSequenceNumber(){
return this.sequenceNumber;
}
public String getDescription(){return this.description;}
public String toString(){
return "{\""+ "uuid\":" + "\""+ this.uuid.toString() + "\""+ "," + "\n" +
"\"sender\":" + "\""+this.sender + "\""+"," + "\n" +
"\"timeReceived\":" + "\""+ this.timeReceived + "\""+ "," + "\n" +
"\"receiver\":" + "\""+ this.receiver + "\""+ "," + "\n" +
"\"timeSent\":" + "\""+ this.timeSent + "\""+ "," + "\n" +
"\"description\":" + "\""+ this.description + "\""+ "," + "\n" +
"\"sequenceNumber\":" + "\""+ this.sequenceNumber + "\"}";
}
public JSONObject convertTransactionToJson(){
JSONObject json = new JSONObject();
json.put("uuid", this.uuid.toString());
json.put("sender", this.sender);
json.put("timeReceived", this.timeReceived);
json.put("receiver", this.receiver);
json.put("timeSent", this.timeSent);
json.put("description",this.description);
json.put("sequenceNumber",this.sequenceNumber);
return json;
}
public static Transaction convertJsonToTransaction(JSONObject json){
UUID uuid= UUID.fromString(json.getString("uuid"));
String sender = json.getString("sender");
String timeReceived = json.getString("timeReceived");
String receiver = json.getString("receiver");
String timeSent = json.getString("timeSent");
String description=json.getString("description");
String sequenceNumber=json.getString("sequenceNumber");
return new Transaction(uuid, sender, timeReceived, receiver, timeSent, description,sequenceNumber);
}
}
Утилиты:
package main.com;
import main.com.Transaction;
import java.io.UnsupportedEncodingException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Random;
public class Utils {
private static final Random RANDOM = new Random();
/**
* @return A random unsigned 128-bit int converted to a decimal string.
*/
public static String randomExplicitHashKey() {
return new BigInteger(128, RANDOM).toString(10);
}
/**
* Generates a blob containing a UTF-8 string. The string begins with the
* sequence number in decimal notation, followed by a space, followed by
* padding.
*
* @param sequenceNumber
* The sequence number to place at the beginning of the record
* data.
* @param transaction
* Total length of the data. After the sequence number, padding
* is added until this length is reached.
* @return ByteBuffer containing the blob
*/
public static ByteBuffer generateData(long sequenceNumber, Transaction transaction) {
StringBuilder sb = new StringBuilder();
sb.append(Long.toString(sequenceNumber));
sb.append("#");
sb.append(transaction.toString());
try {
return ByteBuffer.wrap(sb.toString().getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
}
TransactionFileParser:
package main.com;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.UUID;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
public class TransactionsFileParser {
private String fileName;
public TransactionsFileParser(String fileName){
this.fileName = fileName;
}
public ArrayList<Transaction> parseFile() {
JSONParser parser = new JSONParser();
ArrayList<Transaction> allDataInFile = new ArrayList<>();
try {
JSONArray transactions = (JSONArray) parser.parse(new FileReader(fileName));
for (Object o : transactions) {
JSONObject flow = (JSONObject) o;
UUID uuid = UUID.fromString((String)flow.get("uuid"));
String sender =(String) flow.get("sender");
String timeReceived = (String) flow.get("timeReceived");
String receiver =(String) flow.get("receiver");
String timeSent= (String) flow.get("timeSent");
String description =(String) flow.get("description");
String sequenceNumber = (String)flow.get("sequenceNumber");
allDataInFile.add(new Transaction(uuid, sender, timeReceived, receiver, timeSent, description,sequenceNumber));
}
return allDataInFile;
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
return null;
}
}