Добавить поле загрузки даты при загрузке CSV в большой запрос - PullRequest
0 голосов
/ 21 ноября 2018

Использование Python.Есть ли способ добавить дополнительное поле при обработке файла CSV в Big Query.Я хотел бы добавить поле date_loaded с текущей датой?

Пример кода Google, который я использовал ..

# from google.cloud import bigquery
# client = bigquery.Client()
# dataset_id = 'my_dataset'

dataset_ref = client.dataset(dataset_id)
job_config = bigquery.LoadJobConfig()
job_config.schema = [
    bigquery.SchemaField('name', 'STRING'),
    bigquery.SchemaField('post_abbr', 'STRING')
]
job_config.skip_leading_rows = 1    
# The source format defaults to CSV, so the line below is optional.
job_config.source_format = bigquery.SourceFormat.CSV
uri = 'gs://cloud-samples-data/bigquery/us-states/us-states.csv'
    load_job = client.load_table_from_uri(
    uri,
    dataset_ref.table('us_states'),
    job_config=job_config)  # API request
print('Starting job {}'.format(load_job.job_id))

load_job.result()  # Waits for table load to complete.
print('Job finished.')

destination_table = client.get_table(dataset_ref.table('us_states'))
print('Loaded {} rows.'.format(destination_table.num_rows))

Ответы [ 2 ]

0 голосов
/ 09 февраля 2019

Вы можете продолжать загружать данные по мере загрузки, но в таблицу с именем old_table.

После загрузки вы можете запустить что-то вроде:

bq --location=US query --destination_table mydataset.newtable --use_legacy_sql=false --replace=true 'select *, current_date() as date_loaded from mydataset.old_table'

Это в основном загружает содержимое старой таблицы с новым столбцом date_loaded в конце к new_table.Таким образом, теперь у вас есть новый столбец без локальной загрузки или всего беспорядка.

0 голосов
/ 09 февраля 2019

Изменяя этот пример Python , чтобы он соответствовал вашей проблеме, вы открываете и читаете оригинальный CSV-файл с моего локального ПК, редактируете его, добавляя столбец и добавляя метки времени в конце каждой строки, чтобы избежать появленияпустой столбец Эта ссылка объясняет, как получить метку времени в Python с пользовательской датой и временем.

Затем вы записываете полученные данные в выходной файл и загружаете их в Google Storage. Здесь вы можете найти информацию о том, как запускать внешние команды из файла Python.

Надеюсь, это поможет.

#Import the dependencies
import csv,datetime,subprocess
from google.cloud import bigquery

#Replace the values for variables with the appropriate ones
#Name of the input csv file
csv_in_name = 'us-states.csv'
#Name of the output csv file to avoid messing up the original
csv_out_name = 'out_file_us-states.csv'
#Name of the NEW COLUMN NAME to be added
new_col_name = 'date_loaded'
#Type of the new column
col_type = 'DATETIME'
#Name of your bucket
bucket_id = 'YOUR BUCKET ID'
#Your dataset name
ds_id = 'YOUR DATASET ID'
#The destination table name
destination_table_name = 'TABLE NAME'


# read and write csv files
with open(csv_in_name,'r') as r_csvfile:
    with open(csv_out_name,'w') as w_csvfile:

        dict_reader = csv.DictReader(r_csvfile,delimiter=',')
        #add new column with existing
        fieldnames = dict_reader.fieldnames + [new_col_name]
        writer_csv = csv.DictWriter(w_csvfile,fieldnames,delimiter=',')
        writer_csv.writeheader()


        for row in dict_reader:
#Put the timestamp after the last comma so that the column is not empty
            row[new_col_name] = datetime.datetime.now()
            writer_csv.writerow(row)

#Copy the file to your Google Storage bucket
subprocess.call('gsutil cp ' + csv_out_name + ' gs://' + bucket_id , shell=True)


client = bigquery.Client()

dataset_ref = client.dataset(ds_id)
job_config = bigquery.LoadJobConfig()
#Add a new column to the schema!
job_config.schema = [
    bigquery.SchemaField('name', 'STRING'),
    bigquery.SchemaField('post_abbr', 'STRING'),
    bigquery.SchemaField(new_col_name, col_type)
]
job_config.skip_leading_rows = 1
# The source format defaults to CSV, so the line below is optional.
job_config.source_format = bigquery.SourceFormat.CSV
#Address string of the output csv file
uri = 'gs://' + bucket_id + '/' + csv_out_name
load_job = client.load_table_from_uri(uri,dataset_ref.table(destination_table_name),job_config=job_config)  # API request
print('Starting job {}'.format(load_job.job_id))

load_job.result()  # Waits for table load to complete.
print('Job finished.')

destination_table = client.get_table(dataset_ref.table(destination_table_name))
print('Loaded {} rows.'.format(destination_table.num_rows))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...