TextIOWrapper в конвейере потока данных работает медленно - PullRequest
0 голосов
/ 26 июня 2019

Я загружаю CSV-файлы из GCS в BigQuery и запускаю задачу через Cloud Composer (затем делаю другие вещи). Команда bq load неправильно анализирует файл из-за различных символов, присутствующих в некоторых полях, поэтому я обращаюсь к Dataflow для анализа и загрузки справки. Есть 8 файлов каждый ~ 1 ГБ в размере. Это 96 столбцов данных и ~ 3M записей, загружаемых напрямую из GCS в BQ. Большинство полей имеют тип STRING с несколькими типами NUMERIC и TIMESTAMP.

Мой трубопровод работает, но невероятно вяло. Я могу успешно прочитать файлы в BigQuery, но конвейер автоматически масштабируется до +15 рабочих за 18 мин. По истечении этого времени обрабатывается только ~ 300 тыс. Строк. Пользовательский интерфейс показывает, что он едва выдвигает 300 элементов / с.

Я пробовал различные другие решения, размещенные в Интернете, но мне нужно, чтобы данные появлялись в неизмененном виде (не могу удалить странные символы), а некоторые другие решения пытаются использовать re для разделения на запятые, но там запятые везде в полях STRING, так что это не работает для меня. (Существуют также каналы, вкладки и множество потенциальных символов, поэтому разделение на что-то еще не очень полезно). Приятной частью этого решения была возможность использовать apache_beam.Map для распараллеливания операций, выполняемых над записями, но оно было выполнено неправильно, что дало мне плохие результаты с отсутствующими или поврежденными данными в определенных записях.

Библиотека csv - единственная вещь, которая последовательно анализирует файл без потери данных. Итак, я передаю открытый файл GCS в csv.DictReader, чтобы записать напрямую в BQ. Использую ли я метод self.open_file() в классе apache_beam.io.FileBasedSource или метод open из класса apache_beam.io.gcp.gcsio.GcsIO, я получаю _io.BufferedReader, который бросает мне байты вместо строк. Поэтому я использовал io.TextIOWrapper, чтобы получить строки вместо байтов, и это, казалось, «работало», но работало очень медленно, как описано выше.

Я также сначала попытался распаковать csv-файлы и прочитать их. Вместо использования io.TextIOWrapper я могу использовать gzip.open(_io.BufferedReader, 'rt'), и это работает как чудо. В этом случае конвейер работает стабильно и завершается за ~ 20 минут (все еще ощущается, как долго, в зависимости от того, что поток данных утверждает, что он может сделать, но если это лучшее, что я могу получить, то я могу жить с этим). TextIOWrapper, по-видимому, значительно замедляет его (я полагаю), а другие решения с codecs, похоже, не работают.

Как ни странно, конвейер работал локально с DirectRunner на одном из файлов 8 csv менее чем за минуту, даже с TextIOWrapper. Так что теперь я немного растерялся.

(Я также попытался запустить его в простой и грязной записи на Python в NLD_JSON с последующим использованием bq load, и это сработало, но заняло час, который этот процесс не может превышать по разным причинам.)

Вот мой конвейер с закомментированными частями, показывающими изменения, необходимые для выполнения в файле gzip:

from __future__ import absolute_import

import argparse
from argparse import RawTextHelpFormatter
import logging

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.filebasedsource import FileBasedSource

#############################################
# gzipped CSV Reading Class that converts to dictionary
#############################################


class MyCsvFileSource(FileBasedSource):
    def read_records(self, file_pattern, range_tracker):
        import os  # Need to import these inside of class otherwise the Pipeline will not recognize the library
        import csv
        from io import TextIOWrapper  # Comment this line out when reading gzipped csv
        #import gzip  # Uncomment this line when reading gzipped csv
        from apache_beam.io.gcp.gcsio import GcsIO
        my_gcs_io = GcsIO(storage_client=os.getenv('GOOGLE_APPLICATION_CREDENTIALS')) 
        # reader = csv.DictReader(gzip.open(my_gcs_io.open(filename=file_pattern, mode='r', mime_type='text/csv'), 'rt'))  # Uncomment this line when reading gzipped csv
        reader = csv.DictReader(TextIOWrapper(my_gcs_io.open(filename=file_pattern, mode='r', mime_type='text/csv')))  # Comment this line out when reading gzipped csv
        for record in reader:
            yield record

#############################################
# Define the Pipeline
#############################################


def run(argv=None):
    """This function defines the argument parser and pipeline arguments used to run the dataflow pipeline"""
    #############################################
    # Argument Parser
    #############################################

    parser = argparse.ArgumentParser(
        description="""
        This is an apache beam pipeline that will read a gzipped csv file and write to bigquery.
        The files can be read from GCS or local and written to bigquery in the same project.
        Required Pipeline Arguments:
        - runner
            To run locally specify the flag `--runner=DirectRunner`
            To run in GCP Dataflow specify the flag `--runner=DataflowRunner`
        - project [required only if accessing GCP, not required for local -> local]
            GCP Project ID where the Dataflow job will execute
            e.g. `--project=my-gcp-project`
        - stagingLocation [can specify local storage as well if running `DirectRunner`]
            Specify a GCS storage location where the Dataflow job can stage the code for workers to execute.
        - temp_location [can specify local storage as well if running `DirectRunner`]
            Specify a GCS storage location where the Dataflow job can stage the data for temporary storage.
        - subnetwork [required for reading from GCP GCS buckets]
            Need to specify a VPC subnetwork for the project using the following format
            `--subnetwork=regions/<REGION_NAME>/subnetworks/<SUB_NETWORK_NAME>`
        """,
        formatter_class=RawTextHelpFormatter)
    parser.add_argument("--input", help='The directory or filename that will be read into the pipeline containting 1 or more gzipped csv files')
    parser.add_argument("--output", help='The `dataset.table` where the records from `--input` will be written to')
    known_args, pipeline_args = parser.parse_known_args()

    #############################################
    # Dataflow Pipeline
    #############################################

    with beam.Pipeline(argv=pipeline_args) as p:
        (p
         | 'Read Files' >> beam.io.Read(MyCsvFileSource(known_args.input))
         | 'Write to BigQuery' >> WriteToBigQuery(table=known_args.output,
                                                  create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                                                  write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Я ожидаю, что этот конвейер будет работать менее чем за 20 минут на всех 8 обычных файлах CSV из GCS в BigQuery и не удалит никаких записей в процессе.

Буду рад любой помощи, которую вы можете оказать.

1 Ответ

0 голосов
/ 26 июня 2019

** Не ответ на ваш вопрос, а альтернативный подход **

Я понимаю, что вы пытаетесь очистить ваши данные при одновременной загрузке в BQ. Возможно, вы захотите изучить Cloud dataprep (в разделе «Большие данные» в консоли GCP), который очень интуитивно понятен, например, для очистки ваших данных и типов данных (он создан для преобразования ваших данных). Затем вы можете сохранить очищенные данные обратно в GCS и просто запустить задание загрузки из самого пользовательского интерфейса BQ, чтобы заполнить таблицу больших запросов.

...