Нет зарегистрированных операций вывода, поэтому в PySpark нечего выполнять - PullRequest
0 голосов
/ 10 декабря 2018

Я пытаюсь интегрировать Spark с Кафкой.У меня есть потребитель Kafka есть данные JSON.Я хочу объединить потребителя кафки со Spark для обработки.Когда я запускаю ниже код ошибки выкидывает.

bin\spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 test.py localhost:9092 maktest

Мой test.py ниже

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc,[topic],{"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    print (lines)
    ssc.start()
    ssc.awaitTermination()

Я получил ошибку ниже

18/12/10 16:41:40 INFO VerifiableProperties: Verifying properties
18/12/10 16:41:40 INFO VerifiableProperties: Property group.id is overridden to
18/12/10 16:41:40 INFO VerifiableProperties: Property zookeeper.connect is overridden to
<pyspark.streaming.kafka.KafkaTransformedDStream object at 0x000002A6DA9FE6A0>
18/12/10 16:41:40 ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
        at scala.Predef$.require(Predef.scala:224)


Traceback (most recent call last):
  File "C:/Users/maws/Desktop/spark-2.2.1-bin-hadoop2.7/test.py", line 12, in <module>
    ssc.start()

py4j.protocol.Py4JJavaError: An error occurred while calling o25.start.
: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

1 Ответ

0 голосов
/ 10 декабря 2018

Вы не используете поддерживаемую Операцию вывода Spark Streaming DStream .

Для API pyspark вы должны использовать:

pprint()
saveAsTextFiles(prefix, [suffix])
saveAsObjectFiles(prefix, [suffix])
saveAsHadoopFiles(prefix, [suffix])
foreachRDD(func)

print() canне используется с pyspark, поэтому убедитесь, что при проверке других примеров потоков Spark для Scala или Java вы изменили значение на pprint()

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...