Воздушный поток с gcp python: ValueError: Поток должен быть в начале - PullRequest
0 голосов
/ 23 апреля 2020

Я использую python вместе с потоком воздуха и библиотекой gcp python. Я автоматизировал процесс отправки файлов в gcp с помощью воздушных потоков. Код выглядит следующим образом: -

for fileid, filename in files_dictionary.items():
    if ftp.size(filename) <= int(MAX_FILE_SIZE):
        data = BytesIO()
        ftp.retrbinary('RETR ' + filename, callback=data.write)
        f = client.File(client, fid=fileid)
        size = sys.getsizeof(data.read())
        // Another option is to use FileIO but not sure how
        f.send(data, filename, size) // This method is in another library 

Код для запуска загрузки является текущим репо (как указано выше), но реальная загрузка выполняется другой зависимостью, которая не находится под нашим контролем. Документация по этому методу:

 def send(self, fp, filename, file_bytes):
        """Send file to cloud
        fp file object
        filename   is the name of the file.
        file_bytes is the size of the file in bytes
        """
        data = self.initiate_resumable_upload(self.getFileid())

        _, blob = self.get_gcs_blob_and_bucket(data)

        # Set attachment filename. Does this work with datasets with folders
        original_filename = filename.rsplit(os.sep, 1)[-1]
        blob.content_disposition = "attachment;filename=" + original_filename

        blob.upload_from_file(fp)

        self.finish_resumable_upload(self.getFileid())

Я получаю ошибку ниже

[2020-04-23 09:43:17,239] {{models.py:1788}} ERROR - Stream must be at beginning.
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1657, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 103, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 108, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/transfer_data.py", line 241, in upload
    f.send(data, filename, size)
  File "/usr/local/lib/python3.6/site-packages/client/utils.py", line 53, in wrapper_timer
    value = func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/client/client.py", line 518, in send
    blob.upload_from_file(fp)
  File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1158, in upload_from_file
    client, file_obj, content_type, size, num_retries, predefined_acl
  File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1068, in _do_upload
    client, stream, content_type, size, num_retries, predefined_acl
  File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 1011, in _do_resumable_upload
    predefined_acl=predefined_acl,
  File "/usr/local/lib/python3.6/site-packages/google/cloud/storage/blob.py", line 960, in _initiate_resumable_upload
    stream_final=False,
  File "/usr/local/lib/python3.6/site-packages/google/resumable_media/requests/upload.py", line 343, in initiate
    stream_final=stream_final,
  File "/usr/local/lib/python3.6/site-packages/google/resumable_media/_upload.py", line 415, in _prepare_initiate_request
    raise ValueError(u"Stream must be at beginning.")
ValueError: Stream must be at beginning.

1 Ответ

0 голосов
/ 27 апреля 2020

При чтении двоичного файла вы можете перемещаться по нему, используя операции поиска. Другими словами, вы можете переместить ссылку из начала файла в любую другую позицию. Ошибка ValueError: Stream must be at beginning. в основном говорит: "ваша ссылка не указана на начало потока, и она должна быть"

Учитывая, что вам нужно вернуть ссылку на начало потока. Вы можете сделать это, используя функцию seek.

. В вашем случае вы бы сделали что-то вроде:

    data = BytesIO()
    ftp.retrbinary('RETR ' + filename, callback=data.write)
    f = client.File(client, fid=fileid)
    size = sys.getsizeof(data.read())
    data.seek(0)
    f.send(data, filename, size)
...