Я использую Flask с Celery и пытаюсь заблокировать определенную задачу, чтобы она могла выполняться только по одной за раз.В документации по сельдерею приведен пример выполнения этой документации по сельдерею. Обеспечение того, чтобы задача выполнялась только по одному за раз .Этот пример был дан для Django, однако я использую флешку. Я приложил все усилия, чтобы преобразовать это для работы с Flask, однако я все еще вижу myTask1, блокировка которого может быть запущена несколько раз.
Одна вещь, котораяМне неясно, правильно ли я использую кеш, я никогда не использовал его раньше, поэтому для меня все это ново.Одна вещь из документа, которая упоминается, но не объясняется, это:
Примечания к документу:
In order for this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.
Я не совсем уверен, что это значит, должен ли я использоватькэшировать вместе с базой данных, и если да, то как мне это сделать?Я использую mongodb.В моем коде у меня просто есть эта настройка для кэша cache = Cache(app, config={'CACHE_TYPE': 'simple'})
, как это было упомянуто в документах Flask-Cache Flask-Cache Docs
Еще одна вещь, которая мне не яснаесли есть что-то другое, что мне нужно сделать, так как я звоню своему myTask1
из моего маршрута Flask task1
Вот пример моего кода, который я использую.
from flask import (Flask, render_template, flash, redirect,
url_for, session, logging, request, g, render_template_string, jsonify)
from flask_caching import Cache
from contextlib import contextmanager
from celery import Celery
from Flask_celery import make_celery
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from celery.five import monotonic
from flask_pymongo import PyMongo
from hashlib import md5
import pymongo
import time
app = Flask(__name__)
cache = Cache(app, config={'CACHE_TYPE': 'simple'})
app.config['SECRET_KEY']= 'super secret key for me123456789987654321'
######################
# MONGODB SETUP
#####################
app.config['MONGO_HOST'] = 'localhost'
app.config['MONGO_DBNAME'] = 'celery-test-db'
app.config["MONGO_URI"] = 'mongodb://localhost:27017/celery-test-db'
mongo = PyMongo(app)
##############################
# CELERY ARGUMENTS
##############################
app.config['CELERY_BROKER_URL'] = 'amqp://localhost//'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb://localhost:27017/celery-test-db'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb'
app.config['CELERY_MONGODB_BACKEND_SETTINGS'] = {
"host": "localhost",
"port": 27017,
"database": "celery-test-db",
"taskmeta_collection": "celery_jobs",
}
app.config['CELERY_TASK_SERIALIZER'] = 'json'
celery = Celery('task',broker='mongodb://localhost:27017/jobs')
celery = make_celery(app)
LOCK_EXPIRE = 60 * 2 # Lock expires in 2 minutes
@contextmanager
def memcache_lock(lock_id, oid):
timeout_at = monotonic() + LOCK_EXPIRE - 3
# cache.add fails if the key already exists
status = cache.add(lock_id, oid, LOCK_EXPIRE)
try:
yield status
finally:
# memcache delete is very slow, but we have to use it to take
# advantage of using add() for atomic locking
if monotonic() < timeout_at and status:
# don't release the lock if we exceeded the timeout
# to lessen the chance of releasing an expired lock
# owned by someone else
# also don't release the lock if we didn't acquire it
cache.delete(lock_id)
@celery.task(bind=True, name='app.myTask1')
def myTask1(self):
self.update_state(state='IN TASK')
lock_id = self.name
with memcache_lock(lock_id, self.app.oid) as acquired:
if acquired:
# do work if we got the lock
print('acquired is {}'.format(acquired))
self.update_state(state='DOING WORK')
time.sleep(90)
return 'result'
# otherwise, the lock was already in use
raise self.retry(countdown=60) # redeliver message to the queue, so the work can be done later
@celery.task(bind=True, name='app.myTask2')
def myTask2(self):
print('you are in task2')
self.update_state(state='STARTING')
time.sleep(120)
print('task2 done')
@app.route('/', methods=['GET', 'POST'])
def index():
return render_template('index.html')
@app.route('/task1', methods=['GET', 'POST'])
def task1():
print('running task1')
result = myTask1.delay()
# get async task id
taskResult = AsyncResult(result.task_id)
# push async taskid into db collection job_task_id
mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task1'})
return render_template('task1.html')
@app.route('/task2', methods=['GET', 'POST'])
def task2():
print('running task2')
result = myTask2.delay()
# get async task id
taskResult = AsyncResult(result.task_id)
# push async taskid into db collection job_task_id
mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task2'})
return render_template('task2.html')
@app.route('/status', methods=['GET', 'POST'])
def status():
taskid_list = []
task_state_list = []
TaskName_list = []
allAsyncData = mongo.db.job_task_id.find()
for doc in allAsyncData:
try:
taskid_list.append(doc['taskid'])
except:
print('error with db conneciton in asyncJobStatus')
TaskName_list.append(doc['TaskName'])
# PASS TASK ID TO ASYNC RESULT TO GET TASK RESULT FOR THAT SPECIFIC TASK
for item in taskid_list:
try:
task_state_list.append(myTask1.AsyncResult(item).state)
except:
task_state_list.append('UNKNOWN')
return render_template('status.html', data_list=zip(task_state_list, TaskName_list))
Окончательный рабочий код
from flask import (Flask, render_template, flash, redirect,
url_for, session, logging, request, g, render_template_string, jsonify)
from flask_caching import Cache
from contextlib import contextmanager
from celery import Celery
from Flask_celery import make_celery
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from celery.five import monotonic
from flask_pymongo import PyMongo
from hashlib import md5
import pymongo
import time
import redis
from flask_redis import FlaskRedis
app = Flask(__name__)
# ADDING REDIS
redis_store = FlaskRedis(app)
# POINTING CACHE_TYPE TO REDIS
cache = Cache(app, config={'CACHE_TYPE': 'redis'})
app.config['SECRET_KEY']= 'super secret key for me123456789987654321'
######################
# MONGODB SETUP
#####################
app.config['MONGO_HOST'] = 'localhost'
app.config['MONGO_DBNAME'] = 'celery-test-db'
app.config["MONGO_URI"] = 'mongodb://localhost:27017/celery-test-db'
mongo = PyMongo(app)
##############################
# CELERY ARGUMENTS
##############################
# CELERY USING REDIS
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb://localhost:27017/celery-test-db'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb'
app.config['CELERY_MONGODB_BACKEND_SETTINGS'] = {
"host": "localhost",
"port": 27017,
"database": "celery-test-db",
"taskmeta_collection": "celery_jobs",
}
app.config['CELERY_TASK_SERIALIZER'] = 'json'
celery = Celery('task',broker='mongodb://localhost:27017/jobs')
celery = make_celery(app)
LOCK_EXPIRE = 60 * 2 # Lock expires in 2 minutes
@contextmanager
def memcache_lock(lock_id, oid):
timeout_at = monotonic() + LOCK_EXPIRE - 3
print('in memcache_lock and timeout_at is {}'.format(timeout_at))
# cache.add fails if the key already exists
status = cache.add(lock_id, oid, LOCK_EXPIRE)
try:
yield status
print('memcache_lock and status is {}'.format(status))
finally:
# memcache delete is very slow, but we have to use it to take
# advantage of using add() for atomic locking
if monotonic() < timeout_at and status:
# don't release the lock if we exceeded the timeout
# to lessen the chance of releasing an expired lock
# owned by someone else
# also don't release the lock if we didn't acquire it
cache.delete(lock_id)
@celery.task(bind=True, name='app.myTask1')
def myTask1(self):
self.update_state(state='IN TASK')
print('dir is {} '.format(dir(self)))
lock_id = self.name
print('lock_id is {}'.format(lock_id))
with memcache_lock(lock_id, self.app.oid) as acquired:
print('in memcache_lock and lock_id is {} self.app.oid is {} and acquired is {}'.format(lock_id, self.app.oid, acquired))
if acquired:
# do work if we got the lock
print('acquired is {}'.format(acquired))
self.update_state(state='DOING WORK')
time.sleep(90)
return 'result'
# otherwise, the lock was already in use
raise self.retry(countdown=60) # redeliver message to the queue, so the work can be done later
@celery.task(bind=True, name='app.myTask2')
def myTask2(self):
print('you are in task2')
self.update_state(state='STARTING')
time.sleep(120)
print('task2 done')
@app.route('/', methods=['GET', 'POST'])
def index():
return render_template('index.html')
@app.route('/task1', methods=['GET', 'POST'])
def task1():
print('running task1')
result = myTask1.delay()
# get async task id
taskResult = AsyncResult(result.task_id)
# push async taskid into db collection job_task_id
mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'myTask1'})
return render_template('task1.html')
@app.route('/task2', methods=['GET', 'POST'])
def task2():
print('running task2')
result = myTask2.delay()
# get async task id
taskResult = AsyncResult(result.task_id)
# push async taskid into db collection job_task_id
mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task2'})
return render_template('task2.html')
@app.route('/status', methods=['GET', 'POST'])
def status():
taskid_list = []
task_state_list = []
TaskName_list = []
allAsyncData = mongo.db.job_task_id.find()
for doc in allAsyncData:
try:
taskid_list.append(doc['taskid'])
except:
print('error with db conneciton in asyncJobStatus')
TaskName_list.append(doc['TaskName'])
# PASS TASK ID TO ASYNC RESULT TO GET TASK RESULT FOR THAT SPECIFIC TASK
for item in taskid_list:
try:
task_state_list.append(myTask1.AsyncResult(item).state)
except:
task_state_list.append('UNKNOWN')
return render_template('status.html', data_list=zip(task_state_list, TaskName_list))
if __name__ == '__main__':
app.secret_key = 'super secret key for me123456789987654321'
app.run(port=1234, host='localhost')
Здесь также показан снимок экрана, на котором вы видите, что я запускал myTask1
два раза и myTask2 один раз.Теперь у меня ожидаемое поведение для myTask1.Теперь myTask1
будет выполняться одним рабочим, если другой работник попытается его забрать, он будет просто повторять попытки, основываясь на том, что я определил.