BigQuery table create and load data via Python
0 votes
/ 28 May 2018

I am trying to extract events from Mixpanel, process them, and then load them into a BigQuery table (creating a new table).

I have googled every available resource, but nothing helped solve the problem.

Below is my code:
# Required modules import
import os
from mixpanel_api import Mixpanel
import collections.abc   # MutableMapping lives in collections.abc on Python 3
import json
from google.cloud import storage, bigquery
# Function to flatten exported file
def flatten(d, parent_key='', sep=''):
    items = []
    for k, v in d.items():
        new_key = parent_key.replace("PROPERTIES","").replace("-","_").replace("[","").replace("]","").replace("/","").replace("\\","").replace("'","") + sep + k.replace(" ","").replace("_","").replace("$","").replace("-","_").replace("[","").replace("]","").replace("/","").replace("\\","").replace("'","") if parent_key else k
        #new_key = parent_key.replace("PROPERTIES","").join(e for e in parent_key if e.isalnum()) + sep + k.join(e for e in k if e.isalnum()) if parent_key else k
        if isinstance(v, collections.abc.MutableMapping):
            items.extend(flatten(v, new_key.upper(), sep=sep).items())
        else:
            items.append((new_key.upper().replace("-","_").replace("[","").replace("]","").replace("/","").replace("\\","").replace("'",""), v))
            #items.append((new_key.upper().join(e for e in new_key if e.isalnum()), v))
            #items.append(("ID","1"))
            #items.append(("PROCESS_DATE",""))
            #items.append(("DATA_DATE",""))
    return dict(items)
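# Example (for illustration): flatten({'event': 'Login', 'properties': {'$os': 'Mac OS X'}})
# returns {'EVENT': 'Login', 'OS': 'Mac OS X'} -- keys are uppercased, the
# PROPERTIES prefix is dropped, and special characters are stripped.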
# Script entry point
if __name__ == '__main__':

   # Secret and token to access API
   api_sec = 'aa8af6b5ca5a5ed30e20f3af0acdfb2d'
   api_tok = 'ad5234953e64b908bcd35388875324db'
   # User input for date range and filename
   start_date = input('Enter the start date (format: YYYY-MM-DD): ')
   end_date = input('Enter the end date (format: YYYY-MM-DD): ')
   file_name = input('Enter filename to store output: ')
   file_formatter = input('Enter filename to store formatted output: ')
   # Instantiating Mixpanel object
   mpo = Mixpanel(api_sec,
           api_tok
         )
   # Export events for the specified date range into the file provided (plain text, not gzipped)
   mpo.export_events(file_name,
           {'from_date':start_date,
            'to_date':end_date
           },
           add_gzip_header=False,
           raw_stream=True
        )
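   # Note (assumption based on how the file is read below): with
   # raw_stream=True the export is written to file_name as one JSON
   # event per line, which is what the line-by-line flattening expects.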

   # Dict for schema derived from file
   schema_dict = {}
   # Flatten file and write-out to another file
   with open(file_name, 'r') as uf, open(file_formatter, 'a') as ff, open('schema_file', 'a') as sf:
       #schema_list = []
       for line in uf:
           temp = flatten(json.loads(line))
           for k in temp.keys():
              if k not in schema_dict:
                   schema_dict[k] = "STRING"
                   #schema_list.append({"name" : k, "type" : "STRING"})
           #ff.write(json.dumps(temp))
           json.dump(temp, ff, indent = None, sort_keys = True)   # indent=None writes each record compactly on one line
           ff.write('\n')                                         # newline after each record -> newline-delimited JSON
       #json.dump(schema_dict, sf, indent = None, sort_keys = True)
       #json.dump(schema_list, sf, indent = None, sort_keys = True)

   # Removing source file
   if os.path.isfile(file_name):
       sfr = os.remove(file_name)
       if sfr == None:
           print('File ' + file_name + ' removed from local storage')
       else:
           print('File ' + file_name + ' remove failed from local storage')
   # Uploading file to Google bucket
   client = storage.Client()
   bucket = client.get_bucket('yathin-sample-bucket')
   blob = bucket.blob(file_formatter)
   status = blob.upload_from_filename(file_formatter)
   if status == None:
       print('File ' + file_formatter + ' upload success. Removing local copy.')
       fr = os.remove(file_formatter)
       if fr == None:
           print('File ' + file_formatter + ' removed from local storage')
       else:
           print('File ' + file_formatter + ' remove failed from local storage')
   # Loading file to BigQuery
   client = bigquery.Client()
   dataset_id = 'sample_dataset'
   dataset_ref = client.dataset(dataset_id)
   job_config = bigquery.LoadJobConfig()
   job_config.schema = [ bigquery.SchemaField(k,v) for k,v in schema_dict.items() ]
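   # Every key seen in the export is loaded as STRING; any column that
   # needs a different type would have to be adjusted in schema_dict first.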
   #job_config.autodetect = True
   #job_config.create_disposition = 'CREATE_IF_NEEDED'
   #job_config.write_disposition = 'WRITE_APPEND'
   job_config.source_format = 'NEWLINE_DELIMITED_JSON'
   uri = 'gs://yathin-sample-bucket/'+file_formatter
   load_job = client.load_table_from_uri(
              uri,
              dataset_ref.table('test_json'),
              job_config=job_config)  # API request
   #assert load_job.job_type == 'load'
   #load_job.result()  # Waits for table load to complete.

This code returns no errors, but the table is never created.

Can someone help by pointing out what is wrong?
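For reference, a minimal sketch of how errors from the load job could be surfaced (assuming the load_job object from the code above; BigQuery load jobs are asynchronous, so a failure never reaches the script unless the job is polled):

# Sketch only: poll the asynchronous load job so failures become visible.
# Assumes `load_job` from the script above (google-cloud-bigquery client).
try:
    load_job.result()   # blocks until the load job finishes
    print('Loaded {} rows.'.format(load_job.output_rows))
except Exception as exc:
    print('Load job failed: {}'.format(exc))
    for err in (load_job.errors or []):
        print(err)

(result() re-raises the job's error, so a bad schema field or a malformed JSON line would show up here.)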
