Невозможно записать в поток Kinesis - PullRequest
0 голосов
/ 14 мая 2019

Я пытаюсь использовать библиотеку AWS KPL для записи данных в мой поток доставки Data Firehouse (НЕ поток Amazon Kinesis, насколько я понимаю, они разные).Однако я, похоже, совершенно не могу заставить его работать ни на экземпляре Lambda, ни на моей локальной машине.Я пробовал несколько разных версий Amazon Kinesis, но сейчас использую 0.12.11.

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>amazon-kinesis-producer</artifactId>
        <version>0.12.11</version>
    </dependency>

Я следовал инструкциям, описанным здесь: https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-writing.html мой код указан ниже.Этот код выполняется без ошибок, кроме предупреждения о неконфигурировании Log4J.Когда он закончится, в моем ведре s3 нет логов.

AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new
                 BasicAWSCredentials("accessKey",
                 "secretKey"));


         KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                 .setRecordMaxBufferedTime(300)
                 .setMaxConnections(5)
                 .setRequestTimeout(6000)
                 .setCredentialsProvider(credentialsProvider)
                 .setVerifyCertificate(false)
                 .setRegion("eu-west-1");


        final KinesisProducer kinesis = new KinesisProducer(config);
        ByteBuffer data = ByteBuffer.wrap(("Test main java").getBytes());
        kinesis.addUserRecord("myStream", "myKey", data);

Затем я также попытался дождаться обратных вызовов, и это приводит к ошибкам, как показано ниже.

AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new
                 BasicAWSCredentials("accessKey",
                 "secretKey"));


         KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                 .setRecordMaxBufferedTime(300)
                 .setMaxConnections(5)
                 .setRequestTimeout(6000)
                 .setCredentialsProvider(credentialsProvider)
                 .setVerifyCertificate(false)
                 .setRegion("eu-west-1");


         KinesisProducer kinesis = new KinesisProducer(config);

         Thread.sleep(2000); //Was told it could help as the producers takes time to set up.
         FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {
             @Override
             public void onFailure(Throwable t) {
                 System.out.println("Failed: " + t.toString());
                 System.out.println(t.getStackTrace().toString()); //This always prints exceptions as below
                 t.printStackTrace();
             }

             @Override
             public void onSuccess(UserRecordResult result) {
                 System.out.println("Success: " + result.toString());
             }
         };

         for (int i = 0; i < 10; ++i) {
             ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
             ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myKey", data);
             // If the Future is complete by the time we call addCallback, the callback will be invoked immediately.
             Futures.addCallback(f, myCallback);
         }

         for (int i = 0; i < 5; i++) {
             try {
                 Thread.sleep(10000); //So I can wait and see the callbacks.
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }

         }

Failed: com.amazonaws.services.kinesis.producer.UserRecordFailedException
[Ljava.lang.StackTraceElement;@3b138ba8
com.amazonaws.services.kinesis.producer.UserRecordFailedException
    at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler.onPutRecordResult(KinesisProducer.java:197)
    at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler.access$000(KinesisProducer.java:131)
    at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler$1.run(KinesisProducer.java:138)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

Я часами занимаюсь этим, что я делаю не так?Многие другие, похоже, имеют ту же проблему: https://github.com/awslabs/amazon-kinesis-producer/issues/39

...