Как я могу массово загружать данные в таблицу bigquery из pandas фрейма данных в datalab - PullRequest
0 голосов
/ 17 июня 2020

Я пытаюсь вставить новые данные в таблицу, уже созданную в bigquery. После обработки данных и создания фрейма данных я хочу добавить новые строки в таблицу. Я ищу эффективный способ сделать это.

Сейчас я делаю

def export_items_to_bigquery(ID_recipien,action_date_hour,monday, tuesday, wednesday, thursday, friday, saturday, sunday, click_action, read_action, converted,imedeate_response, weight, weight_mu, processing_date):
# Instantiates a client
  bigquery_client = bigquery.Client()
# Prepares a reference to the dataset
  dataset_ref = bigquery_client.dataset('internal')
  table_ref = dataset_ref.table('Processed_data')
  table = bigquery_client.get_table(table_ref)  # API call
  rows_to_insert = [(ID_recipien,action_date_hour,monday, tuesday, wednesday, thursday, friday, saturday, sunday, click_action, read_action, converted,imedeate_response, weight, weight_mu, processing_dat)]
  errors = bigquery_client.insert_rows(table, rows_to_insert)  # API request
#assert errors == []
  print(errors)

for i in range(len(out)): 
    hrs=int(out.iloc[i].hours)
    ID_recipient=int(out.iloc[i].ID_recipient)
    Last_Date=str(out.iloc[i].Last_Date)
    last_id_send=int(out.iloc[i].last_id_send)
    unique_send_id_action=int(out.iloc[i].unique_send_id_action)
    click_count=int(out.iloc[i].click_count)
    read_count=int(out.iloc[i].read_count)
    Monday=int(out.iloc[i].Monday)
    Tuesday=int(out.iloc[i].Tuesday)
    Wednesday=int(out.iloc[i].Wednesday)
    Thursday=int(out.iloc[i].Thursday)
    Friday=int(out.iloc[i].Friday)
    Saturday=int(out.iloc[i].Saturday)
    Sunday=int(out.iloc[i].Sunday)
    Total_day_active=int(out.iloc[i].Total_day_active)
    click_read_mulrtiplier=int(out.iloc[i].click_read_mulrtiplier)
    click_read_weight=int(out.iloc[i].click_read_weight)
    final_weight=int(out.iloc[i].final_weight)
    last_hour=int(out.iloc[i].last_hour)
    last_email=int(out.iloc[i].last_email)
    last_log_id=int(out.iloc[i].last_log_id)
    export_items_to_bigquery(hrs,ID_recipient,Last_Date, last_id_send, unique_send_id_action,click_count, read_count, Monday, Tuesday, Wednesday, Thursday, Friday, Saturday, Sunday, Total_day_active, click_read_mulrtiplier, click_read_weight, final_weight, last_hour, last_email,last_log_id)

Это очень неэффективно

Я также пробовал

new_data.to_gbq('internal.intermediate_table_weight_calc', 'ids-internal', chunksize=10000, if_exists='append')

но дает. Убедитесь, что структура и типы данных в DataFrame соответствуют схеме целевой таблицы, даже если все имена совпадают, а все dtype - int.

1 Ответ

1 голос
/ 19 июня 2020
t1=dt.datetime.now()
from google.cloud import bigquery
import os, json
def format_schema(schema):
    formatted_schema = []
    for row in schema:
        formatted_schema.append(bigquery.SchemaField(row['name'], row['type'], row['mode']))   
    return formatted_schema

#json_data = new_data.head(4).to_json(orient = 'records')

### Additional parameter used to convert to newline delimited format
json_data = new_data.head(4).to_json(orient = 'records')
json_object = json.loads(json_data)
project_id = 'mailkit-internal'
dataset_id = 'internal'
table_id = 'intermediate_table_weight_calc'

client = bigquery.Client(project = project_id)
dataset = client.dataset(dataset_id)
table = dataset.table(table_id)
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
#job_config.schema = format_schema(table_schema)
job = client.load_table_from_dataframe(df, table, job_config = job_config)
print(job.result())

это сработало .. надеюсь, что это поможет .. убедитесь, что установили! Pip install pyarrow! Pip install fastparquet, а имя и тип таблицы соответствуют dataframe и таблице bigquery

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...