SPARK не может использовать поток AWS Kinesis - PullRequest
0 голосов
/ 15 февраля 2019
Environment : EMR
AWS Kinesis Steam
Language : PySpark

У меня есть входящий поток AWS Kinesis, и я могу использовать поток с помощью Python (поэтому EMR может извлекать потоки).Когда я пытался использовать PySpark Streaming , я не смог получить поток, вместо этого печатались только журналы.Я не делаю никаких преобразований, просто попытался прочитать поток и распечатать.Может ли кто-нибудь наставить меня в этом.

from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
appName = 'kinesis_myreal_time_stream'
streamName = 'kinesis_myreal_time_stream'
endpointUrl = 'apigateway.us-east-1.amazonaws.com'
regionName = 'us-east-1'
sc = SparkContext()
ssc = StreamingContext(sc, 10)
lines = KinesisUtils.createStream(ssc = ssc, kinesisAppName = appName, streamName = streamName,
                                  endpointUrl = endpointUrl, regionName = regionName,
                                  initialPositionInStream = InitialPositionInStream.LATEST, checkpointInterval = 2)
# counts = lines.flatMap(lambda line: line.split("}{")) \
#     .map(lambda word: (word, 1)) \
#     .reduceByKey(lambda a, b: a+b)
# counts.pprint()
lines.pprint()
ssc.start()
ssc.awaitTermination()

получение журналов, как показано ниже

-------------------------------------------
Time: 2019-02-15 13:17:10
-------------------------------------------

19/02/15 13:17:10 INFO JobScheduler: Finished job streaming job 1550236630000 ms.0 from job set of time 1550236630000 ms
19/02/15 13:17:10 INFO PythonRDD: Removing RDD 59 from persistence list
19/02/15 13:17:10 INFO JobScheduler: Total delay: 0.014 s for time 1550236630000 ms (execution: 0.002 s)
19/02/15 13:17:10 INFO BlockManager: Removing RDD 59
19/02/15 13:17:10 INFO KinesisBackedBlockRDD: Removing RDD 58 from persistence list
19/02/15 13:17:10 INFO BlockManager: Removing RDD 58
19/02/15 13:17:10 INFO KinesisInputDStream: Removing blocks of RDD KinesisBackedBlockRDD[58] at createStream at NativeMethodAccessorImpl.java:0 of time 1550236630000 ms
19/02/15 13:17:10 INFO ReceivedBlockTracker: Deleting batches: 1550236610000 ms
19/02/15 13:17:10 INFO InputInfoTracker: remove old batch metadata: 1550236610000 ms
19/02/15 13:17:20 INFO JobScheduler: Added jobs for time 1550236640000 ms
19/02/15 13:17:20 INFO JobScheduler: Starting job streaming job 1550236640000 ms.0 from job set of time 1550236640000 ms
-------------------------------------------
Time: 2019-02-15 13:17:20
-------------------------------------------

19/02/15 13:17:20 INFO JobScheduler: Finished job streaming job 1550236640000 ms.0 from job set of time 1550236640000 ms
19/02/15 13:17:20 INFO PythonRDD: Removing RDD 61 from persistence list
19/02/15 13:17:20 INFO JobScheduler: Total delay: 0.018 s for time 1550236640000 ms (execution: 0.001 s)
19/02/15 13:17:20 INFO BlockManager: Removing RDD 61
19/02/15 13:17:20 INFO KinesisBackedBlockRDD: Removing RDD 60 from persistence list
19/02/15 13:17:20 INFO BlockManager: Removing RDD 60
19/02/15 13:17:20 INFO KinesisInputDStream: Removing blocks of RDD KinesisBackedBlockRDD[60] at createStream at NativeMethodAccessorImpl.java:0 of time 1550236640000 ms
19/02/15 13:17:20 INFO ReceivedBlockTracker: Deleting batches: 1550236620000 ms
19/02/15 13:17:20 INFO InputInfoTracker: remove old batch metadata: 1550236620000 ms
19/02/15 13:17:30 INFO JobScheduler: Added jobs for time 1550236650000 ms
19/02/15 13:17:30 INFO JobScheduler: Starting job streaming job 1550236650000 ms.0 from job set of time 1550236650000 ms
-------------------------------------------
Time: 2019-02-15 13:17:30
-------------------------------------------

1 Ответ

0 голосов
/ 20 февраля 2019

Я думаю, вы скопировали вставленный неверный URL-адрес конечной точки в ваше приложение.Также я не думаю, что вам нужно проходить это всегда.Вы передаете URL службы Apigateway.

Это должно быть похоже на этот пример

@param endpointUrl  Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)

https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala#L90

...