Я только начинаю работу над Spark, и сейчас я работаю над приложением для потоковой передачи данных из Kinesis. У меня есть продюсер, который помещает JSON объекты в один ниже
{
"username":"crystalhamilton",
"name":"Mark Cooper",
"gender":"M",
"address":"0305 Landry Circle Apt. 698\\nChadberg, DC 56711",
"mail":"mcdanielandrew@yahoo.com",
"birthdate":"1973-06-02"
}
, а затем я пытаюсь заставить моего потребителя Spark получить его, чтобы позже я мог его проанализировать. Я установил интервал контрольной точки на 2 секунды, следуя документации Spark , и я знаю, что каждая партия будет DStream, которая, скорее всего, будет включать в себя несколько RDD, и я предполагаю, что каждый RDD может быть проанализирован в отдельном JSON объект.
К сожалению, я не знаю, как прочитать поток и разбить DStream на RDD, а затем проанализировать RDD в документе JSON. Я попытался отобразить Dstream с помощью лямбды, используя json .dumps [x], и это, похоже, не работает.
def load_json(time, x):
try:
return json.loads(x[1])
except Exception:
log.error('Mesage in DStream is not a JSON object')
return {}
def main(sc):
try:
streaming_context = StreamingContext(sc, 10)
kinesis_stream = KinesisUtils.createStream(streaming_context,
kinesisAppName=application_config['application']['name'],
streamName=application_config['aws']['stream_name'],
endpointUrl=application_config['aws']['endpoint_url'],
regionName=application_config['aws']['region_name'],
initialPositionInStream=InitialPositionInStream.LATEST,
checkpointInterval=1
)
py_dict_rdd = kinesis_stream.map(lambda x:load_json)
py_dict_rdd.pprint(lambda x: print('username: [%s]', x['username']))
streaming_context.start()
streaming_context.awaitTermination()
streaming_context.stop()
except Exception as e:
log.error("Exception occurred in the main() function - error: [%s]", e)
Когда я запускаю это, я получаю исключение, которое говорит:
[Stage 2:> (0 + 1) / 1]20/04/09 14:52:20 ERROR JobScheduler: Error running job streaming job 1586458340000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/usr/local/Cellar/apache-spark/2.4.5/libexec/python/pyspark/streaming/util.py", line 68, in call
r = self.func(t, *rdds)
File "/usr/local/Cellar/apache-spark/2.4.5/libexec/python/pyspark/streaming/dstream.py", line 173, in takeAndPrint
taken = rdd.take(num + 1)
TypeError: unsupported operand type(s) for +: 'function' and 'int'
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
ERROR:app.__main__:Exception occurred in the main() function - error: [An error occurred while calling o36.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/usr/local/Cellar/apache-spark/2.4.5/libexec/python/pyspark/streaming/util.py", line 68, in call
r = self.func(t, *rdds)
File "/usr/local/Cellar/apache-spark/2.4.5/libexec/python/pyspark/streaming/dstream.py", line 173, in takeAndPrint
taken = rdd.take(num + 1)
TypeError: unsupported operand type(s) for +: 'function' and 'int'
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
]
Process finished with exit code 0
Все примеры, которые я нашел, имеют дело с количеством слов и не могут найти ничего похожего на то, что я пытаюсь делать. Заранее благодарим за любую помощь или информацию, которая может указывать мне в правильном направлении.
С уважением,
- MD.