Я пытаюсь проверить Apache Airflow с базовой функцией 3. И назначая их на 3 задачи в одном порядке. Но есть проблема в том, что:
Когда я пытаюсь загрузить данные в POstgresql db, выдается ошибка, что NoneType object with no attribute cursor
Есть предложения, как это решить?
Код:
def fetch_data():
global df
#Fetch data from url
url = 'http://api.open-notify.org/astros.json'
response = requests.get(url)
logging.info(response.status_code)
json_data = response.json()
logging.info(json_data)
#Normalize it
df = json_normalize(json_data['people'])
return df
def connect_to_db():
global engine
# Connect to database (Note: The package psychopg2 is required for Postgres to work with SQLAlchemy)
engine = create_engine("postgresql://postgres:postgres@localhost/postgres")
logging.info('Connected to the database')
# Verify that there are no existing tables
print(engine.table_names())
return engine
def load_data():
engine = connect_to_db()
df = fetch_data()
table_name = 'astro'
logging.info('Table is created')
df.to_sql(table_name, engine)
logging.info('Data is loaded.')
Трассировка:
ERROR - 'NoneType' object has no attribute 'cursor'
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/airflow/models/taskinstance.py", line 926, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/dist-packages/airflow/operators/python_operator.py", line 113, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python2.7/dist-packages/airflow/operators/python_operator.py", line 118, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/lib/python2.7/dist-packages/airflow/example_dags/sample_task.py", line 65, in load_data
df.to_sql(name=table_name, con = engine, if_exists='append')
File "/usr/local/lib/python2.7/dist-packages/pandas/core/generic.py", line 2531, in to_sql
dtype=dtype, method=method)
File "/usr/local/lib/python2.7/dist-packages/pandas/io/sql.py", line 460, in to_sql
chunksize=chunksize, dtype=dtype, method=method)
File "/usr/local/lib/python2.7/dist-packages/pandas/io/sql.py", line 1546, in to_sql
table.create()
File "/usr/local/lib/python2.7/dist-packages/pandas/io/sql.py", line 572, in create
if self.exists():
File "/usr/local/lib/python2.7/dist-packages/pandas/io/sql.py", line 560, in exists
return self.pd_sql.has_table(self.name, self.schema)
File "/usr/local/lib/python2.7/dist-packages/pandas/io/sql.py", line 1558, in has_table
return len(self.execute(query, [name, ]).fetchall()) > 0
File "/usr/local/lib/python2.7/dist-packages/pandas/io/sql.py", line 1426, in execute
cur = self.con.cursor()
AttributeError: 'NoneType' object has no attribute 'cursor'