Переработка Python для l oop в рабочий процесс ETL - Использование Luigi, Airflow и т. Д. - PullRequest
1 голос
/ 13 марта 2020

В настоящее время я экспериментирую с различными python методами рабочего процесса, и у меня есть вложенное значение для l oop, которое я хочу преобразовать в автоматизированный рабочий процесс. Я пытался использовать luigi, но я не могу определить успешный рабочий процесс, который принимает два набора данных, которые зависят друг от друга, и выводит оба блока данных CSV. Каждый luigi пример, который я видел до сих пор, собирает данные за один шаг, агрегирует, а затем записывает результат.

В моем примере я хочу получить ежедневное табло NBA из API, store что в CSV; затем используя эти данные табло, чтобы ввести статистику (или boxscore) каждой из игр за этот день и сохранить их в отдельных CSV. хочу следующее:

import re
import requests
import pandas as pd
from pandas.io.json import json_normalize
import os

if not os.path.exists('data/'):
    os.makedirs('data/')
if not os.path.exists('data/games/'):
    os.makedirs('data/games/')
if not os.path.exists('data/boxscores/'):
    os.makedirs('data/boxscores/')

dates = ['2020-03-01', '2020-03-02'] # and so on...
for date in dates:
    print('*** DATE: {}'.format(date))
    date = re.sub( '-', '', date)
    print('*** MODIFIED DATE: {}'.format(date))
    response = requests.get('http://data.nba.net/json/cms/noseason/scoreboard/{}/games.json'.format(date))
    df = pd.read_json(response.text)
    games_df = json_normalize(df[df.index == 'games']['sports_content'][0]['game'])
    games_df.to_csv('data/games/games_{}.csv'.format(date), index=False)
    for id in games_df.id.tolist():
        print('*** GAME ID: {}'.format(id))
        response = requests.get("http://data.nba.net/10s/prod/v1/{0}/{1}_boxscore.json".format(date, id))
        df = pd.read_json(response.text)
        df = json_normalize(df[df.index == 'activePlayers']['stats'][0])
        boxscore = df[['personId', 'firstName', 'lastName', 'teamId',
               'min', 'points', 'fgm', 'fga',
               'ftm', 'fta', 'tpm', 'tpa', 'offReb', 'defReb',
               'totReb', 'assists', 'pFouls', 'steals', 'turnovers', 'blocks', 'plusMinus']]
        boxscore['min'] = boxscore['min'].str.split(":", expand = True)[0]
        boxscore.to_csv('data/boxscores/boxscore_{}.csv'.format(id), index=False) 

Я хочу не использовать циклы for, и мне нравится, как фреймворк luigi позволит избежать повторного создания дней, которые вы уже построили. У кого-нибудь есть какие-либо предложения или ссылки о том, как построить такой конвейер? Я открыт для переключения с luigi на Воздушный поток, если он более интуитивно понятен.

...