Как расшифровать записи Kinesis с помощью ключа KMS? - PullRequest
0 голосов
/ 15 февраля 2019

Я пишу Kinesis Consumer, который читает и расшифровывает приведенные ниже сообщения. Это мой простой код для расшифровки байтов записей Kinesis.

public static String getString(ByteBuffer b) {
        byte[] byteArray = new byte[b.remaining()];
        b.get(byteArray);
        return new String(byteArray, charset);
    }
    public static String decryptByteStream(byte[] encryptedKeyBytes) throws CharacterCodingException {

        AWSKMS awskmsClient = AWSKMSClientBuilder.standard().withRegion(Regions.AP_SOUTHEAST_2).build();
        ByteBuffer blob = ByteBuffer.wrap(encryptedKeyBytes);

        DecryptResult decryptResult = awskmsClient
                .decrypt(new DecryptRequest().withCiphertextBlob(blob).withEncryptionContext(null));

        System.out.println(getString(decryptResult.getPlaintext()));
        return getString(decryptResult.getPlaintext());
    }

Здесь вызывается реализация Kinesis Stream.* Я получаю ниже исключения.

com.amazonaws.services.kms.model.InvalidCiphertextException: null (Service: AWSKMS; Status Code: 400; Error Code: InvalidCiphertextException; Request ID: 51cf54df-1c05-47bd-a164-d96ab7736dd4)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1586)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1254)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1035)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:747)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:721)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:704)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:672)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:654)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:518)
    at com.amazonaws.services.kms.AWSKMSClient.doInvoke(AWSKMSClient.java:2607)
    at com.amazonaws.services.kms.AWSKMSClient.invoke(AWSKMSClient.java:2583)
    at com.amazonaws.services.kms.AWSKMSClient.decrypt(AWSKMSClient.java:750)
    at com.telstra.stream.KinesisStreamReader.decryptByteStream(KinesisStreamReader.java:71)
    at com.telstra.stream.KinesisStreamReader$1$1.call(KinesisStreamReader.java:143)
    at com.telstra.stream.KinesisStreamReader$1$1.call(KinesisStreamReader.java:1)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
19/02/15 20:38:10 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): com.amazonaws.services.kms.model.InvalidCiphertextException: null (Service: AWSKMS; Status Code: 400; Error Code: InvalidCiphertextException; Request ID: 51cf54df-1c05-47bd-a164-d96ab7736dd4)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1586)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1254)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1035)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:747)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:721)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:704)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:672)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:654)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:518)
    at com.amazonaws.services.kms.AWSKMSClient.doInvoke(AWSKMSClient.java:2607)
    at com.amazonaws.services.kms.AWSKMSClient.invoke(AWSKMSClient.java:2583)
    at com.amazonaws.services.kms.AWSKMSClient.decrypt(AWSKMSClient.java:750)
    at com.telstra.stream.KinesisStreamReader.decryptByteStream(KinesisStreamReader.java:71)
    at com.telstra.stream.KinesisStreamReader$1$1.call(KinesisStreamReader.java:143)
    at com.telstra.stream.KinesisStreamReader$1$1.call(KinesisStreamReader.java:1)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

19/02/15 20:38:10 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): com.amazonaws.services.kms.model.InvalidCiphertextException: null (Service: AWSKMS; Status Code: 400; Error Code: InvalidCiphertextException; Request ID: 51cf54df-1c05-47bd-a164-d96ab7736dd4)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1586)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1254)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1035)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:747)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:721)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:704)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:672)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:654)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:518)
    at com.amazonaws.services.kms.AWSKMSClient.doInvoke(AWSKMSClient.java:2607)
    at com.amazonaws.services.kms.AWSKMSClient.invoke(AWSKMSClient.java:2583)
    at com.amazonaws.services.kms.AWSKMSClient.decrypt(AWSKMSClient.java:750)
    at com.telstra.stream.KinesisStreamReader.decryptByteStream(KinesisStreamReader.java:71)
    at com.telstra.stream.KinesisStreamReader$1$1.call(KinesisStreamReader.java:143)
    at com.telstra.stream.KinesisStreamReader$1$1.call(KinesisStreamReader.java:1)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1358)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1331)
    at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1466)
    at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1466)
    at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1466)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1465)
    at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
    at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
    at com.telstra.stream.KinesisStreamReader$1.call(KinesisStreamReader.java:147)
    at com.telstra.stream.KinesisStreamReader$1.call(KinesisStreamReader.java:1)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: com.amazonaws.services.kms.model.InvalidCiphertextException: null (Service: AWSKMS; Status Code: 400; Error Code: InvalidCiphertextException; Request ID: 51cf54df-1c05-47bd-a164-d96ab7736dd4)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1586)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1254)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1035)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:747)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:721)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:704)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:672)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:654)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:518)
    at com.amazonaws.services.kms.AWSKMSClient.doInvoke(AWSKMSClient.java:2607)
    at com.amazonaws.services.kms.AWSKMSClient.invoke(AWSKMSClient.java:2583)
    at com.amazonaws.services.kms.AWSKMSClient.decrypt(AWSKMSClient.java:750)
    at com.telstra.stream.KinesisStreamReader.decryptByteStream(KinesisStreamReader.java:71)
    at com.telstra.stream.KinesisStreamReader$1$1.call(KinesisStreamReader.java:143)
    at com.telstra.stream.KinesisStreamReader$1$1.call(KinesisStreamReader.java:1)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
    at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    ... 3 more
...