Вставьте данные в mysql, используя поток данных - PullRequest
0 голосов
/ 02 марта 2020

Код ниже строит конвейер и генерируется DAG. RuntimeError: NotImplementedError [во время работы generatePtransform-438 '] Пожалуйста, дайте мне знать, если есть прямой соединитель для mysql в python для луча.

from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import pubsub_v1
from google.cloud import bigquery
import  mysql.connector
import apache_beam as beam
import logging
import argparse
import sys
import re


PROJECT="12344"
TOPIC = "projects/12344/topics/mytopic"    

class insertfn(beam.Dofn):
def insertdata(self,data):
db_conn=mysql.connector.connect(host="localhost",user="abc",passwd="root",database="new")
db_cursor=db_conn.cursor()
emp_sql = " INSERT INTO emp(ename,eid,dept) VALUES (%s,%s,%s)"
db_cusror.executemany(emp_sql,(data[0],data[1],data[2]))
db_conn.commit()
print(db_cursor.rowcount,"record inserted")
class Split(beam.DoFn):
def process(self, data):
data = data.split(",")               
return [{ 
'ename': data[0],
'eid': data[1],
'dept': data[2]         
}]

def main(argv=None):

parser = argparse.ArgumentParser()
parser.add_argument("--input_topic")
parser.add_argument("--output")
known_args = parser.parse_known_args(argv)


p = beam.Pipeline(options=PipelineOptions())

(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)
| "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
| 'ParseCSV' >> beam.ParDo(Split())
| 'WriteToMySQL' >> beam.ParDo(insertfn())
)
result = p.run()
result.wait_until_finish()

1 Ответ

0 голосов
/ 04 марта 2020

После нашего обсуждения в разделе комментариев я заметил, что вы не используете правильные команды для выполнения конвейера DataFlow.

Согласно документации существуют обязательные флаги, которые должны быть определены для запуска конвейера в Управляемая служба потока данных . Эти флаги описаны ниже:

  • имя-задания - Имя выполняемого задания потока данных.

  • project - Идентификатор вашего проекта Google Cloud. runner - конвейер

  • runner - , который проанализирует вашу программу и создаст ваш конвейер. Для облачного выполнения это должен быть DataflowRunner.

  • staging_location - Путь облачного хранилища для потока данных для создания пакетов кода, необходимых работникам, выполняющим работу.

  • temp_location - Путь облачного хранилища для потока данных для размещения временных файлов заданий, созданных во время выполнения конвейера.

Помимо этих флагов, в вашем случае вы можете использовать и другие, поскольку вы используете PubSub topi c:

  • - input_topi c: задает входные данные Pub / Sub topi c для чтения сообщений из.

Поэтому пример запуска конвейера потока данных будет следующим:

  python RunPipelineDataflow.py \
  --job_name=jobName\ 
  --project=$PROJECT_NAME \
  --runner=DataflowRunner \
  --staging_location=gs://YOUR_BUCKET_NAME/AND_STAGING_DIRECTORY\
  --temp_location=gs://$BUCKET_NAME/temp
  --input_topic=projects/$PROJECT_NAME/topics/$TOPIC_NAME \

Я хотел бы указать на важность использования DataflowRunner , он позволяет вам использовать управляемый сервис Cloud Dataflow, предоставляя полностью управляемый сервис, автоматическое масштабирование и динамическую перебалансировку работы c. Тем не менее, также возможно использовать DirectRunner , который выполняет ваш конвейер на вашей машине, он предназначен для проверки конвейера.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...