Ошибки запроса MySQL при подключении из задачи Celery, запущенной на Heroku - PullRequest
0 голосов
/ 27 ноября 2018

Я вижу неправильные результаты запросов при выполнении запросов к внешней базе данных MySQL, но только при подключении из задач Celery, работающих на Heroku.Те же самые задачи при запуске на моей собственной машине не показывают эти ошибки, и ошибки появляются только примерно в половине случаев (хотя при сбое все задачи неверны).

Задачи управляются Celery через Redis, а база данных MySQL сама по себе не работает на Heroku.И моя локальная машина, и Heroku подключаются к одной и той же базе данных MySQL.

Я подключаюсь к базе данных, используя MySQL, с драйвером pymysql, используя;

DB_URI = 'mysql+pymysql://USER:PW@SERVER/DB'

engine = create_engine(stats_config.DB_URI, convert_unicode=True, echo_pool=True)
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()

Задачи выполняются одна за другой.

Вот пример задачи с разными результатами:

@shared_task(bind=True, name="get_gross_revenue_task")
def get_gross_revenue_task(self, g_start_date, g_end_date, START_TIME_FORM):

    db_session.close()
    start_date = datetime.strptime(g_start_date, '%d-%m-%Y')
    end_date = datetime.strptime(g_end_date, '%d-%m-%Y')

    gross_rev_trans_VK = db_session.query(func.sum(UsersTransactionsVK.amount)).filter(UsersTransactionsVK.date_added >= start_date, UsersTransactionsVK.date_added <= end_date, UsersTransactionsVK.payed == 'Yes').scalar()
    gross_rev_trans_Stripe = db_session.query(func.sum(UsersTransactionsStripe.amount)).filter(UsersTransactionsStripe.date_added >= start_date, UsersTransactionsStripe.date_added <= end_date, UsersTransactionsStripe.payed == 'Yes').scalar()
    gross_rev_trans = db_session.query(func.sum(UsersTransactions.amount)).filter(UsersTransactions.date_added >= start_date, UsersTransactions.date_added <= end_date, UsersTransactions.on_hold == 'No').scalar()

    if gross_rev_trans_VK is None:
        gross_rev_trans_VK = 0

    if gross_rev_trans_Stripe is None:
        gross_rev_trans_Stripe = 0

    if gross_rev_trans is None:
        gross_rev_trans = 0

    print ('gross', gross_rev_trans_VK, gross_rev_trans_Stripe, gross_rev_trans)

    total_gross_rev = gross_rev_trans_VK + gross_rev_trans_Stripe + gross_rev_trans

    return {'total_rev' : str(total_gross_rev / 100), 'current': 100, 'total': 100, 'statistic': 'get_gross_revenue', 'time_benchmark': (datetime.today() - START_TIME_FORM).total_seconds()}

# Selects gross revenue between selected dates
@app.route('/get-gross-revenue', methods=["POST"])
@basic_auth.required
@check_verified
def get_gross_revenue():
    if request.method == "POST":
        task = get_gross_revenue_task.apply_async([session['g_start_date'], session['g_end_date'], session['START_TIME_FORM']])
        return json.dumps({}), 202, {'Location': url_for('taskstatus_get_gross_revenue', task_id=task.id)}

Это простые и быстрые задачи, которые выполняются в течение нескольких секунд.

Задачи не выполняются попроизводя небольшие различия.Например, для задачи, в которой правильный результат был бы к 30111, когда что-то сломалось, задача выдала бы 29811.Это всегда код, который использует `db

Что я пробовал:

  • Я уже использую тот же часовой пояс, выполнив:

    db_session.execute("SET SESSION time_zone = 'Europe/Berlin'")
    
  • Я проверил ошибки в журналах рабочих.Хотя есть некоторые записи, такие как

    2013 Lost connection to MySQL
    
    sqlalchemy.exc.ResourceClosedError: This result object does not return rows. It has been closed automatically
    
    2014 commands out of sync
    

    , я не нашел корреляции между ошибками SQL и неправильными результатами.Неправильные результаты задач могут появиться без потери соединения.

  • Очень грязное исправление - жестко запрограммировать ожидаемый результат для одной из задач, сначала выполнить его, а затем повторно отправить всеесли полученный результат неверен.

  • Вероятно, это проблема уровня кэша или уровня изоляции при использовании сеанса SQLAlchemy.Поскольку мне всегда нужно было использовать SELECT (без вставок или обновлений), я также пробовал различные настройки для уровня изоляции, прежде чем запускать задачи, такие как

    #db_session.close()
    #db_session.commit()
    #db_session.execute('SET TRANSACTION READ ONLY')
    

    Они показывают ошибку, когда я запускаю их на Heroku, но они работают, когда я запускаю их на своем компьютере с Windows.

    Я также пытался изменить само соединение с помощью 'isolation_level="READ UNCOMMITTED', без какого-либо результата.

  • Яуверены, что рабочие не используют один и тот же db_session.

  • Кажется, что только задачи, использующие db_session в запросе, могут возвращать неверные результаты.Код, использующий атрибут query в базовом классе Base (объект db_session.query_property(), например, Users.query), по-видимому, не имеет проблем.Я думал, что это в основном то же самое?

1 Ответ

0 голосов
/ 03 декабря 2018

Вы повторно используете сеансы между задачами у разных работников.Создайте свой сеанс на одного работника Celery или даже на одно задание.

Знайте, что задания фактически сохраняются для каждого работника.Вы можете использовать это для кэширования сеанса для каждой задачи, поэтому вам не нужно заново создавать сеанс при каждом запуске задачи.Это проще всего сделать с помощью пользовательского класса задач ;в документации в качестве примера используется кэширование соединений с базой данных.

Чтобы сделать это с сеансом SQLAlchemy, используйте:

Session = scoped_session(sessionmaker(autocommit=True, autoflush=True))

class SQLASessionTask(Task):
    _session = None

    @property
    def session(self):
        if self._session is None:
            engine = create_engine(
                stats_config.DB_URI, convert_unicode=True, echo_pool=True) 
            self._session = Session(bind=engine)
        return self._session

Используйте это как:

@shared_task(base=SQLASessionTask, bind=True, name="get_gross_revenue_task")
def get_gross_revenue_task(self, g_start_date, g_end_date, START_TIME_FORM):
    db_session = self.session
    # ... etc.

Thisсоздает сеанс SQLAlchemy для текущей задачи только в том случае, если он нужен, в момент доступа к self.session.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...