Я создал работу с потоком данных Google, но продолжаю получать глобальное имя 'bigquery', не определенное, хотя я уже импортировал необходимую переменную.
Это мой список импорта:
from __future__ import absolute_import
import argparse
import logging
import ast
import json
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from google.cloud import bigquery
И этот класс возвращает ошибку:
class CheckExistance(beam.DoFn):
def __init__(self, table):
self.table = table.replace(":", ".")
def process(self, element):
client = bigquery.Client()
date = element['date'].split(" ")[0]
query_job = client.query("""
QUERY """ % (self.table, date))
yield element
Ребята, вы знаете, что может быть причиной этой ошибки?Кстати, я получаю эту ошибку только при развертывании ее в заданиях потока данных Google, она работает нормально локально.
РЕДАКТИРОВАТЬ:
Я смог исправить свою первоначальную проблему, изменивпозиция моего импорта находится внутри функции, которой нужна переменная bigquery, например:
class CheckExistance(beam.DoFn):
def __init__(self, table):
self.table = table.replace(":", ".")
def process(self, element):
from google.cloud import bigquery
client = bigquery.Client()
date = element['date'].split(" ")[0]
query_job = client.query("""
QUERY""" % (self.table, date))
yield element
Но теперь я получаю сообщение об ошибке, в котором говорится, что у «клиента» нет запроса атрибута, даже если мои пакетыв задании потока данных обновлены и выполняются без проблем локально.
Сообщение об ошибке:
AttributeError: у объекта «Клиент» нет атрибута «запрос»