I'm trying to convert a Spark DataFrame to pandas with stream_df1.select('text').toPandas(), but I get the following error:
<ipython-input-18-bbda306c7ad1> in <module>
----> 1 stream_df1.select('text').toPandas()
/opt/spark/python/pyspark/sql/dataframe.py in toPandas(self)
2200 from pyspark.sql.types import _check_dataframe_localize_timestamps
2201 import pyarrow
-> 2202 batches = self._collectAsArrow()
2203 if len(batches) > 0:
2204 table = pyarrow.Table.from_batches(batches)
/opt/spark/python/pyspark/sql/dataframe.py in _collectAsArrow(self)
2306 .. note:: Experimental.
2307 """
-> 2308 with SCCallSiteSync(self._sc) as css:
2309 port, auth_secret, jsocket_auth_server = self._jdf.collectAsArrowToPython()
2310
/opt/spark/python/pyspark/traceback_utils.py in __enter__(self)
70 def __enter__(self):
71 if SCCallSiteSync._spark_stack_depth == 0:
---> 72 self._context._jsc.setCallSite(self._call_site)
73 SCCallSiteSync._spark_stack_depth += 1
74
AttributeError: 'NoneType' object has no attribute 'setCallSite'
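The last frame dereferences self._context._jsc, and the error says that object is None. In PySpark, SparkContext.stop() resets _jsc to None, so one plausible reading is that the SparkContext held by this DataFrame has been stopped, for example by a spark.stop() or sc.stop() somewhere in the notebook. Below is a minimal diagnostic sketch. spark_context_is_stopped is a hypothetical helper, and _sc/_jsc are private PySpark attributes that may differ between Spark versions.

```python
def spark_context_is_stopped(df):
    """Return True if the SparkContext attached to `df` has lost its JVM handle.

    Hypothetical helper: relies on PySpark's private attributes `_sc` and
    `_jsc`. SparkContext.stop() sets `_jsc` to None, which is exactly the
    value that makes setCallSite fail in the traceback above.
    """
    sc = getattr(df, "_sc", None)
    # No context at all, or a context whose Java handle was cleared by stop()
    return sc is None or getattr(sc, "_jsc", None) is None
```

In the notebook this would be called as spark_context_is_stopped(stream_df1.select('text')). If it returns True, the DataFrame most likely has to be re-created from a live SparkSession rather than reused.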
Additional information:
type(stream_df1.select('text'))
pyspark.sql.dataframe.DataFrame
root
|-- text: string (nullable = true)
|-- Score: string (nullable = true)
stream_df1.select('text').show()
+--------------------+
| text|
+--------------------+
|white people hist...|
|i multi talented ...|
|the american drea...|
|how covid devaste...|
|coronavirus cases...|
|work hard get hig...|
|visit fiverr prof...|
|essentialsforesse...|
|working crisis fo...|
|us casinos push c...|
|dexamethasone fir...|
|the novel coronav...|
|coronavirus rise ...|
|heading towards c...|
|covid coronavirus...|
|this thread impor...|
|hi moving towards...|
|the pandemic disa...|
|the continuing pr...|
|i beach year old ...|
+--------------------+
only showing top 20 rows
Configuration:
export HADOOP_HOME=/opt/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64/jre
export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
sc._conf.getAll()
[('spark.app.id', 'local-1592332286967'),
('spark.sql.catalogImplementation', 'hive'),
('spark.rdd.compress', 'True'),
('spark.serializer.objectStreamReset', '100'),
('spark.master', 'local[*]'),
('spark.submit.pyFiles', ''),
('spark.executor.id', 'driver'),
('spark.submit.deployMode', 'client'),
('spark.app.name', 'PySparkShell'),
('spark.driver.host', '192.168.1.12'),
('spark.ui.showConsoleProgress', 'true'),
('spark.driver.port', '43537')]
Things I tried in order to get the desired result, all of which give the same error:
stream_df1.sql_ctx.sparkSession._jsparkSession = spark._jsparkSession
stream_df1._sc = spark._sc
from pyspark.sql.functions import *
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
stream_df1.select("*").toPandas()
stream_df1.select("text").rdd.flatMap(lambda x: x).collect()
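A note on the attempts above: as far as I can tell from the Spark 2.4 sources (which the traceback appears to match), DataFrame.__init__ sets self._sc = sql_ctx and sql_ctx._sc, and select() returns DataFrame(jdf, self.sql_ctx). If that holds, assigning stream_df1._sc or the session's _jsparkSession does not change the context carried by stream_df1.select(...), because the new DataFrame takes it from sql_ctx._sc. The sketch below is a hypothetical guard (to_pandas_or_explain is not a PySpark API) that checks both handles before collecting and raises a readable error instead of the AttributeError. It relies on private attributes that may change between versions.

```python
def to_pandas_or_explain(df):
    """Call df.toPandas(), but fail with a clear message if a stopped
    SparkContext is attached to the DataFrame.

    Hypothetical wrapper around PySpark internals (`_sc`, `sql_ctx._sc`,
    `_jsc`); not part of the public API.
    """
    # toPandas()/collect() use df._sc, while select() and similar methods
    # build their result from df.sql_ctx._sc, so inspect both handles.
    contexts = [
        getattr(df, "_sc", None),
        getattr(getattr(df, "sql_ctx", None), "_sc", None),
    ]
    if any(sc is None or getattr(sc, "_jsc", None) is None for sc in contexts):
        raise RuntimeError(
            "The SparkContext behind this DataFrame has been stopped; "
            "re-create the DataFrame from a live SparkSession."
        )
    return df.toPandas()
```

Because Arrow and non-Arrow collection both enter through SCCallSiteSync, toggling spark.sql.execution.arrow.enabled would not be expected to change this outcome.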
The strange thing is that at some point this afternoon it did work without errors, but only a couple of times.
Any help is greatly appreciated.