Читайте CSV и пишите в BigQuery из Apache Beam - PullRequest
0 голосов
/ 22 апреля 2019

У меня есть корзина GCS, из которой я пытаюсь прочитать около 200k файлов и затем записать их в BigQuery.Проблема в том, что у меня проблемы с созданием PCollection, которая хорошо работает с кодом.Я следую этому учебнику для справки.

У меня есть этот код:

from __future__ import absolute_import

import argparse
import logging
import os

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud import storage

import regex as re

# storage_client = storage.Client()
# bucket = storage_client.get_bucket('mybucket')
#
# blobs = bucket.list_blobs()
# l=list(blobs)
# x=[y.name for y in l]
# c=x[1:]
# print(len(c))
files = ['gs://mybucket/_chunk1',
         'gs://mybucket/_chunk0']


class DataIngestion:
    """A helper class which contains the logic to translate the file into
    a format BigQuery will accept."""

    def parse_method(self, string_input):

        x="""{}""".format(string_input)
        rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")
        d = {}
        d['name'], d['date'], d['geometry'], d['value0'], d['value1'], d['value2']=rx.split(x)
        d['geometry']=d['geometry'].strip('"')

        return d

def run(argv=None):
    """Main entry point; defines and runs the pipeline."""

    data_ingestion = DataIngestion()
    p = beam.Pipeline(options=PipelineOptions())


    (p
    | 'Create PCollection' >> beam.Create(files)
    | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)
    | 'String To BigQuery Row' >> beam.Map(lambda s:
    data_ingestion.parse_method(s))
    | 'Write to BigQuery' >> beam.io.Write(
    beam.io.BigQuerySink(
    'mytable',
    dataset='mydataset',
    schema=myschema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

Проблема в том, что этот код отлично работает, если список files содержит толькоодин элемент.Как только появляется более 1 элемента, преобразование String To BigQuery Row выдает ошибку и сообщает error: nothing to repeat [while running 'String To BigQuery Row'].Вероятно, это связано с модулем регулярных выражений, но я не могу понять, что не так, потому что он отлично работает, когда дается 1 файл.

Редактировать: Странно, он хорошо работает через DirectRunner.Я передаю файл requirements.txt, как указано здесь .

Вот как я выполняю конвейер:

python streaming_inserts.py --runner=DataFlowRunner --project=my-project --temp_location=gs://temp/ --staging_location=gs://stage/  --requirements_file requirements.txt --disk_size_gb 1000 --region us-east1

Мой requirements.txt выглядит такэто:

regex
google-cloud-storage

Также, согласно журналам, устанавливается пакет: enter image description here

1 Ответ

5 голосов
/ 23 апреля 2019

Комментарий ОП заставил меня осознать мою ошибку: предполагаемая библиотека - regex, а не встроенная в Python re.

Использование import regex as re не только сбило меня с толку, но и заставило библиотеку re выдать ошибку nothing to repeat. Это связано с тем, что Dataflow по умолчанию не сохраняет основной сеанс.

Когда выполняется код в вашей функции синтаксического анализа, он не имеет доступа к контексту re, который вы импортировали во время сборки. Обычно это не удастся с NameError, но поскольку вы используете правильное имя библиотеки, код предполагает, что вы имеете в виду встроенную библиотеку re и пытается выполнить ее как таковую.

Если вместо этого вы используете import regex, вы увидите NameError: name 'regex' is not defined, что является реальной причиной сбоя кода. Чтобы обойти это, либо переместите оператор импорта в саму функцию синтаксического анализа, либо передайте --save_main_session в качестве опции для бегуна. См. здесь для получения более подробной информации.


Старый ответ:

Хотя я не могу сказать, какую версию Python вы используете, похоже, ваше подозрение относительно регулярного выражения верное. * - это специальный символ, обозначающий повторы того, что было до него, но ( - это специальный символ, обозначающий группировку, поэтому шаблон, подобный (*SKIP), не выглядит грамматически правильным.

В Python 3.7 вышеприведенное выражение даже не компилируется:

python -c 'import re; rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/re.py", line 234, in compile
    return _compile(pattern, flags)
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/re.py", line 286, in _compile
    p = sre_compile.compile(pattern, flags)
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_compile.py", line 764, in compile
    p = sre_parse.parse(p, flags)
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 930, in parse
    p = _parse_sub(source, pattern, flags & SRE_FLAG_VERBOSE, 0)
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 426, in _parse_sub
    not nested and not items))
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 816, in _parse
    p = _parse_sub(source, state, sub_verbose, nested + 1)
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 426, in _parse_sub
    not nested and not items))
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 651, in _parse
    source.tell() - here + len(this))
re.error: nothing to repeat at position 11

Python 2.7.15 также не принимает его:

python2 -c 'import re; rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/lib/python2.7/re.py", line 194, in compile
    return _compile(pattern, flags)
  File "/usr/lib/python2.7/re.py", line 251, in _compile
    raise error, v # invalid expression
sre_constants.error: nothing to repeat

Хотя я не знаю, с какими строками вы пытаетесь соответствовать, я подозреваю, что некоторым вашим персонажам нужно сбежать. например "\{[^{}]+\}(\*SKIP)(\*FAIL)|,"

...