Я загружаю CSV-файлы из GCS в BigQuery и запускаю задачу через Cloud Composer (затем делаю другие вещи). Команда bq load
неправильно анализирует файл из-за различных символов, присутствующих в некоторых полях, поэтому я обращаюсь к Dataflow для анализа и загрузки справки. Есть 8 файлов каждый ~ 1 ГБ в размере. Это 96 столбцов данных и ~ 3M записей, загружаемых напрямую из GCS в BQ. Большинство полей имеют тип STRING с несколькими типами NUMERIC и TIMESTAMP.
Мой трубопровод работает, но невероятно вяло. Я могу успешно прочитать файлы в BigQuery, но конвейер автоматически масштабируется до +15 рабочих за 18 мин. По истечении этого времени обрабатывается только ~ 300 тыс. Строк. Пользовательский интерфейс показывает, что он едва выдвигает 300 элементов / с.
Я пробовал различные другие решения, размещенные в Интернете, но мне нужно, чтобы данные появлялись в неизмененном виде (не могу удалить странные символы), а некоторые другие решения пытаются использовать re
для разделения на запятые, но там запятые везде в полях STRING, так что это не работает для меня. (Существуют также каналы, вкладки и множество потенциальных символов, поэтому разделение на что-то еще не очень полезно). Приятной частью этого решения была возможность использовать apache_beam.Map
для распараллеливания операций, выполняемых над записями, но оно было выполнено неправильно, что дало мне плохие результаты с отсутствующими или поврежденными данными в определенных записях.
Библиотека csv
- единственная вещь, которая последовательно анализирует файл без потери данных. Итак, я передаю открытый файл GCS в csv.DictReader
, чтобы записать напрямую в BQ. Использую ли я метод self.open_file()
в классе apache_beam.io.FileBasedSource
или метод open
из класса apache_beam.io.gcp.gcsio.GcsIO
, я получаю _io.BufferedReader
, который бросает мне байты вместо строк. Поэтому я использовал io.TextIOWrapper
, чтобы получить строки вместо байтов, и это, казалось, «работало», но работало очень медленно, как описано выше.
Я также сначала попытался распаковать csv-файлы и прочитать их. Вместо использования io.TextIOWrapper
я могу использовать gzip.open(_io.BufferedReader, 'rt')
, и это работает как чудо. В этом случае конвейер работает стабильно и завершается за ~ 20 минут (все еще ощущается, как долго, в зависимости от того, что поток данных утверждает, что он может сделать, но если это лучшее, что я могу получить, то я могу жить с этим). TextIOWrapper
, по-видимому, значительно замедляет его (я полагаю), а другие решения с codecs
, похоже, не работают.
Как ни странно, конвейер работал локально с DirectRunner на одном из файлов 8 csv менее чем за минуту, даже с TextIOWrapper
. Так что теперь я немного растерялся.
(Я также попытался запустить его в простой и грязной записи на Python в NLD_JSON
с последующим использованием bq load
, и это сработало, но заняло час, который этот процесс не может превышать по разным причинам.)
Вот мой конвейер с закомментированными частями, показывающими изменения, необходимые для выполнения в файле gzip:
from __future__ import absolute_import
import argparse
from argparse import RawTextHelpFormatter
import logging
import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.filebasedsource import FileBasedSource
#############################################
# gzipped CSV Reading Class that converts to dictionary
#############################################
class MyCsvFileSource(FileBasedSource):
def read_records(self, file_pattern, range_tracker):
import os # Need to import these inside of class otherwise the Pipeline will not recognize the library
import csv
from io import TextIOWrapper # Comment this line out when reading gzipped csv
#import gzip # Uncomment this line when reading gzipped csv
from apache_beam.io.gcp.gcsio import GcsIO
my_gcs_io = GcsIO(storage_client=os.getenv('GOOGLE_APPLICATION_CREDENTIALS'))
# reader = csv.DictReader(gzip.open(my_gcs_io.open(filename=file_pattern, mode='r', mime_type='text/csv'), 'rt')) # Uncomment this line when reading gzipped csv
reader = csv.DictReader(TextIOWrapper(my_gcs_io.open(filename=file_pattern, mode='r', mime_type='text/csv'))) # Comment this line out when reading gzipped csv
for record in reader:
yield record
#############################################
# Define the Pipeline
#############################################
def run(argv=None):
"""This function defines the argument parser and pipeline arguments used to run the dataflow pipeline"""
#############################################
# Argument Parser
#############################################
parser = argparse.ArgumentParser(
description="""
This is an apache beam pipeline that will read a gzipped csv file and write to bigquery.
The files can be read from GCS or local and written to bigquery in the same project.
Required Pipeline Arguments:
- runner
To run locally specify the flag `--runner=DirectRunner`
To run in GCP Dataflow specify the flag `--runner=DataflowRunner`
- project [required only if accessing GCP, not required for local -> local]
GCP Project ID where the Dataflow job will execute
e.g. `--project=my-gcp-project`
- stagingLocation [can specify local storage as well if running `DirectRunner`]
Specify a GCS storage location where the Dataflow job can stage the code for workers to execute.
- temp_location [can specify local storage as well if running `DirectRunner`]
Specify a GCS storage location where the Dataflow job can stage the data for temporary storage.
- subnetwork [required for reading from GCP GCS buckets]
Need to specify a VPC subnetwork for the project using the following format
`--subnetwork=regions/<REGION_NAME>/subnetworks/<SUB_NETWORK_NAME>`
""",
formatter_class=RawTextHelpFormatter)
parser.add_argument("--input", help='The directory or filename that will be read into the pipeline containting 1 or more gzipped csv files')
parser.add_argument("--output", help='The `dataset.table` where the records from `--input` will be written to')
known_args, pipeline_args = parser.parse_known_args()
#############################################
# Dataflow Pipeline
#############################################
with beam.Pipeline(argv=pipeline_args) as p:
(p
| 'Read Files' >> beam.io.Read(MyCsvFileSource(known_args.input))
| 'Write to BigQuery' >> WriteToBigQuery(table=known_args.output,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Я ожидаю, что этот конвейер будет работать менее чем за 20 минут на всех 8 обычных файлах CSV из GCS в BigQuery и не удалит никаких записей в процессе.
Буду рад любой помощи, которую вы можете оказать.