Как я могу распараллелить функцию, которая работает над различными фильтрами кадра данных, используя PySpark?
Например, на этом кадре данных я хотел бы сохранить вторую позицию для каждой страны.То есть строки: ('us', 2, 34) и ('ca', 3, 98) )
df = sqlContext.createDataFrame(
[
('us', 1, 55),
('us', 2, 34),
('us', 3, 56),
('ca', 3, 98),
('ca', 4, 65),
('ca', 1, 78)
],
["country", "position", "value"])
df.createOrReplaceTempView("df")
Следующий код возвращает ошибку: TypeError: объект 'JavaPackage' не вызывается
countries_list = df.select(sf.collect_set('country').alias('country')).first()['country']
countries = sc.parallelize(countries_list)
# Process filtered country in df
def process_country(df):
position_lst = sorted(df.select(sf.collect_set('position').alias('position')).first()['position'])
for i in position_lst:
for j in position_lst:
if j>i:
row_to_save = df.filter(col('position').isin([j]))
row_to_save.write.save('hdfs:///user/folder', format='parquet', mode='overwrite')
break
break
# filter main dataframe with country = x
def process(x):
df = sqlContext.table("df")
df = df.filter(col('country').isin([x]) )
process_country(df)
countries.foreach(process)
В настоящее время я делаю это через циклы в python, который сериализует все страны и, следовательно,занимает слишком много.Цель состоит в том, чтобы иметь возможность обрабатывать все страны параллельно.
Полная ошибка:
Traceback (most recent call last):
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 235, in dump
return Pickler.dump(self, obj)
File "/usr/lib64/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 568, in save_tuple
save(element)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 639, in _batch_appends
save(x)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 639, in _batch_appends
save(x)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 639, in _batch_appends
save(x)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
save(tmp[0])
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
save(tmp[0])
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 378, in save_function
self.save_function_tuple(obj)
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
save(closure_values)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
save(tmp[0])
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 372, in save_function
self.save_function_tuple(obj)
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 525, in save_function_tuple
save(f_globals)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 804, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
TypeError: 'JavaPackage' object is not callable
Примечание. Это простой пример, в действительности действительная функция очень отличается и более сложна.Основная проблема заключается в том, чтобы выяснить, как распараллелить функции любого вида, где вам необходимо применять их к различным подсегментам фрейма данных (избегая сериализации).