Как распараллелить функцию с PySpark - PullRequest
0 голосов
/ 25 сентября 2018

Как я могу распараллелить функцию, которая работает над различными фильтрами кадра данных, используя PySpark?

Например, в этом кадре данных я хотел бы сохранить для каждой страны строку со второй по величине позицией. То есть строки: ('us', 2, 34) и ('ca', 3, 98).

# Build a small example DataFrame: one row per (country, position, value).
df = sqlContext.createDataFrame(
[
    ('us', 1, 55),
    ('us', 2, 34),
    ('us', 3, 56),
    ('ca', 3, 98),
    ('ca', 4, 65),
    ('ca', 1, 78)
],
["country", "position", "value"])

# Register the DataFrame under the name "df" so it can be looked up by name
# later (process() reads it back with sqlContext.table("df")).
df.createOrReplaceTempView("df")

Следующий код возвращает ошибку: TypeError: 'JavaPackage' object is not callable

# Gather the set of country values from df into a single Python list.
# NOTE(review): `sf` is presumably pyspark.sql.functions; the import is not shown.
countries_list = df.select(sf.collect_set('country').alias('country')).first()['country']

# Turn the list of countries into an RDD so that foreach() can be applied to it.
countries = sc.parallelize(countries_list)

# Process filtered country in df
def process_country(df):
    """Save the rows of ``df`` that hold its second-lowest position value.

    The positions found in ``df`` are sorted in ascending order; the first
    value strictly greater than the smallest one is selected, and the rows
    with that position are written out as parquet. Nothing is written when
    no position is greater than the smallest one (or when there are no
    positions at all).

    NOTE: every call writes to the same path with ``mode='overwrite'``.
    """
    distinct_positions = df.select(sf.collect_set('position').alias('position')).first()['position']
    ordered = sorted(distinct_positions)
    if not ordered:
        return

    lowest = ordered[0]
    for candidate in ordered:
        if candidate > lowest:
            # First value above the minimum in ascending order.
            matching = df.filter(col('position').isin([candidate]))
            matching.write.save('hdfs:///user/folder', format='parquet', mode='overwrite')
            return

# filter main dataframe with country = x
def process(x):
    """Restrict the registered "df" table to country ``x`` and process it.

    Looks the table up through ``sqlContext``, keeps only the rows whose
    ``country`` column equals ``x``, and hands the result to
    ``process_country``.
    """
    country_rows = sqlContext.table("df").filter(col('country').isin([x]))
    process_country(country_rows)


# Run process() once per country in the RDD.
# NOTE(review): this is the call reported to fail with the TypeError; the
# traceback shows cloudpickle failing while serializing the function's globals,
# presumably because process() references sqlContext/DataFrame objects — confirm.
countries.foreach(process) 

В настоящее время я делаю это через циклы в Python, которые обрабатывают все страны последовательно и, следовательно, занимают слишком много времени. Цель состоит в том, чтобы иметь возможность обрабатывать все страны параллельно.

Полная ошибка:

Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 235, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib64/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 568, in save_tuple
    save(element)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 639, in _batch_appends
    save(x)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 639, in _batch_appends
    save(x)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 639, in _batch_appends
    save(x)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
    save(tmp[0])
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
    save(tmp[0])
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 378, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 529, in save_function_tuple
    save(closure_values)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
    self._batch_appends(iter(obj))
  File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
    save(tmp[0])
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 372, in save_function
    self.save_function_tuple(obj)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 525, in save_function_tuple
    save(f_globals)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 804, in save_reduce
    save(state)
  File "/usr/lib64/python2.7/pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
    self._batch_setitems(obj.iteritems())
  File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
    save(v)
  File "/usr/lib64/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
TypeError: 'JavaPackage' object is not callable

Примечание. Это простой пример; в действительности реальная функция сильно отличается и гораздо сложнее. Основная задача — выяснить, как распараллеливать функции любого вида, которые необходимо применять к различным подсегментам фрейма данных (избегая последовательной обработки).

...