Я пытаюсь использовать библиотеку AWS KPL для записи данных в мой поток доставки Data Firehouse (НЕ поток Amazon Kinesis, насколько я понимаю, они разные).Однако я, похоже, совершенно не могу заставить его работать ни на экземпляре Lambda, ни на моей локальной машине.Я пробовал несколько разных версий Amazon Kinesis, но сейчас использую 0.12.11.
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.12.11</version>
</dependency>
Я следовал инструкциям, описанным здесь: https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-writing.html мой код указан ниже.Этот код выполняется без ошибок, кроме предупреждения о неконфигурировании Log4J.Когда он закончится, в моем ведре s3 нет логов.
AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new
BasicAWSCredentials("accessKey",
"secretKey"));
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
.setRecordMaxBufferedTime(300)
.setMaxConnections(5)
.setRequestTimeout(6000)
.setCredentialsProvider(credentialsProvider)
.setVerifyCertificate(false)
.setRegion("eu-west-1");
final KinesisProducer kinesis = new KinesisProducer(config);
ByteBuffer data = ByteBuffer.wrap(("Test main java").getBytes());
kinesis.addUserRecord("myStream", "myKey", data);
Затем я также попытался дождаться обратных вызовов, и это приводит к ошибкам, как показано ниже.
AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new
BasicAWSCredentials("accessKey",
"secretKey"));
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
.setRecordMaxBufferedTime(300)
.setMaxConnections(5)
.setRequestTimeout(6000)
.setCredentialsProvider(credentialsProvider)
.setVerifyCertificate(false)
.setRegion("eu-west-1");
KinesisProducer kinesis = new KinesisProducer(config);
Thread.sleep(2000); //Was told it could help as the producers takes time to set up.
FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {
@Override
public void onFailure(Throwable t) {
System.out.println("Failed: " + t.toString());
System.out.println(t.getStackTrace().toString()); //This always prints exceptions as below
t.printStackTrace();
}
@Override
public void onSuccess(UserRecordResult result) {
System.out.println("Success: " + result.toString());
}
};
for (int i = 0; i < 10; ++i) {
ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myKey", data);
// If the Future is complete by the time we call addCallback, the callback will be invoked immediately.
Futures.addCallback(f, myCallback);
}
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(10000); //So I can wait and see the callbacks.
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Failed: com.amazonaws.services.kinesis.producer.UserRecordFailedException
[Ljava.lang.StackTraceElement;@3b138ba8
com.amazonaws.services.kinesis.producer.UserRecordFailedException
at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler.onPutRecordResult(KinesisProducer.java:197)
at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler.access$000(KinesisProducer.java:131)
at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler$1.run(KinesisProducer.java:138)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Я часами занимаюсь этим, что я делаю не так?Многие другие, похоже, имеют ту же проблему: https://github.com/awslabs/amazon-kinesis-producer/issues/39