Kafka Stream to Spark Stream с использованием PySpark - PullRequest
0 голосов
/ 09 октября 2018

У нас есть поток Кафка, который использует Avro.Мне нужно подключить его к Spark Stream, используя python.

Я использовал приведенный ниже код, чтобы сделать это:

kvs = KafkaUtils.createDirectStream(ssc, topic, {'bootstrap.servers': brokers}, valueDecoder=decoder)

Затем я получил ошибку ниже.

Anпроизошла ошибка при вызове o44.awaitTermination.

2018-10-11 15:58:01 INFO DAGScheduler: 54 - сбой задания 3: runJob в PythonRDD.scala: 149, заняло 1,403049 с

2018-10-11 15:58:01 ИНФОРМАЦИЯ JobScheduler: 54 - Завершенное потоковое задание 1539253680000 мс.0 из заданного времени 1539253680000 мс

2018-10-11 15:58:01 ОШИБКА JobScheduler: 91- Ошибка при выполнении задания потоковой передачи задания 1539253680000 ms.0

org.apache.spark.SparkException: исключение было вызвано Python: трассировка (последний вызов был последним):

File "/ XXXXXX /spark2 / python / lib / pyspark.zip / pyspark / streaming / util.py ", строка 65, в вызове

r = self.func (t, * rdds)

File" /XXXXXX / spark2 / python / lib / pyspark.zip / pyspark / streaming / dstream.py ", строка 171, в takeAndPrint

take = rdd.take (num + 1)

File"/ XXXXXX / spark2 / python / lib / pyspark.zip / pyspark / rdd.py ", строка 1375, в дубле

res = self.context.runJob (self, takeUpToNumLeft, p)

Файл "/XXXXXX/spark2/python/lib/pyspark.zip/pyspark/context.py", строка 1013, в runJob

sock_info = self._jvm.PythonRDD.runJob (self._jsc.sc (), mappedRDD._jrdd, разделы)

Файл "/XXXXXX/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", строка 1257, в call

ответ, self.gateway_client, self.target_id, self.name)

File "/XXXXXX/spark2/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py ", строка 328, в формате get_return_value

(target_id,". ", name), value)

Py4JJavaError: Произошла ошибка при вызове z: org.apache.spark.api.python.PythonRDD.runJob.: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задание 0 на этапе 3.0 не выполнено 4 раза, последний сбой: потерянное задание 0.3 на этапе 3.0 (TID 8, gen-CLUSTER_NODE, исполнитель 2): org.apache.spark.SparkException: Не удалось подключиться к руководителю по теме. TOPIC_NAME 1: java.nio.channels.ClosedChannelException

Однако, прежде чем эта ошибка отобразится на терминале и завершит процесс, я смог получить печатьСДР с помощью приведенного ниже кода

kvs.pprint()

Что такое лидер?Как мы можем прийти к этому?

...