Я работаю над проектом по созданию механизма прогнозирования потоковой обработки в GCP. Я в основном учусь из этого репо здесь . Однако, когда я пытаюсь выполнить скрипт blogposts / got_sentiment / 4_streaming_pipeline / streaming_tweet.py, я продолжаю получать ошибку
NameError: name 'estimate' is not defined [while running 'generatedPtransform-129']
Моя функция выглядит следующим образом
from __future__ import absolute_import
import argparse
import datetime
import json
import logging
import numpy as np
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
from apache_beam.options.pipeline_options import StandardOptions, GoogleCloudOptions, SetupOptions, PipelineOptions
from apache_beam.transforms.util import BatchElements
from googleapiclient import discovery
def init():
........
def estimate_cmle():
init()
.....
def estimate(instances):
estimate_cmle()
......
def run(argv=None):
....
output = (lines
| 'assign window key' >> beam.WindowInto(window.FixedWindows(10))
| 'batch into n batches' >> BatchElements(min_batch_size=49, max_batch_size=50)
| 'predict sentiment' >> beam.FlatMap(lambda messages: estimate(messages))
)
.....
f __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Вот где луч не удается распознать функцию оценки, хотя я создаю ее в том же сценарии.
Редактировать
При попытке beam.FlatMap(estimate)
выдана ошибка
name 'estimate_cmle' is not defined [while running 'generatedPtransform-1208']