мой прямой бегун работает нормально, за исключением того, что его не вставляют в bigquery.
Хотя мой DataFlowRunner вообще не работает. У меня есть установочный файл и все. Он также не отображает никаких ошибок, поэтому мне сложно его отлаживать, помогите
import datetime
import logging
import json
import argparse
from datetime import datetime
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
# Configuration
# I set all the constant before creating the Pipeline Options
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
#options.view_as(StandardOptions).runner = 'DirectRunner'
options.view_as(SetupOptions).save_main_session = True
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = '*******-230413'
def run(argv=None):
"""
Define and run the pipeline.
"""
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--output',
dest='output',
required=True,
help='GCS destination folder to save the images to (example: gs://BUCKET_NAME/path')
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
'--input_topic',
help=('Input PubSub topic of the form '
'"projects/<project name>/topics/<topic name>".'))
known_args, pipeline_args = parser.parse_known_args(argv)
# DoFn
class ExtractHashtagsDoFn(beam.DoFn):
def process(self, element):
"""
Takes a text as input, search and return its content
"""
result = {'text': element}
logging.info('{}'.format(result))
yield result
class SentimentAnalysisDoFn(beam.DoFn):
def process(self, element):
from google.cloud import language
from google.gax.errors import RetryError
client = language.LanguageServiceClient()
document = language.types.Document(
content=element['text'].encode('utf-8'),
type='PLAIN_TEXT')
try:
response = client.analyze_sentiment(
document=document,
encoding_type='UTF8')
sentiment = response.document_sentiment
element['sentiment_score'] = sentiment.score
element['sentiment_magnitude'] = sentiment.magnitude
except RetryError:
element['sentiment_score'] = None
element['sentiment_magnitude'] = None
# logging.info('{}'.format(element))
yield element
class WriteToGCS(beam.DoFn):
def __init__(self, outdir):
#source_date=datetime.now().strftime("%Y%m%d-%H%M%S")
self.outdir = outdir
def process(self, element):
element = json.dumps(element).encode('utf-8')
source_date=datetime.now().strftime("%Y%m%d-%H%M%S")
#outdir = known_args.output+'output'+format(source_date) +'.txt'
writer = FileSystems.create(self.outdir+'output'+format(source_date) +'.txt','text/plain')
writer.write(element)
writer.close()
# Define Composite Transforms
class TextAnalysisTransform(beam.PTransform):
def expand(self, pcoll):
return(
pcoll
| 'Decode' >> beam.Map(lambda string: string.decode('utf8', 'ignore'))
| 'ExtractHashtags' >> beam.ParDo(ExtractHashtagsDoFn())
| 'SentimentAnalysis' >> beam.ParDo(SentimentAnalysisDoFn())
| 'Save file' >> beam.ParDo(WriteToGCS(known_args.output))
)
class WindowingForOutputTransform(beam.PTransform):
def expand(self, pcoll):
import json
return(
pcoll
| 'Pack' >> beam.Map(lambda x: (x, 1))
| 'Windowing' >> beam.WindowInto(window.FixedWindows(5, 0))
| 'GroupByKey' >> beam.GroupByKey()
| 'Unpack' >> beam.Map(lambda x: x[0])
)
class SaveToBigQueryTransform(beam.PTransform):
def expand(self, pcoll):
# Define the Schema.
from apache_beam.io.gcp.internal.clients import bigquery
table_schema = bigquery.TableSchema()
# Fields that use standard types.
alpha_schema = bigquery.TableFieldSchema()
alpha_schema.name = 'text'
alpha_schema.type = 'string'
alpha_schema.mode = 'nullable'
table_schema.fields.append(alpha_schema)
beta_schema = bigquery.TableFieldSchema()
beta_schema.name = 'sentiment_score'
beta_schema.type = 'float'
beta_schema.mode = 'nullable'
table_schema.fields.append(beta_schema)
gamma_schema = bigquery.TableFieldSchema()
gamma_schema.name = 'sentiment_magnitude'
gamma_schema.type = 'float'
gamma_schema.mode = 'nullable'
table_schema.fields.append(gamma_schema)
# Saving the output to BigQuery.
return (
pcoll
| 'PrepareForOutput' >> WindowingForOutputTransform()
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
table='test',
dataset='testimport',
project='*******-230413',
schema=table_schema, # Pass the defined table_schema
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
with beam.Pipeline(options=options) as p:
plumbing = (
p
| 'LoadTestData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
| 'decode' >> beam.Map(lambda x: json.loads(x.decode('utf-8')))
| 'read_file' >> beam.FlatMap(lambda metadata: FileSystems.open('gs://%s/%s' % (metadata['bucket'], metadata['name'])))
| 'TextAnalysis' >> TextAnalysisTransform()
| 'StreamToBigQuery' >> SaveToBigQueryTransform()
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Что я здесь делаю не так? Также я создал таблицу bigquery при запуске прямого запуска, я не смог просмотреть предварительный просмотр таблицы, поэтому я выполнил запрос, чтобы увидеть содержимое таблицы, и в следующий раз он прекратил потоковую передачу.