Загрузка данных CSV в облачное хранилище с использованием разделителя, отличного от запятой - PullRequest
0 голосов
/ 21 мая 2019

Мне нужно знать, как я могу через python читать данные CSV из облачного хранилища в большой запрос, но без использования запятой в качестве разделителя. В идеале я должен распознавать разделитель автоматически, или я могу объявить переменную, которая будет отделять поля от таблицы. Запятая в моем процессе как разделитель - проблема.

После многих попыток, используя ";" как разделитель, я заметил, что мой процесс приема не распознает символ как разделитель.

#Libraries
import csv 
import pyodbc
import logging
import os
import cloudstorage as gcs
import gcloud
from gcloud import storage
from google.cloud import bigquery
import pandas as pd

db = pyodbc.connect("DRIVER={SQL Server};server=10.0.1.1;database=blabla;uid=test;pwd=xxx")
cursor = db.cursor()
SQLview = 'select * from test'
cursor.execute(SQLview)
with open('test_google2.csv', 'w', newline= '') as f:
    writer = csv.writer(f, delimiter=',')
    writer.writerow([ i[0] for i in cursor.description ])
    writer.writerows(cursor.fetchall())    

from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
import os
from google.cloud import bigquery
import json


try:
    script_path = os.path.dirname(os.path.abspath(__file__)) + "/"
except:
    script_path = "C:\\Users\\user1\\auth.json"

#Bigquery Credentials and settings
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = script_path 

client = bigquery.Client(project='big-data')
dataset_id = 'dataset_tst'
dataset_ref = client.dataset('dataset_tst')    

credentials = GoogleCredentials.get_application_default()
service = discovery.build('storage', 'v1', credentials=credentials)

filename = 'C:\\Users\\user1\\Documents\\test_google2.csv'
bucket = 'big-data-bkt'

body = {'name': 'test_google2.csv'}
req = service.objects().insert(bucket=bucket, body=body, media_body=filename)
resp = req.execute()


#CLOUD STORAGE >>> BIG QUERY
from gcloud import storage
from google.cloud import bigquery
from gcloud import bigquery as bq1

bucket_uri = 'bucket_id'
bucket_name = 'bucket_name'
bucket_target = 'test_google2.csv'
local_dataset = 'test_google2.csv'
bucket_target_uri = bucket_uri + bucket_target
bigquery_dataset = 'dataset_tst'
bigquery_table = 'test'

client1 = bq1.Client(project='big-data')
dataset_ref1 = client1.dataset(bigquery_dataset)
dataset_ref1.create()  # API request

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Upload a CSV to Google Cloud Storage.

    1. Retrieve the target bucket.
    2. Set destination of data to be uploaded.
    3. Upload local CSV.
    """
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    # Commence Upload
    blob.upload_from_filename(source_file_name)
    print('File {} uploaded to {}.'.format(
        source_file_name,
        destination_blob_name))


def insert_bigquery(target_uri, dataset_id, table_id):
    """Insert CSV from Google Storage to BigQuery Table.

    1. Specify target dataset within BigQuery.
    2. Create a Job configuration.
    3. Specify that we are autodetecting datatypes.
    4. Reserve row #1 for headers.
    5. Specify the source format of the file (defaults to CSV).
    6. Pass the URI of the data storage on Google Cloud Storage from.
    7. Load BigQuery Job.
    8. Execute BigQuery Job.
    """
    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.skip_leading_rows = 1
    job_config.source_format = bigquery.SourceFormat.CSV
    uri = target_uri
    load_job = bigquery_client.load_table_from_uri(uri,dataset_ref.table(table_id), job_config=job_config)  # API request
    print('Starting job {}'.format(load_job.job_id))
    # Waits for table load to complete.
    load_job.result()
    print('Job finished.')


upload_blob(bucket_name, local_dataset, bucket_target)
insert_bigquery(bucket_target_uri, bigquery_dataset, bigquery_table)

1 Ответ

0 голосов
/ 21 мая 2019

Хорошо, я сам потратил время на то, чтобы воспроизвести эту проблему, чтобы дать вам правильный ответ, и вот что я нашел.

Если вы хотите установить другой разделитель по умолчанию "," при загрузке данных в BigQuery, вам нужно указать это в вашем job_config:

job_config.field_delimiter = ";"

Вот и все. Вы можете узнать больше об опциях и различных разделителях, которые вы можете использовать, читая здесь, в документации .

Мой последний код такой (поскольку я использую Cloud Console Shell, я пропускаю несколько конфигураций учетных данных).

# Libraries
import csv 
import logging
import os
# import cloudstorage as gcs
import gcloud
from gcloud import storage
from google.cloud import bigquery
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
import json
import mysql.connector

# connecting to the DB 
cnx = mysql.connector.connect(user="user", password="pass", host="11.111.111.11", database="test")
cursor = cnx.cursor()
SQLview = 'select * from test'
filename = 'test_google2.csv'
folder = "folder_path_to_file"

# Creating CVS file
cursor.execute(SQLview)
with open(filename, 'w', newline= '') as f:
    writer = csv.writer(f, delimiter=';')
    writer.writerow([ i[0] for i in cursor.description ])
    writer.writerows(cursor.fetchall())    


# uploading it into a bucket
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    storage_client = storage.Client(project="project_name")
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print('File {} uploaded to {}'.format(
        source_file_name,
        destination_blob_name
    ))


# inserting the csv from Cloud Storage into BigQuery
def insert_bigquery(target_uri, dataset_id, table_id):
    bigquery_client = bigquery.Client(project="project_name")
    dataset_ref = bigquery_client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.skip_leading_rows = 1
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.field_delimiter = ";"
    uri = target_uri
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table(table_id),
        job_config=job_config
        )
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()
    print('Job finished.')



upload_blob("bucket_name", folder + filename, filename)
insert_bigquery("gs://bucket_name/"+filename, "dataset_id", "table_id")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...