Есть ли альтернативный способ для collect () в pyspark? py4j.protocol.Py4JJavaError: Произошла ошибка при вызове 0323.collectToPython - PullRequest
0 голосов
/ 04 июля 2019

Сценарий Pyspark падает, когда я использую collect () или show () в pyspark. В моем фрейме данных всего 570 строк, поэтому я не понимаю, что происходит.

У меня есть Dataframe, и я создал функцию, которая извлекает из него список с отдельными строками. Все работало нормально, но вдруг у меня возникла ошибка:

py4j.protocol.Py4JJavaError: An error occurred while calling 
0323.collectToPython

Похожая ошибка возникает при попытке показать () фрейм данных.

Есть ли альтернативный метод для извлечения списка с различными значениями из кадра данных?

required_list = [(col1,col2), (col1,col2)]

Извините, что не опубликовал код, но это большой скрипт и конфиденциальный.

Обновление:

У меня есть функция, которая извлекает различные значения из df:

def extract_dist(df, select_cols):
  val = len(select_cols)
  list_val = [row[0:val] for row in df.select(*select_cols)).distinct.na.drop().collect()]
return list_val

Функция работала нормально, пока у меня не было ошибки.

У меня есть основной скрипт, в который я импортирую эти функции, а также другую функцию, которая вычисляет фрейм данных:

def calculate_df(df_join, v_srs, v_db, v_tbl, sql_context):
  cmd = "impala-shel....'create table db.tbl as select * from v_db.v_tbl'"
  os.system(cmd)

  select_stm = "select * from db.tbl"
  df = sql_context(select_stm)

  cmd = "impala-shel....'drop table if exists db.tbl'"
  os.system(cmd)

  join cond = [...]
  joined_df = df.join(df_join, join_cond, 'left').select(..)

  df1 = joined_df.filer(...)
  df2 = joined_df.filer(...)

  final_df = df1.union(df2)

  final_df.show() # error from show

  return final_df

Основной скрипт:

import extract_dist
import calculate_df


df_join = ...extract from a hive table

for conn in details:
  v_db = conn['database'].upper()
  v_tbl = conn['table'].upper()
  v_name = conn['descr'].upper()

  if v_name in lst:
    df = calculate_df(df_join, v_name, v_db, v_tbl, sqlContext)

  df = df.filter(...column isin list)
  df = df.filter(..).filter(..)

  # extract list with distinct rows from df using dist function

  df.show() # error from show

  dist_list = extract_dist(df, [col1,col2]) # error from collect

  for x, y in dist_list:
    ....

Если я не использую show (), ошибка появляется, когда я запускаю метод collect ().
Те же сценарии работали раньше и неожиданно потерпели неудачу. Это проблема с памятью? я должен очистить память?

РЕШИТЬ:

Я нашел проблему. После того, как я создал фрейм данных из таблицы, я удалил таблицу.

  cmd = "impala-shel....'drop table if exists db.tbl'"
  os.system(cmd)

После того, как я удалил команду с удаленной таблицей, скрипт успешно запустился. Я опущу временную таблицу в конце скрипта, после того как я закончу с извлеченным фреймом данных. Я не знал, что если мы создадим фрейм данных и после этого удалим исходную таблицу, у меня впоследствии будет ошибка.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...