У меня есть динамический кластер Dask Kubernetes на GCloud, основанный на этом репо https://github.com/VMois/dask-k8s-chart.
Во время обработки данных (20 файлов паркета, около 80 МБ каждый, на хранилище Gcloud) с использованием .apply()
Иногда я получаю ошибкуиз кластера:
File "/usr/local/lib/python3.5/site-packages/dask/dataframe/core.py", line 3660, in apply_and_enforce
File "/usr/local/lib/python3.5/site-packages/dask/utils.py", line 688, in __call__
AttributeError: 'str' object has no attribute 'apply'
Моя функция для создания тестовых данных:
file_bytes = open('test_img.jpg', 'rb').read() # about 59 KB
files_count = 20
file_ids = [{'id': 10 + file_id, 'data': file_bytes} for file_id in range(files_count)]
def cluster_data_gen(args):
amount = 1500
face_data = args['data']
file_id = args['id']
files_ids = [int(file_id) for x in range(amount)]
file_bytes = [bytes(face_data) for x in range(amount)]
df = pd.DataFrame({
'file_id': videos_ids,
'file': files_bytes,
})
df = dd.from_pandas(df, npartitions=1)
df.to_parquet('gcs://test/files/video_{}.parquet'.format(file_id),
storage_options={'token': 'cloud'},
object_encoding={
'file': 'bytes',
'file_id': 'int'
},
compute=True)
x = client.map(cluster_data_gen, file_ids)
client.gather(x)
Моя функция для обработки тестовых данных (там, где возникает ошибка):
client = Client('<ip>')
client.restart()
df = dask.delayed(dd.read_parquet)('gcs://test/files/video_{}.parquet/*.parquet',
storage_options={'token': 'cloud'},
engine='fastparquet')
df = dask.compute(df)[0]
def test(row):
img_bytes = row['img']
img = cv2.imdecode(np.frombuffer(img_bytes, np.uint8), -1)
del img_bytes
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
img = cv2.imencode('.jpg', img)[1]
return pd.Series({'img': img.tobytes(), 'video_id': row['video_id']})
df = df.apply(test, axis=1, meta={'img': 'bytes', 'video_id': 'int'})
df.to_parquet('gcs://test/result/',
storage_options={'token': 'cloud'},
object_encoding={'img': 'bytes', 'video_id': 'int'},
compute=True)
Некоторая дополнительная информация:
- каждый рабочий - это модуль Kubernetes с 0,5 ЦП и 1,5 ГБ ОЗУ.
- кластер может масштабироваться до 5 рабочих.
Iне думаю, что проблема в данных, потому что иногда возникает ошибка, иногда нет.
Я пытался удалить тяжелые вычисления из apply()
и вернуть фиктивные данные, например:
def test(row):
img = b'a'
return pd.Series({'img': img, 'video_id': 1})
чтобы уменьшить нагрузку на кластер, но это ничего не меняет.Кластер все еще иногда возвращает ту же ошибку.
У вас есть идеи, почему эта ошибка возникает?Может быть, вы можете предложить дальнейшие направления, чтобы узнать, почему это происходит?Большое спасибо!
Версия Dask: 0.19.1
Распределенная версия Dask: 1.23.1
Версия Python: 3.5.1