How do I read data from Cassandra via PySpark?
April 11, 2019

I am trying to read data from Cassandra via PySpark. I got a connector from GitHub, but I could not get it to work. My code is below.

import pyspark_cassandra
from pyspark_cassandra import CassandraSparkContext
from pyspark import SparkConf
#from pyspark.sql import SQLContext

conf = SparkConf() \
    .setAppName("PySpark Cassandra Test") \
    .setMaster("spark://192.192.141.21:7077") \
    .set("spark.cassandra.connection.host", "192.192.141.26:9042")

sc = CassandraSparkContext(conf=conf)
sc.cassandraTable("oltpdb", "XiangWan") \
    .select("dt", "wid") \
    .where("wid='XiangWan001'", "daybucket in ('20190326')","dt >= '2019-03-26 13:18:03'") \
    .collect()
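
A side note on the configuration above: the Spark Cassandra Connector has a separate `spark.cassandra.connection.port` property (default 9042), and whether the connector version in use here accepts the combined `host:port` form in `spark.cassandra.connection.host` is not confirmed by anything in this post. A minimal sketch, assuming the two values should be set separately, of splitting the address before building the `SparkConf`. The helper names `split_contact_point` and `cassandra_conf_entries` are hypothetical, and the split handles only IPv4 addresses and hostnames.

```python
def split_contact_point(address, default_port=9042):
    """Split 'host:port' into (host, port); fall back to default_port.

    Handles IPv4 addresses and hostnames only (not bracketed IPv6).
    """
    host, sep, port = address.rpartition(":")
    if not sep:
        # No colon present: the whole string is the host.
        return address, default_port
    return host, int(port)


def cassandra_conf_entries(address):
    """Build the two connector settings as a dict of string values."""
    host, port = split_contact_point(address)
    return {
        "spark.cassandra.connection.host": host,
        "spark.cassandra.connection.port": str(port),
    }


entries = cassandra_conf_entries("192.192.141.26:9042")

# Applying the entries to a SparkConf (requires pyspark, so left as a comment):
# for key, value in entries.items():
#     conf = conf.set(key, value)
```

Keeping the port in its own property makes the setting independent of how a given connector version parses the host list.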

I run it with the following command:

spark-submit /root/model/connect_cannandra_via_spark.py

I got this error:

Traceback (most recent call last):                                                                                                              
  File "/root/model/connect_cannandra_via_spark.py", line 25, in <module>                                                                       
    df = (SQLContext                                                                                                                            
AttributeError: 'property' object has no attribute 'format'                                                                                     
[root@CDH21 python]# spark-submit /root/model/connect_cannandra_via_spark.py                                                                    
19/04/11 14:06:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable  
19/04/11 14:06:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.                                              
19/04/11 14:06:39 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.                                              
Traceback (most recent call last):                                                                                                              
  File "/root/model/connect_cannandra_via_spark.py", line 12, in <module>                                                                       
    sc.cassandraTable("oltpdb", "XiangWan") \                                                                                                   
  File "/root/anaconda3/lib/python3.6/site-packages/pyspark_cassandra-0.9.0-py3.6.egg/pyspark_cassandra/context.py", line 33, in cassandraTable 
  File "/root/anaconda3/lib/python3.6/site-packages/pyspark_cassandra-0.9.0-py3.6.egg/pyspark_cassandra/rdd.py", line 324, in __init__          
  File "/root/anaconda3/lib/python3.6/site-packages/pyspark_cassandra-0.9.0-py3.6.egg/pyspark_cassandra/rdd.py", line 213, in _helper           
  File "/root/anaconda3/lib/python3.6/site-packages/pyspark_cassandra-0.9.0-py3.6.egg/pyspark_cassandra/util.py", line 99, in helper            
  File "/root/anaconda3/lib/python3.6/site-packages/pyspark_cassandra-0.9.0-py3.6.egg/pyspark_cassandra/util.py", line 88, in load_class        
  File "/root/spark-2.2.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__                            
  File "/root/spark-2.2.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value                         
py4j.protocol.Py4JJavaError: An error occurred while calling o24.loadClass.                                                                     
: java.lang.ClassNotFoundException: pyspark_cassandra.PythonHelper                                                                              
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)                                                                           
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)                                                                                
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)                                                                                
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)                                                                          
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)                                                        
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)                                                
        at java.lang.reflect.Method.invoke(Method.java:498)                                                                                     
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)                                                                         
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)                                                                   
        at py4j.Gateway.invoke(Gateway.java:280)                                                                                                
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)                                                                 
        at py4j.commands.CallCommand.execute(CallCommand.java:79)                                                                               
        at py4j.GatewayConnection.run(GatewayConnection.java:214)                                                                               
        at java.lang.Thread.run(Thread.java:748)               
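
The final `java.lang.ClassNotFoundException: pyspark_cassandra.PythonHelper` is raised on the JVM side, which typically means the connector's JAR was not on the driver classpath when the job started. The `spark-submit` command shown passes no `--jars` or `--packages` option. A minimal sketch, in Python to match the script, of building a `spark-submit` command line that ships a JAR with `--jars`. The JAR path and the helper name `build_submit_command` are hypothetical; the real file name depends on how the connector was built or downloaded.

```python
import shlex


def build_submit_command(script_path, connector_jar):
    """Return a spark-submit argument list that ships connector_jar with the job."""
    return [
        "spark-submit",
        "--jars", connector_jar,  # puts the JAR on the driver and executor classpaths
        script_path,
    ]


# Hypothetical JAR location; replace with the actual connector assembly JAR.
cmd = build_submit_command(
    "/root/model/connect_cannandra_via_spark.py",
    "/path/to/pyspark-cassandra-assembly.jar",
)

# Print the command in a copy-pasteable, shell-quoted form.
print(" ".join(shlex.quote(part) for part in cmd))

# To launch it from Python instead of a shell:
# import subprocess
# subprocess.run(cmd, check=True)
```

The flag has to be given at submit time because the driver JVM is already running by the time the script constructs its `SparkConf`.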

What should I do?
