Блокировка задачи сельдерея колбу - PullRequest
0 голосов
/ 27 декабря 2018

Я использую Flask с Celery и пытаюсь заблокировать определенную задачу, чтобы она могла выполняться только по одной за раз.В документации по сельдерею приведен пример выполнения этой документации по сельдерею. Обеспечение того, чтобы задача выполнялась только по одному за раз .Этот пример был дан для Django, однако я использую флешку. Я приложил все усилия, чтобы преобразовать это для работы с Flask, однако я все еще вижу myTask1, блокировка которого может быть запущена несколько раз.

Одна вещь, котораяМне неясно, правильно ли я использую кеш, я никогда не использовал его раньше, поэтому для меня все это ново.Одна вещь из документа, которая упоминается, но не объясняется, это:

Примечания к документу:

In order for this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.

Я не совсем уверен, что это значит, должен ли я использоватькэшировать вместе с базой данных, и если да, то как мне это сделать?Я использую mongodb.В моем коде у меня просто есть эта настройка для кэша cache = Cache(app, config={'CACHE_TYPE': 'simple'}), как это было упомянуто в документах Flask-Cache Flask-Cache Docs

Еще одна вещь, которая мне не яснаесли есть что-то другое, что мне нужно сделать, так как я звоню своему myTask1 из моего маршрута Flask task1

Вот пример моего кода, который я использую.

from flask import (Flask, render_template, flash, redirect,
                   url_for, session, logging, request, g, render_template_string, jsonify)
from flask_caching import Cache
from contextlib import contextmanager
from celery import Celery
from Flask_celery import make_celery
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from celery.five import monotonic
from flask_pymongo import PyMongo
from hashlib import md5
import pymongo
import time


app = Flask(__name__)

cache = Cache(app, config={'CACHE_TYPE': 'simple'})
app.config['SECRET_KEY']= 'super secret key for me123456789987654321'

######################
# MONGODB SETUP
#####################
app.config['MONGO_HOST'] = 'localhost'
app.config['MONGO_DBNAME'] = 'celery-test-db'
app.config["MONGO_URI"] = 'mongodb://localhost:27017/celery-test-db'


mongo = PyMongo(app)


##############################
# CELERY ARGUMENTS
##############################


app.config['CELERY_BROKER_URL'] = 'amqp://localhost//'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb://localhost:27017/celery-test-db'

app.config['CELERY_RESULT_BACKEND'] = 'mongodb'
app.config['CELERY_MONGODB_BACKEND_SETTINGS'] = {
    "host": "localhost",
    "port": 27017,
    "database": "celery-test-db", 
    "taskmeta_collection": "celery_jobs",
}

app.config['CELERY_TASK_SERIALIZER'] = 'json'


celery = Celery('task',broker='mongodb://localhost:27017/jobs')
celery = make_celery(app)


LOCK_EXPIRE = 60 * 2  # Lock expires in 2 minutes


@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)



@celery.task(bind=True, name='app.myTask1')
def myTask1(self):

    self.update_state(state='IN TASK')

    lock_id = self.name

    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            # do work if we got the lock
            print('acquired is {}'.format(acquired))
            self.update_state(state='DOING WORK')
            time.sleep(90)
            return 'result'

    # otherwise, the lock was already in use
    raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later



@celery.task(bind=True, name='app.myTask2')
def myTask2(self):
    print('you are in task2')
    self.update_state(state='STARTING')
    time.sleep(120)
    print('task2 done')


@app.route('/', methods=['GET', 'POST'])
def index():

    return render_template('index.html')

@app.route('/task1', methods=['GET', 'POST'])
def task1():

    print('running task1')
    result = myTask1.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)


    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task1'})

    return render_template('task1.html')


@app.route('/task2', methods=['GET', 'POST'])
def task2():

    print('running task2')
    result = myTask2.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)

    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task2'})

    return render_template('task2.html') 


@app.route('/status', methods=['GET', 'POST'])
def status():

    taskid_list = []
    task_state_list = []
    TaskName_list = []

    allAsyncData = mongo.db.job_task_id.find()

    for doc in allAsyncData:
        try:
            taskid_list.append(doc['taskid'])
        except:
            print('error with db conneciton in asyncJobStatus')

        TaskName_list.append(doc['TaskName'])

    # PASS TASK ID TO ASYNC RESULT TO GET TASK RESULT FOR THAT SPECIFIC TASK
    for item in taskid_list:
        try:
            task_state_list.append(myTask1.AsyncResult(item).state)
        except:
            task_state_list.append('UNKNOWN')

    return render_template('status.html', data_list=zip(task_state_list, TaskName_list))

Окончательный рабочий код

from flask import (Flask, render_template, flash, redirect,
                   url_for, session, logging, request, g, render_template_string, jsonify)
from flask_caching import Cache
from contextlib import contextmanager
from celery import Celery
from Flask_celery import make_celery
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from celery.five import monotonic
from flask_pymongo import PyMongo
from hashlib import md5
import pymongo
import time
import redis
from flask_redis import FlaskRedis


app = Flask(__name__)

# ADDING REDIS
redis_store = FlaskRedis(app)

# POINTING CACHE_TYPE TO REDIS
cache = Cache(app, config={'CACHE_TYPE': 'redis'})
app.config['SECRET_KEY']= 'super secret key for me123456789987654321'

######################
# MONGODB SETUP
#####################
app.config['MONGO_HOST'] = 'localhost'
app.config['MONGO_DBNAME'] = 'celery-test-db'
app.config["MONGO_URI"] = 'mongodb://localhost:27017/celery-test-db'


mongo = PyMongo(app)


##############################
# CELERY ARGUMENTS
##############################

# CELERY USING REDIS
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb://localhost:27017/celery-test-db'

app.config['CELERY_RESULT_BACKEND'] = 'mongodb'
app.config['CELERY_MONGODB_BACKEND_SETTINGS'] = {
    "host": "localhost",
    "port": 27017,
    "database": "celery-test-db", 
    "taskmeta_collection": "celery_jobs",
}

app.config['CELERY_TASK_SERIALIZER'] = 'json'


celery = Celery('task',broker='mongodb://localhost:27017/jobs')
celery = make_celery(app)


LOCK_EXPIRE = 60 * 2  # Lock expires in 2 minutes


@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    print('in memcache_lock and timeout_at is {}'.format(timeout_at))
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
        print('memcache_lock and status is {}'.format(status))
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)



@celery.task(bind=True, name='app.myTask1')
def myTask1(self):

    self.update_state(state='IN TASK')
    print('dir is {} '.format(dir(self)))

    lock_id = self.name
    print('lock_id is {}'.format(lock_id))

    with memcache_lock(lock_id, self.app.oid) as acquired:
        print('in memcache_lock and lock_id is {} self.app.oid is {} and acquired is {}'.format(lock_id, self.app.oid, acquired))
        if acquired:
            # do work if we got the lock
            print('acquired is {}'.format(acquired))
            self.update_state(state='DOING WORK')
            time.sleep(90)
            return 'result'

    # otherwise, the lock was already in use
    raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later



@celery.task(bind=True, name='app.myTask2')
def myTask2(self):
    print('you are in task2')
    self.update_state(state='STARTING')
    time.sleep(120)
    print('task2 done')


@app.route('/', methods=['GET', 'POST'])
def index():

    return render_template('index.html')

@app.route('/task1', methods=['GET', 'POST'])
def task1():

    print('running task1')
    result = myTask1.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)


    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'myTask1'})

    return render_template('task1.html')


@app.route('/task2', methods=['GET', 'POST'])
def task2():

    print('running task2')
    result = myTask2.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)

    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task2'})

    return render_template('task2.html')

@app.route('/status', methods=['GET', 'POST'])
def status():

    taskid_list = []
    task_state_list = []
    TaskName_list = []

    allAsyncData = mongo.db.job_task_id.find()

    for doc in allAsyncData:
        try:
            taskid_list.append(doc['taskid'])
        except:
            print('error with db conneciton in asyncJobStatus')

        TaskName_list.append(doc['TaskName'])

    # PASS TASK ID TO ASYNC RESULT TO GET TASK RESULT FOR THAT SPECIFIC TASK
    for item in taskid_list:
        try:
            task_state_list.append(myTask1.AsyncResult(item).state)
        except:
            task_state_list.append('UNKNOWN')

    return render_template('status.html', data_list=zip(task_state_list, TaskName_list))


if __name__ == '__main__':
    app.secret_key = 'super secret key for me123456789987654321'
    app.run(port=1234, host='localhost')

Здесь также показан снимок экрана, на котором вы видите, что я запускал myTask1 два раза и myTask2 один раз.Теперь у меня ожидаемое поведение для myTask1.Теперь myTask1 будет выполняться одним рабочим, если другой работник попытается его забрать, он будет просто повторять попытки, основываясь на том, что я определил.

Flower Dashboard

Ответы [ 2 ]

0 голосов
/ 30 декабря 2018

В своем вопросе вы указываете это предупреждение из примера Celery, который вы использовали:

Для того, чтобы это работало правильно, вы должны использовать бэкэнд кеша, где операция .addатомное.memcached, как известно, хорошо работает для этой цели.

И вы упоминаете, что вы действительно не понимаете, что это значит.Действительно, код, который вы показываете, демонстрирует, что вы не учли это предупреждение, потому что ваш код использует недопустимый бэкэнд.

Рассмотрите этот код:

with memcache_lock(lock_id, self.app.oid) as acquired:
    if acquired:
        # do some work

ЧтоВы хотите, чтобы значение acquired было истинным только для одного потока за раз.Если два потока входят в блок with одновременно, только один из них должен "выиграть" и иметь значение acquired, равное true.Этот поток, имеющий acquired true, может затем продолжить свою работу, а другой поток должен пропустить выполнение работы и позже попытаться снова получить блокировку. Чтобы гарантировать, что только один поток может иметь acquired true, .add должен быть атомарным.

Вот некоторый псевдокод того, что .add(key, value) делает:

1. if <key> is already in the cache:
2.   return False    
3. else:
4.   set the cache so that <key> has the value <value>
5.   return True

Если выполнение .add не является атомарным, это может произойти, если два потока A и B выполняют .add("foo", "bar").Предположим, что в начале пустой кэш.

  1. Поток A выполняет 1. if "foo" is already in the cache и обнаруживает, что "foo" не находится в кэше, и переходит к строке 3, но планировщик потока переключает управление на поток B.
  2. Поток B также выполняет 1. if "foo" is already in the cache, а также обнаруживает, что "foo" отсутствует в кэше.Таким образом, он переходит к строке 3, а затем выполняет строки 4 и 5, которые устанавливают для ключа "foo" значение "bar", и вызов возвращает True.
  3. В конце концов, планировщик возвращает управление потоку A, который продолжает выполнять 3, 4, 5, а также устанавливает ключ "foo" в значение "bar", а также возвращает True.

. Здесь у вас есть два .add вызова, которыеreturn True, если эти .add вызовы сделаны в пределах memcache_lock, это означает, что два потока могут иметь acquired true.Таким образом, два потока могут работать одновременно, и ваш memcache_lock не выполняет того, что должен делать, то есть позволяет только одному потоку работать одновременно.

Вы не используетекэш, который гарантирует, что .add является атомарным .Вы инициализируете его следующим образом:

cache = Cache(app, config={'CACHE_TYPE': 'simple'})

simple backend ограничен одним процессом, не имеет потоковой безопасности и имеет операцию .add, которая не является атомарной,(Между прочим, это вообще не касается Mongo. Если вы хотите, чтобы Mongo поддерживал ваш кеш, вам нужно указать резервную копию, специально созданную для отправки данных в базу данных Mongo.)

необходимо переключиться на другой бэкэнд, который гарантирует, что .add является атомарнымВы можете последовать примеру Celery и использовать memcached backend , который имеет атомарную операцию .add.Я не использую Flask, но по сути делаю то, что вы делаете с Django и Celery, и успешно использовал бэкэнд Redis для обеспечения того типа блокировки, который вы здесь используете.

0 голосов
/ 28 декабря 2018

При такой настройке вы все равно должны ожидать, что работники получат задачу, поскольку блокировка проверяется внутри самой задачи.Единственная разница будет в том, что работа не будет выполнена, если блокировка получена другим работником.
В примере, приведенном в документации, это желаемое поведение;если блокировка уже существует, задача просто ничего не сделает и завершится успешно.То, что вы хотите, немного отличается;вы хотите, чтобы работа ставилась в очередь, а не игнорировалась.

Чтобы получить желаемый эффект, вам нужно убедиться, что задача будет подобрана работником и выполнена в будущем.Один из способов сделать это - повторить попытку.

@task(bind=True, name='my-task')
def my_task(self):
    lock_id = self.name

    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            # do work if we got the lock
            print('acquired is {}'.format(acquired))
            return 'result'

    # otherwise, the lock was already in use
    raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later
...