Я использую python вместе с потоком воздуха и библиотекой gcp python. Я автоматизировал процесс отправки файлов в gcp с помощью воздушных потоков. Код выглядит следующим образом: -
for fileid, filename in files_dictionary.items():
if ftp.size(filename) <= int(MAX_FILE_SIZE):
data = BytesIO()
ftp.retrbinary('RETR ' + filename, callback=data.write)
f = client.File(client, fid=fileid)
size = sys.getsizeof(data.read())
// Another option is to use FileIO but not sure how
f.send(data, filename, size) // This method is in another library
Код для запуска загрузки является текущим репо (как указано выше), но реальная загрузка выполняется другой зависимостью, которая не находится под нашим контролем. Документация по этому методу:
def send(self, fp, filename, file_bytes):
"""Send file to cloud
fp file object
filename is the name of the file.
file_bytes is the size of the file in bytes
"""
data = self.initiate_resumable_upload(self.getFileid())
_, blob = self.get_gcs_blob_and_bucket(data)
# Set attachment filename. Does this work with datasets with folders
original_filename = filename.rsplit(os.sep, 1)[-1]
blob.content_disposition = "attachment;filename=" + original_filename
blob.upload_from_file(fp)
self.finish_resumable_upload(self.getFileid())
Я получаю ошибку ниже
[2020-04-23 09:43:17,239] {{models.py:1788}} ERROR - Stream must be at beginning.
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1657, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 103, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 108, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/dags/transfer_data.py", line 241, in upload
f.send(data, filename, size)
File "/usr/local/lib/python3.6/site-packages/client/utils.py", line 53, in wrapper_timer
value = func(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/client/client.py", line 518, in send
blob.upload_from_file(fp)
File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1158, in upload_from_file
client, file_obj, content_type, size, num_retries, predefined_acl
File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1068, in _do_upload
client, stream, content_type, size, num_retries, predefined_acl
File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1011, in _do_resumable_upload
predefined_acl=predefined_acl,
File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 960, in _initiate_resumable_upload
stream_final=False,
File "/usr/local/lib/python3.6/site-packages/google/resumable_media/requests/upload.py", line 343, in initiate
stream_final=stream_final,
File "/usr/local/lib/python3.6/site-packages/google/resumable_media/_upload.py", line 415, in _prepare_initiate_request
raise ValueError(u"Stream must be at beginning.")
ValueError: Stream must be at beginning.