Таким образом, цель конвейера состоит в том, чтобы иметь возможность использовать переменные времени выполнения, чтобы иметь возможность открывать CSV-файл и называть таблицу BigQuery.
Все, что мне нужно, - это иметь доступ к этим переменным глобально или внутри ParDo, например, для анализа их в функции.
Я попытался создать фиктивную строку, а затем запустить FlatMap для доступа к параметрам времени выполнения и сделать их глобальными, хотя ничего не возвращается.
например.
class CustomPipelineOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--path',
type=str,
help='csv storage path')
parser.add_value_provider_argument(
'--table_name',
type=str,
help='Table Id')
def run()
def rewrite_values(element):
""" Rewrite default env values"""
# global project_id
# global campaign_id
# global organization_id
# global language
# global file_path
try:
logging.info("File Path with str(): {}".format(str(custom_options.path)))
logging.info("----------------------------")
logging.info("element: {}".format(element))
project_id = str(cloud_options.project)
file_path = custom_options.path.get()
table_name = custom_options.table_name.get()
logging.info("project: {}".format(project_id))
logging.info("File path: {}".format(file_path))
logging.info("language: {}".format(table_name))
logging.info("----------------------------")
except Exception as e:
logging.info("Error format----------------------------")
raise KeyError(e)
return file_path
pipeline_options = PipelineOptions()
cloud_options = pipeline_options.view_as(GoogleCloudOptions)
custom_options = pipeline_options.view_as(CustomPipelineOptions)
pipeline_options.view_as(SetupOptions).save_main_session = True
# Beginning of the pipeline
p = beam.Pipeline(options=pipeline_options)
init_data = (p
| beam.Create(["Start"])
| beam.FlatMap(rewrite_values))
pipeline processing, running pipeline etc.
Я могу получить доступ к переменной проекта без проблем, хотя все остальное возвращается пустым.
Если я сделаю переменную custom_options глобальной или когда передам конкретный таможенный объект в функцию, такую как: | 'Read data' >> beam.ParDo(ReadGcsBlobs(path_file=custom_options.path))
, он вернет только что-то, например RuntimeValueProvider(option: path, type: str, default_value: None)
.
Если я использую | 'Read data' >> beam.ParDo(ReadGcsBlobs(path_file=custom_options.path.get()))
, переменная и пустая строка.
По сути, мне просто нужен глобальный доступ к этим переменным, возможно ли это?
Наконец, чтобы уточнить, я не хочу использовать метод ReadFromText
, я могу использовать переменную времени выполнения там, хотя включение параметров времени выполнения в dict, созданный из файла csv, будет дорогостоящим, так как я работаю с огромные CSV-файлы.