Искровые доступы Кинесис поток до JSON объектов - PullRequest
0 голосов
/ 09 апреля 2020

Я только начинаю работу над Spark, и сейчас я работаю над приложением для потоковой передачи данных из Kinesis. У меня есть продюсер, который помещает JSON объекты в один ниже

{
   "username":"crystalhamilton",
   "name":"Mark Cooper",
   "gender":"M",
   "address":"0305 Landry Circle Apt. 698\\nChadberg, DC 56711",
   "mail":"mcdanielandrew@yahoo.com",
   "birthdate":"1973-06-02"
}

, а затем я пытаюсь заставить моего потребителя Spark получить его, чтобы позже я мог его проанализировать. Я установил интервал контрольной точки на 2 секунды, следуя документации Spark , и я знаю, что каждая партия будет DStream, которая, скорее всего, будет включать в себя несколько RDD, и я предполагаю, что каждый RDD может быть проанализирован в отдельном JSON объект.

К сожалению, я не знаю, как прочитать поток и разбить DStream на RDD, а затем проанализировать RDD в документе JSON. Я попытался отобразить Dstream с помощью лямбды, используя json .dumps [x], и это, похоже, не работает.


def load_json(time, x):
    try:
        return json.loads(x[1])
    except Exception:
        log.error('Mesage in DStream is not a JSON object')
        return {}

def main(sc): 
    try:
        streaming_context = StreamingContext(sc, 10)

        kinesis_stream = KinesisUtils.createStream(streaming_context,
                                            kinesisAppName=application_config['application']['name'],
                                            streamName=application_config['aws']['stream_name'],
                                            endpointUrl=application_config['aws']['endpoint_url'],
                                            regionName=application_config['aws']['region_name'],
                                            initialPositionInStream=InitialPositionInStream.LATEST,
                                            checkpointInterval=1
                                            )


        py_dict_rdd = kinesis_stream.map(lambda x:load_json)

        py_dict_rdd.pprint(lambda x: print('username: [%s]', x['username']))


        streaming_context.start()
        streaming_context.awaitTermination()
        streaming_context.stop()
   except Exception as e:
        log.error("Exception occurred in the main() function - error: [%s]", e)


Когда я запускаю это, я получаю исключение, которое говорит:

[Stage 2:>                                                          (0 + 1) / 1]20/04/09 14:52:20 ERROR JobScheduler: Error running job streaming job 1586458340000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.4.5/libexec/python/pyspark/streaming/util.py", line 68, in call
    r = self.func(t, *rdds)
  File "/usr/local/Cellar/apache-spark/2.4.5/libexec/python/pyspark/streaming/dstream.py", line 173, in takeAndPrint
    taken = rdd.take(num + 1)
TypeError: unsupported operand type(s) for +: 'function' and 'int'

    at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
    at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
    at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
    at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
ERROR:app.__main__:Exception occurred in the main() function - error: [An error occurred while calling o36.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.4.5/libexec/python/pyspark/streaming/util.py", line 68, in call
    r = self.func(t, *rdds)
  File "/usr/local/Cellar/apache-spark/2.4.5/libexec/python/pyspark/streaming/dstream.py", line 173, in takeAndPrint
    taken = rdd.take(num + 1)
TypeError: unsupported operand type(s) for +: 'function' and 'int'

    at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
    at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
    at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
    at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
]

Process finished with exit code 0

Все примеры, которые я нашел, имеют дело с количеством слов и не могут найти ничего похожего на то, что я пытаюсь делать. Заранее благодарим за любую помощь или информацию, которая может указывать мне в правильном направлении.

С уважением,

- MD.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...