What is the best option for creating a job or pipeline to run a script that uses pandas_gbq to load BigQuery tables?
0 votes
/ 01 April 2020

I am not a data engineer, and I have some doubts about the best approach. My main goal is to load, with some periodicity (daily, for example), a set of CSV files stored in a bucket on GCP into BigQuery tables.

Here is my current script:

import pandas as pd
from google.oauth2 import service_account
import pandas_gbq  # the standalone pandas-gbq package, not pandas' internal test module

from src.uploads import files

KEY_JSON = 'key.json'
PROJECT_ID = "<PROJECT_ID>"
SUFFIX_TABLE_NAME = "<TABLE_SUFFIX>."


def parse_to_pd(file, header, numeric_fields):
    df = pd.read_csv(file, skiprows=[1], sep=',', decimal=",", thousands='.')
    df.columns = header
    for col in numeric_fields:
        df[col] = pd.to_numeric(df[col])
    return df


def load_data_to_bq(df, table_id):
    credentials = service_account.Credentials.from_service_account_file(KEY_JSON)
    pandas_gbq.context.credentials = credentials
    pandas_gbq.context.project = PROJECT_ID
    # str.replace needs both the old and the new substring
    name_table = table_id.replace(SUFFIX_TABLE_NAME, "")

    pandas_gbq.to_gbq(df, name_table, if_exists="replace")


if __name__ == "__main__":
    for table_id, config in files.items():
        load_data_to_bq(
            # parse_to_pd takes three arguments: file, header and numeric_fields
            parse_to_pd(config.get('file_name'),
                        config.get('fields'),
                        config.get('numeric_fields')
                        ), table_id)
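For context, src.uploads simply exposes a files dict keyed by the target table id; a hypothetical entry (the table id, file name and column names below are placeholders inferred from the config.get calls above) could look like this:

# src/uploads.py (hypothetical example with placeholder names)
files = {
    "<PROJECT_ID>.<DATASET>.<TABLE_NAME>": {
        "file_name": "some_export.csv",    # CSV file to parse
        "fields": ["Project", "Amount"],   # header applied to the dataframe
        "numeric_fields": ["Amount"],      # columns converted with pd.to_numeric
    },
    # ... other tables
}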

This script worked, but I want to run it on GCP with some kind of cloud service. Do you have any suggestions?

1 Answer

1 vote
/ 03 April 2020

So, I decided not to use Dataflow and Apache Beam, because my files were not that large, and I simply scheduled the job with crontab.
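For reference, a minimal crontab entry for a daily run could look like the sketch below; the project path, the virtualenv location and the log file name are placeholders, not my actual setup:

# run the loader every day at 06:00 and append stdout/stderr to a log file
0 6 * * * cd /home/<user>/project && ./venv/bin/python main.py >> cron.log 2>&1

Here is the class I created to handle the processing: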

import os
import shutil

import pandas as pd
import uuid

from google.cloud import storage


class DataTransformation:
    """A helper class which contains the logic to translate a csv into a
    format BigQuery will accept.
    """

    def __init__(self, schema, bucket_name, credentials):
        """ Here we read the input schema and which file will be transformed. This is used to specify the types
        of data to create a pandas dataframe.
        """
        self.schema = schema
        self.files = []
        self.blob_files = []
        self.client = storage.Client(credentials=credentials, project="stewardship")
        self.bucket = self.client.get_bucket(bucket_name)

    def parse_method(self, csv_file):
        """This method translates csv_file in a pandas dataframe which can be loaded into BigQuery.
        Args:
            csv_file: some.csv

        Returns:
            A pandas dataframe.
        """
        df = pd.read_csv(csv_file,
                         skiprows=[1],
                         sep=self.schema.get('sep'),
                         decimal=self.schema.get('decimal'),
                         thousands=self.schema.get('thousands')
                         )
        df.columns = self.schema.get('fields')
        for col in self.schema.get('numeric_fields'):
            df[col] = pd.to_numeric(df[col])
        shutil.move(csv_file, "./temp/processed/{0}".format(
            os.path.splitext(os.path.basename(csv_file))[0])
                    )
        return df

    def process_files(self):
        """This method process all files and concat to a unique dataframe
        Returns:
            A pandas dataframe contained.
        """
        frames = []
        for file in self.files:
            frames.append(self.parse_method(file))
        if frames:
            return pd.concat(frames)
        else:
            return pd.DataFrame([], columns=['a'])

    def download_blob(self):
        """Downloads a blob from the bucket."""
        for blob_file in self.bucket.list_blobs(prefix="input"):
            if self.schema.get("file_name") in blob_file.name:
                unique_filename = "{0}_{1}".format(self.schema.get("file_name"), str(uuid.uuid4()))
                destination_file = os.path.join("./temp/input", unique_filename + ".csv")
                blob_file.download_to_filename(
                    destination_file
                )
                self.files.append(destination_file)
                self.blob_files.append(blob_file)
        return len(self.blob_files) > 0

    def upload_blob(self, destination_blob_name):
        """Uploads a file to the bucket."""
        blob = self.bucket.blob(destination_blob_name)
        # the local file shares its name with the destination blob
        blob.upload_from_filename(os.path.basename(destination_blob_name))

    def move_processed_files(self):
        """Move processed files to processed folder"""
        for blob_file in self.blob_files:
            self.bucket.rename_blob(blob_file, "processed/" + blob_file.name)
        return [b.name for b in self.blob_files]

So basically I used pandas_gbq to handle everything:

import logging

from google.oauth2 import service_account
import pandas_gbq

from src.data_transformation import DataTransformation
from src.schemas import schema_files_csv

KEY_JSON = 'KEY.json'
PROJECT_ID = "<PROJECT_NAME>"
SUFFIX_TABLE_NAME = "<TABLE_SUFFIX>"
BUCKET_NAME = "BUCKET_NAME"


def run():
    credentials = service_account.Credentials.from_service_account_file(KEY_JSON)
    # DataTransformation is a class we built in this script to hold the logic for
    # transforming the file into a BigQuery table.
    for table, schema in schema_files_csv.items():
        try:
            logging.info("Processing schema for {}".format(schema.get("file_name")))

            data_ingestion = DataTransformation(schema, BUCKET_NAME, credentials)

            if not data_ingestion.download_blob():
                logging.info(" 0 files to process")
                continue
            logging.info("Downloaded files: {}".format(",".join(data_ingestion.files) or "0 files"))

            frame = data_ingestion.process_files()

            logging.info("Dataframe created with some {} lines".format(str(frame.shape)))

            if not frame.empty:
                pandas_gbq.context.project, pandas_gbq.context.credentials = (PROJECT_ID, credentials)

                pandas_gbq.to_gbq(frame,
                                  table.replace(SUFFIX_TABLE_NAME, ""),
                                  if_exists="replace"
                                  )
                logging.info("Table {} was loaded on Big Query".format(table.replace(SUFFIX_TABLE_NAME, "")))

                blob_files = data_ingestion.move_processed_files()
                logging.info("Moving files {} to processed folder".format(",".join(blob_files)))

            data_ingestion.upload_blob("info.log")

        except ValueError as err:
            logging.error("csv schema expected are wrong, please ask to Andre Araujo update the schema. "
                          "Error: {}".format(err.__str__()))


if __name__ == "__main__":
    logging.basicConfig(filename='info.log', level=logging.INFO)
    run()

The schemas are kept in a dict / JSON like this:

{
    "<PROJECT>.<DATASET>.<TABLE_NAME>": {
        "file_name": "<NAME_OF_FILE>",
        "fields": [
            "Project",
            "Assignment_Name",
            "Request_Id",
            "Resource_Grade",
            "Resource_Role",
            "Record_ID",
            "Assignment_ID",
            "Resource_Request_Account_Id",
        ],
        "numeric_fields": [],
        "sep": ";",
        "decimal": ".",
        "thousands": ","
    },
.... other schema
}
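If you keep this mapping in an actual JSON file rather than a Python module, a minimal loader could look like this (schemas.json is a hypothetical file name; in my case schema_files_csv simply lives in src/schemas.py):

import json

# read the table -> schema mapping from a JSON file
with open("schemas.json", encoding="utf-8") as f:
    schema_files_csv = json.load(f)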