Я новичок ie в python и сельдерее, и у меня возникают проблемы с импортом, ниже представлена структура моего проекта:
project/
app/
starwars/
star_wars.py
utils/
flask_Celery.py
tasks.py
__init__.py
Это то, что есть в моем __init__.py
файле:
from app.starwars.star_wars import StarWars
from app.utils.flask_celery import make_celery
app = Flask(__name__)
# configure celery
app.config.update(
CELERY_BROKER_URL='redis://redis:6379/0',
CELERY_RESULT_BACKEND='redis://redis:6379/0'
)
celery = make_celery(app)
# secret key
app.secret_key = "xxxxxx"
api = Api(app)
# Cross Origin Resource Sharing
CORS(app)
api.add_resource(StarWars, '/api/v1/starwars')
Тогда, когда я получаю к нему доступ внутри star_wars.py
, вот где я получаю ошибку импорта, что я пропускаю? ниже мой код:
from ..utils.tasks import paginate_requested_data
class StarWars(Resource):
def get(self):
r = redis.StrictRedis(host='redis', port=6379, db=0)
res = paginate_requested_data.delay()
return {'hello': '{}'.format(res)}
Tasks.py
, это код ниже:
from celery.worker.state import requests
from app import celery
@celery.task(name='starwars_paginate')
def paginate_requested_data():
results = []
pagination = 1
url = 'https://swapi.co/api/starships/'
params = {'page': pagination}
r = requests.get(url, params=params)
data = r.json()
for i in data['results']:
dict_res = {'name': i['name'], 'hyperdrive_rating': i['hyperdrive_rating']}
results.append(dict_res)
while r.status_code == 200:
try:
pagination += 1
params['page'] = pagination
r = requests.get(url, params=params)
data = r.json()
for i in data['results']:
dict_res = {'name': i['name'], 'hyperdrive_rating': i['hyperdrive_rating']}
results.append(dict_res)
except KeyError:
print('stop')
return results
@celery.task(name='starwars_test')
def add_together(a, b):
return a + b
Это ошибка получаю:
> Traceback (most recent call last): File
> "C:/Users/Huxy/PycharmProjects/starwars_challenge/app.py", line 1, in
> <module>
> from app import * File "C:\Users\Huxy\PycharmProjects\starwars_challenge\app\__init__.py",
> line 7, in <module>
> from app.starwars.star_wars import StarWars File "C:\Users\Huxy\PycharmProjects\starwars_challenge\app\starwars\star_wars.py",
> line 8, in <module>
> from ..utils.tasks import paginate_requested_data File "C:\Users\Huxy\PycharmProjects\starwars_challenge\app\utils\tasks.py",
> line 3, in <module>
> from app import celery ImportError: cannot import name 'celery' from 'app'
> (C:\Users\Huxy\PycharmProjects\starwars_challenge\app\__init__.py)