Распределенный кластер Dask, объект 'str' не имеет атрибута 'apply' ошибка во время вычислений - PullRequest
0 голосов
/ 25 октября 2018

У меня есть динамический кластер Dask Kubernetes на GCloud, основанный на этом репо https://github.com/VMois/dask-k8s-chart.

Во время обработки данных (20 файлов паркета, около 80 МБ каждый, на хранилище Gcloud) с использованием .apply() Иногда я получаю ошибкуиз кластера:

File "/usr/local/lib/python3.5/site-packages/dask/dataframe/core.py", line 3660, in apply_and_enforce
  File "/usr/local/lib/python3.5/site-packages/dask/utils.py", line 688, in __call__
AttributeError: 'str' object has no attribute 'apply'

Моя функция для создания тестовых данных:

file_bytes = open('test_img.jpg', 'rb').read() # about 59 KB
files_count = 20
file_ids = [{'id': 10 + file_id, 'data': file_bytes} for file_id in range(files_count)]

def cluster_data_gen(args):
    amount = 1500
    face_data = args['data']
    file_id = args['id']
    files_ids = [int(file_id) for x in range(amount)]
    file_bytes = [bytes(face_data) for x in range(amount)]
    df = pd.DataFrame({
        'file_id': videos_ids,
        'file': files_bytes,
    })
    df = dd.from_pandas(df, npartitions=1)
    df.to_parquet('gcs://test/files/video_{}.parquet'.format(file_id),
                  storage_options={'token': 'cloud'},
                  object_encoding={
                      'file': 'bytes',
                      'file_id': 'int'
                  },
                  compute=True)

x = client.map(cluster_data_gen, file_ids)
client.gather(x)

Моя функция для обработки тестовых данных (там, где возникает ошибка):

client = Client('<ip>')
client.restart()
df = dask.delayed(dd.read_parquet)('gcs://test/files/video_{}.parquet/*.parquet',
                                  storage_options={'token': 'cloud'},
                                  engine='fastparquet')
df = dask.compute(df)[0]

def test(row):
    img_bytes = row['img']
    img = cv2.imdecode(np.frombuffer(img_bytes, np.uint8), -1)
    del img_bytes
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    img = cv2.imencode('.jpg', img)[1]
    return pd.Series({'img': img.tobytes(), 'video_id': row['video_id']})

df = df.apply(test, axis=1, meta={'img': 'bytes', 'video_id': 'int'})
df.to_parquet('gcs://test/result/',
              storage_options={'token': 'cloud'},
              object_encoding={'img': 'bytes', 'video_id': 'int'},
              compute=True)

Некоторая дополнительная информация:

  • каждый рабочий - это модуль Kubernetes с 0,5 ЦП и 1,5 ГБ ОЗУ.
  • кластер может масштабироваться до 5 рабочих.

Iне думаю, что проблема в данных, потому что иногда возникает ошибка, иногда нет.

Я пытался удалить тяжелые вычисления из apply() и вернуть фиктивные данные, например:

def test(row):
    img = b'a'
    return pd.Series({'img': img, 'video_id': 1})

чтобы уменьшить нагрузку на кластер, но это ничего не меняет.Кластер все еще иногда возвращает ту же ошибку.

У вас есть идеи, почему эта ошибка возникает?Может быть, вы можете предложить дальнейшие направления, чтобы узнать, почему это происходит?Большое спасибо!

Версия Dask: 0.19.1

Распределенная версия Dask: 1.23.1

Версия Python: 3.5.1

...