Scheduling a load from BigQuery into MongoDB using apache-airflow (cloud-composer)
0 votes
/ 20 November 2018

I am trying to set up data pipelines that move data from GCS into BigQuery, perform certain tasks/processing, and load it into a MongoDB cluster (everything defined in Python as DAGs). I have been able to get this working up to the point of loading into MongoDB. Are there any Airflow operators that can do this? If not, is it possible to write custom code using the MongoDB hooks that Airflow provides?

Thanks, GT

EDIT 1

I used MongoHook and the source code of BigQueryGetDataOperator (code snippet below). My problem now is that I need this to work for 10+ million rows, and when I increase the default max_results='100' in BigQueryGetDataOperator I get the following error (XCom values are persisted in Airflow's metadata database, which is where the MySQL error comes from):

sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (_mysql_exceptions.DataError) (1406, "Data too long for column 'value' at row 1")

I know I should be loading the data into XCom in chunks, but I am not sure this is really feasible. Is there a standard way to pass large volumes of data through XCom? Any other alternative for achieving this with Airflow would also help. The only thing I can think of is to write the data to GCS, load it into MongoDB from there, and then delete the GCS file.

#-------- COPY PASTED BigQueryGetDataOperator SECTION: START --------------
'''Source: https://airflow.readthedocs.io/en/stable/_modules/airflow/contrib/operators/bigquery_get_data.html#BigQueryGetDataOperator '''

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class BigQueryGetDataOperator(BaseOperator):
    template_fields = ('dataset_id', 'table_id', 'max_results')
    ui_color = '#e4f0e8'

    @apply_defaults
    def __init__(self,
                 dataset_id,
                 table_id,
                 max_results='100',
                 selected_fields=None,
                 bigquery_conn_id='bigquery_default',
                 delegate_to=None,
                 *args,
                 **kwargs):
        super(BigQueryGetDataOperator, self).__init__(*args, **kwargs)
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.max_results = max_results
        self.selected_fields = selected_fields
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to

    def execute(self, context):
        self.log.info('Fetching Data from:')
        self.log.info('Dataset: %s ; Table: %s ; Max Results: %s',
                      self.dataset_id, self.table_id, self.max_results)

        hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                            delegate_to=self.delegate_to)

        conn = hook.get_conn()
        cursor = conn.cursor()
        response = cursor.get_tabledata(dataset_id=self.dataset_id,
                                        table_id=self.table_id,
                                        max_results=self.max_results,
                                        selected_fields=self.selected_fields)

        self.log.info('Total Extracted rows: %s', response['totalRows'])
        rows = response['rows']
        return rows
        # The lines below are commented out because I want the raw rows (JSON-style), not a list of lists
        # table_data = []
        # for dict_row in rows:
        #     single_row = []
        #     for fields in dict_row['f']:
        #         single_row.append(fields['v'])
        #     table_data.append(single_row)

        # return table_data
#----------------------- COPY PASTED SECTION: END ----------------

import datetime

from airflow import models
from airflow.operators.python_operator import PythonOperator
from airflow.utils import trigger_rule
from airflow.contrib.operators import gcs_to_bq
from airflow.contrib.operators import bigquery_to_gcs
from airflow.contrib.operators import bigquery_operator
from airflow.contrib.operators import bigquery_get_data
# MongoHook lives in airflow.contrib.hooks, not airflow.contrib.operators
from airflow.contrib.hooks.mongo_hook import MongoHook
def get_dlist(**kwargs):
  import logging as log
  #Import pymongo
  from pymongo import MongoClient
  #Pull the data saved in XCom
  value = kwargs.get('task_instance').xcom_pull(task_ids='get_data_in_list_from_bq')

  header = ['V1','V2']
  data=[]
  for rows in value:
    onerow={}
    for i,f in zip(range(len(rows['f'])),rows['f']):
      onerow[header[i]] = f['v']
    data.append(onerow)
  log.info("Pulled...")
  log.info(data)
  log.info("Pushing into mongodb...")
  client = MongoClient('localhost:27017')
  db = client.test
  collection = db.testingbq2mongo
  # insert_many() is the non-deprecated bulk insert in pymongo 3.x
  collection.insert_many(data)
  log.info("Written to mongoDB...")
  client.close()

# 'yesterday' as in the Cloud Composer quickstart, so the DAG starts as soon
# as it is picked up.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 0,
    #'retry_delay': datetime.timedelta(minutes=5),
    'project_id': 'data-rubrics'
}

try:
  # [START composer_quickstart_schedule]
  with models.DAG(
        'composer_testing00',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_quickstart_schedule]

    # Use the locally modified BigQueryGetDataOperator defined above
    # (it returns the raw rows instead of a list of lists).
    data_list = BigQueryGetDataOperator(
        task_id='get_data_in_list_from_bq',
        dataset_id='testcomposer',         # BigQuery dataset that contains the table
        table_id='summarized_sample_T1')   # BigQuery table to push into MongoDB

    op_push2mongo = PythonOperator(task_id='Push_to_MongoDB', python_callable=get_dlist, provide_context=True)
    data_list >> op_push2mongo
except Exception as e:
  raise(e)

EDIT 2

    #-------- COPY PASTED BigQueryGetDataOperator SECTION: START --------------
    '''Source: https://airflow.readthedocs.io/en/stable/_modules/airflow/contrib/operators/bigquery_get_data.html#BigQueryGetDataOperator '''

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook
    from airflow.models import BaseOperator
    from airflow.utils.decorators import apply_defaults


    class BigQueryGetDataOperator(BaseOperator):
        template_fields = ('dataset_id', 'table_id', 'max_results')
        ui_color = '#e4f0e8'

        @apply_defaults
        def __init__(self,
                     dataset_id,
                     table_id,
                     max_results='100',
                     selected_fields=None,
                     bigquery_conn_id='bigquery_default',
                     delegate_to=None,
                     *args,
                     **kwargs):
            super(BigQueryGetDataOperator, self).__init__(*args, **kwargs)
            self.dataset_id = dataset_id
            self.table_id = table_id
            self.max_results = max_results
            self.selected_fields = selected_fields
            self.bigquery_conn_id = bigquery_conn_id
            self.delegate_to = delegate_to

        def execute(self, context):
            self.log.info('Fetching Data from:')
            self.log.info('Dataset: %s ; Table: %s ; Max Results: %s',
                          self.dataset_id, self.table_id, self.max_results)

            hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                                delegate_to=self.delegate_to)

            conn = hook.get_conn()
            cursor = conn.cursor()

    #----------------------- COPY PASTED SECTION: END ----------------

            # Trying to write into MongoDB right here inside execute() - coded by GT
            from pymongo import MongoClient
            header = ['day', 'ticker', 'app_id', 'area', 'store_types',
                      'devices_in_store', 'devices_in_store_or_plot',
                      'matched_devices', 'all_devices']
            client = MongoClient('35.237.46.25:27017')
            db = client.test03
            collection = db.advan_t1_sample_mongo00

            # First call is only used to read totalRows, hence max_results='2'
            response = cursor.get_tabledata(dataset_id=self.dataset_id, start_index=0,
                                            table_id=self.table_id,
                                            max_results='2',
                                            selected_fields=self.selected_fields)
            total_rows = int(response['totalRows'])
            chunksize = 100000

            for chunk in range(0, total_rows, chunksize):
                end = min(chunk + chunksize, total_rows)
                self.log.info("Extracting chunk %d to %d" % (chunk, end))
                response = cursor.get_tabledata(dataset_id=self.dataset_id, start_index=chunk,
                                                table_id=self.table_id,
                                                max_results=str(chunksize),
                                                selected_fields=self.selected_fields)
                rows = response['rows']

                for row in rows:
                    onerow = {}
                    for i, f in zip(range(len(row['f'])), row['f']):
                        onerow[header[i]] = f['v']
                    collection.insert_one(onerow)
                self.log.info("------------------------- Document size: %d --------------------" % (collection.find().count()))

            self.log.info("Pushed into %s" % collection.name)

            if total_rows == collection.find().count():
                self.log.info("Successfully pushed %d records into %s" % (total_rows, collection.name))
                return 1
            else:
                self.log.warning("Push Failed! Total Rows: %d Document Size: %d" % (total_rows, collection.find().count()))
                return 0
            # return rows

    import datetime

    from airflow import models
    from airflow.operators.python_operator import PythonOperator
    from airflow.utils import trigger_rule
    from airflow.contrib.operators import gcs_to_bq
    from airflow.contrib.operators import bigquery_to_gcs
    from airflow.contrib.operators import bigquery_operator
    from airflow.contrib.operators import bigquery_get_data
    # MongoHook lives in airflow.contrib.hooks, not airflow.contrib.operators
    from airflow.contrib.hooks.mongo_hook import MongoHook
    def get_dlist(**kwargs):
      import logging as log
      #Import pymongo
      from pymongo import MongoClient
      #Pull the data saved in XCom
      value = kwargs.get('task_instance').xcom_pull(task_ids='get_data_in_list_from_bq')

      header = ['V1','V2']
      data=[]
      for rows in value:
        onerow={}
        for i,f in zip(range(len(rows['f'])),rows['f']):
          onerow[header[i]] = f['v']
        data.append(onerow)
      log.info("Pulled...")
      log.info(data)
      log.info("Pushing into mongodb...")
      client = MongoClient('localhost:27017')
      db = client.test
      collection = db.testingbq2mongo
      # insert_many() is the non-deprecated bulk insert in pymongo 3.x
      collection.insert_many(data)
      log.info("Written to mongoDB...")
      client.close()

    # 'yesterday' as in the Cloud Composer quickstart, so the DAG starts as soon
    # as it is picked up.
    yesterday = datetime.datetime.combine(
        datetime.datetime.today() - datetime.timedelta(1),
        datetime.datetime.min.time())

    default_dag_args = {
        # Setting start date as yesterday starts the DAG immediately when it is
        # detected in the Cloud Storage bucket.
        'start_date': yesterday,
        # To email on failure or retry set 'email' arg to your email and enable
        # emailing here.
        'email_on_failure': False,
        'email_on_retry': False,
        # If a task fails, retry it once after waiting at least 5 minutes
        'retries': 0,
        #'retry_delay': datetime.timedelta(minutes=5),
        'project_id': 'data-rubrics'
    }

    try:
      # [START composer_quickstart_schedule]
      with models.DAG(
            'composer_testing00',
            # Continue to run DAG once per day
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:
        # [END composer_quickstart_schedule]

        # Use the locally modified BigQueryGetDataOperator defined above
        # (it now writes to MongoDB itself inside execute()).
        data_list = BigQueryGetDataOperator(
            task_id='get_data_in_list_from_bq',
            dataset_id='testcomposer',         # BigQuery dataset that contains the table
            table_id='summarized_sample_T1')   # BigQuery table to push into MongoDB

        op_push2mongo = PythonOperator(task_id='Push_to_MongoDB', python_callable=get_dlist, provide_context=True)
        data_list >> op_push2mongo
    except Exception as e:
      raise(e)

Answers [ 2 ]

0 votes
/ 11 December 2018

Passing large volumes of data through XCom is considered an anti-pattern. I would recommend writing the data from BigQuery to a durable storage service such as Cloud Storage, and then loading it into MongoDB from there.
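
A minimal, untested sketch of that route (the bucket name, the exported object name and the `mongo_default` connection below are placeholders, and it assumes the tasks are added to your existing `dag`):

    import json

    from airflow.contrib.operators import bigquery_to_gcs
    from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
    from airflow.contrib.hooks.mongo_hook import MongoHook
    from airflow.operators.python_operator import PythonOperator

    # Export the table to GCS as newline-delimited JSON.
    export_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
        task_id='bq_to_gcs',
        source_project_dataset_table='data-rubrics.testcomposer.summarized_sample_T1',
        destination_cloud_storage_uris=['gs://my-staging-bucket/export/rows-*.json'],
        export_format='NEWLINE_DELIMITED_JSON',
        dag=dag)

    def gcs_to_mongo(**kwargs):
        gcs = GoogleCloudStorageHook()
        mongo = MongoHook(conn_id='mongo_default')  # assumes a 'mongo_default' Airflow connection
        # With a wildcard URI the export is sharded; for real data, list the
        # bucket and loop over every shard instead of reading just one object.
        data = gcs.download(bucket='my-staging-bucket',
                            object='export/rows-000000000000.json')
        docs = [json.loads(line) for line in data.decode('utf-8').splitlines() if line]
        mongo.insert_many(mongo_collection='testingbq2mongo', docs=docs, mongo_db='test')

    load_to_mongo = PythonOperator(
        task_id='gcs_to_mongo',
        python_callable=gcs_to_mongo,
        dag=dag)

    export_to_gcs >> load_to_mongo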

0 votes
/ 21 November 2018

The easiest/quickest way is to use a PythonOperator and work with the hook object you need directly.
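
For example, a rough sketch (assuming a `mongo_default` connection is configured in Airflow and that the upstream task pushed a list of documents small enough to fit in XCom):

    from airflow.contrib.hooks.mongo_hook import MongoHook
    from airflow.operators.python_operator import PythonOperator

    def push_to_mongo(**kwargs):
        # Pull whatever the upstream task pushed to XCom (here: a list of dicts)
        docs = kwargs['task_instance'].xcom_pull(task_ids='get_data_in_list_from_bq')
        hook = MongoHook(conn_id='mongo_default')
        hook.insert_many(mongo_collection='testingbq2mongo', docs=docs, mongo_db='test')

    op_push2mongo = PythonOperator(
        task_id='Push_to_MongoDB',
        python_callable=push_to_mongo,
        provide_context=True,
        dag=dag)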

If you need to do this often, I would recommend packaging the code up as a custom operator (see the skeleton below).
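
A rough skeleton of such an operator, not tested (the class name, page size and generic field mapping are illustrative only):

    from airflow.models import BaseOperator
    from airflow.utils.decorators import apply_defaults
    from airflow.contrib.hooks.bigquery_hook import BigQueryHook
    from airflow.contrib.hooks.mongo_hook import MongoHook


    class BigQueryToMongoOperator(BaseOperator):
        """Illustrative only: copies a BigQuery table into a MongoDB collection
        page by page, without going through XCom."""

        @apply_defaults
        def __init__(self, dataset_id, table_id, mongo_db, mongo_collection,
                     bigquery_conn_id='bigquery_default',
                     mongo_conn_id='mongo_default', *args, **kwargs):
            super(BigQueryToMongoOperator, self).__init__(*args, **kwargs)
            self.dataset_id = dataset_id
            self.table_id = table_id
            self.mongo_db = mongo_db
            self.mongo_collection = mongo_collection
            self.bigquery_conn_id = bigquery_conn_id
            self.mongo_conn_id = mongo_conn_id

        def execute(self, context):
            cursor = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id).get_conn().cursor()
            mongo = MongoHook(conn_id=self.mongo_conn_id)
            page_size = 10000
            start = 0
            while True:
                response = cursor.get_tabledata(dataset_id=self.dataset_id,
                                                table_id=self.table_id,
                                                start_index=start,
                                                max_results=str(page_size))
                rows = response.get('rows', [])
                if not rows:
                    break
                # Map BigQuery's f/v row format to documents; replace the
                # generic f0, f1, ... keys with your real column names.
                docs = [{'f%d' % i: f['v'] for i, f in enumerate(row['f'])}
                        for row in rows]
                mongo.insert_many(mongo_collection=self.mongo_collection,
                                  docs=docs, mongo_db=self.mongo_db)
                start += page_size

With this, the DAG needs a single task instead of the BigQueryGetDataOperator + PythonOperator pair.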

...