Соединение задачи Python Celery Pymysql закрыто слишком рано - PullRequest
0 голосов
/ 27 ноября 2018

Я использую celery / redis и python / flask для обработки некоторых очень длинных фоновых задач.Я использую heroku для размещения проекта.

В задаче я подключаюсь к базе данных mysql.Я использую pymysql здесь и sqlalchemy.

Важное примечание: проблема возникает только на heroku, она всегда работает на localhost.

Задачи выполняются одна за другой.У меня было много проблем с сельдереем, но это не проблема здесь (Если кому-то интересно, вот старый вопрос ).

После всего этого времени и тестирования я думаю, что проблемадолжно быть с MySQL закрытие соединения слишком рано.

Вот задача:

@shared_task(bind=True, name="get_gross_revenue_task")
def get_gross_revenue_task(self, g_start_date, g_end_date, START_TIME_FORM):

    db_session.close()
    start_date = datetime.strptime(g_start_date, '%d-%m-%Y')
    end_date = datetime.strptime(g_end_date, '%d-%m-%Y')

    gross_rev_trans_VK = db_session.query(func.sum(UsersTransactionsVK.amount)).filter(UsersTransactionsVK.date_added >= start_date, UsersTransactionsVK.date_added <= end_date, UsersTransactionsVK.payed == 'Yes').scalar()
    gross_rev_trans_Stripe = db_session.query(func.sum(UsersTransactionsStripe.amount)).filter(UsersTransactionsStripe.date_added >= start_date, UsersTransactionsStripe.date_added <= end_date, UsersTransactionsStripe.payed == 'Yes').scalar()
    gross_rev_trans = db_session.query(func.sum(UsersTransactions.amount)).filter(UsersTransactions.date_added >= start_date, UsersTransactions.date_added <= end_date, UsersTransactions.on_hold == 'No').scalar()

    if gross_rev_trans_VK is None:
        gross_rev_trans_VK = 0

    if gross_rev_trans_Stripe is None:
        gross_rev_trans_Stripe = 0

    if gross_rev_trans is None:
        gross_rev_trans = 0

    print ('gross', gross_rev_trans_VK, gross_rev_trans_Stripe, gross_rev_trans)

    total_gross_rev = gross_rev_trans_VK + gross_rev_trans_Stripe + gross_rev_trans

    return {'total_rev' : str(total_gross_rev / 100), 'current': 100, 'total': 100, 'statistic': 'get_gross_revenue', 'time_benchmark': (datetime.today() - START_TIME_FORM).total_seconds()}

# Selects gross revenue between selected dates
@app.route('/get-gross-revenue', methods=["POST"])
@basic_auth.required
@check_verified
def get_gross_revenue():
    if request.method == "POST":
        task = get_gross_revenue_task.apply_async([session['g_start_date'], session['g_end_date'], session['START_TIME_FORM']])
        return json.dumps({}), 202, {'Location': url_for('taskstatus_get_gross_revenue', task_id=task.id)}

Например, эта задача всегда возвращает localhost ВСЕГДА 100k, но на heroku она возвращает 80k, я не получаюлюбые ошибки.Это просто возвращает неправильное значение.и так, что здесь происходит?Для выполнения задания требуется всего несколько секунд.


Примечания:

Я использую db_session.close() в начале задания, потому что у меня были ошибки на heroku "MySQL server has gone away".

Вот мой конфиг сельдерея:

CELERY_REDIS_MAX_CONNECTIONS=20
BROKER_POOL_LIMIT=None
CELERYD_WORKER_LOST_WAIT=20
CELERYD_MAX_TASKS_PER_CHILD=6
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 18000} # 5 hours
CELERY_RESULT_DB_SHORT_LIVED_SESSIONS = True #useful if: For example, intermittent errors like (OperationalError) (2006, ‘MySQL server has gone away’)

Некоторые из этих конфигураций могут быть неактуальными в настоящее время, потому что я выполняю задачи по одной (принудительно через JS).

Вот как я подключаюсь к MySQL:

def checkout_listener(dbapi_con, con_record, con_proxy):
    try:
        try:
            dbapi_con.ping(False)
        except TypeError:
            dbapi_con.ping()
    except dbapi_con.OperationalError as exc:
        if exc.args[0] in (2006, 2013, 2014, 2045, 2055):
            raise DisconnectionError()
        else:
            raise

engine = create_engine(stats_config.DB_URI, convert_unicode=True, pool_size=100, pool_recycle=3600) 
event.listen(engine, 'checkout', checkout_listener)

db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()

В целом кажется, что около 10%~ результата теряется при переходе на геройку.

EDIT:

Я пробовал с очень большим запросом, и все результаты задачи на герою слегка отклонились примерно на 5%~, либо слишком высоко, либо слишком низко.

...