Как записать данные структурированного потока в таблицу Cassandra с помощью pyspark? - PullRequest
0 голосов
/ 04 марта 2020

Это моя терминальная команда для запуска файла strm.py

$ SPARK_HOME / bin / spark-submit --master local --driver-memory 4g --num -executors 2 --executor-memory 4g - пакеты org. apache .spark: spark- sql -kafka-0-10_2.11: 2.4.0 org. apache .spark: spark-cassandra-connector_2 .11: 2.4.0 strm.py

Ошибка:

Невозможно загрузить основной класс из организации JAR. apache .spark: spark -cassandra-connector_2.11: 2.4.0 с URI org. apache .spark. Пожалуйста, укажите класс через --class. at or. * .spark.deploy.SparkSubmitArguments. (SparkSubmitArguments. scala: 116) в организации. apache .spark.deploy.SparkSubmit $$ anon $ 2 $$ anon $ 1. (SparkSubmit. scala: 907) в орг. apache .spark.deploy.SparkSubmit $$ anon $ 2.parseArguments (SparkSubmit. scala: 907) в орг. apache .spark.deploy.SparkSubmit.doSubmit (SparkSubmit. scala: 81) в орг. apache .spark.deploy.SparkSubmit $$ anon $ 2.doSubmit (SparkSubmit. scala: 920) в org. apache .spark.deploy.SparkSubmit $ .main (SparkSubmit. scala: 929) в орг. . apache .spark.deploy.SparkSubmit.main (SparkSubmit. scala)

Так что кто-нибудь может мне помочь, в чем проблема с этим, почему он не может загружаться.

1 Ответ

1 голос
/ 04 марта 2020

У вас 2 проблемы:

  • вы неверно подаете заявку - у вас нет запятой между org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 и org.apache.spark:spark-cassandra-connector_2.11:2.4.0, поэтому spark-submit относится к кассандре соединитель в виде баночки вместо файла python.

  • текущая версия Spark Cassandra Connector не поддерживает прямую запись для данных Spark Structured Streaming - эта функция доступна только в DSE Analytics. Но вы можете обойти это, используя foreachBatch, что-то вроде этого (не проверено, рабочий код Scala доступен здесь ):

def foreach_batch_function(df, epoch_id):
    df.format("org.apache.spark.sql.cassandra").option("keyspace","test")\
       .option("table", "my_tables").mode('append').save()

query.writeStream.foreachBatch(foreach_batch_function).start()  
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...