Ошибка при чтении из потока кинезиса с помощью искры. Ошибка с ошибкой "Не удалось получить статус таблицы для Kinesis-Streaming" - PullRequest
0 голосов
/ 02 августа 2020

Я пытаюсь читать из потока кинезиса в программе потоковой передачи искр, как указано ниже

from __future__ import print_function
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
import sys,os
from pyspark import SparkContext
from pyspark.streaming import StreamingContext



def f(iterator):
    print ('inside print',iterator)
    for x in iterator:
        print('rdd value is',x)


def printRecord(rdd):
    print("========================================================")
    print("Starting new RDD")
    print("========================================================")
    rdd.collect()
    rdd.foreach(f)

if __name__ == "__main__":
    reload(sys)  
    sys.setdefaultencoding('utf-8')
    sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl")
    ssc = StreamingContext(sc, 10)
    AWS_ACCESS_KEY=os.environ['aws_key']
    AWS_SECRET_KEY=os.environ['aws_secret_key']
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY)
    sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", AWS_SECRET_KEY)
    endpointUrl='https://kinesis.us-east-1.amazonaws.com'
    streamName='test-kinesis'
    appName='Kinesis-Streaming'
    regionName='us-east-1'
    #appName, streamName, endpointUrl, regionName = sys.argv[1:]
    regionName='us-east-1'
    lines = KinesisUtils.createStream(
        ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.TRIM_HORIZON, 10)
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

Я вижу, что динамодб для отслеживания кинезиса не создается. Задание завершается с ошибкой ниже

20/08/01 15:15:49 ERROR LeaseManager: Failed to get table status for Kinesis-Streaming
com.amazonaws.services.kinesis.leases.exceptions.DependencyException: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: The security token included in the request is expired (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ExpiredTokenException; Request ID: 1VPKRUJIRSHASS1NL0OAOK62DFVV4KQNSO5AEMVJF66Q9ASUAAJG)
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.tableStatus(LeaseManager.java:163)
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:108)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:329)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:674)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:614)
    at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$2.run(KinesisReceiver.scala:196)
Caused by: com.amazonaws.services.dynamodbv2.model.AmazonDynamoDBException: The security token included in the request is expired (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ExpiredTokenException; Request ID: 1VPKRUJIRSHASS1NL0OAOK62DFVV4KQNSO5AEMVJF66Q9ASUAAJG)

Пожалуйста, дайте мне знать, что здесь может быть не так.

...