Я запускаю сервер Bokeh , используя базовый Tornado framework.
Мне нужен сервер, чтобы обновить некоторые данные в какой-то момент. Это делается путем извлечения строк из базы данных Oracle с использованием Cx_Oracle .
Благодаря Tornado PeriodicCallback программа каждые 30 секунд проверяет, должны ли загружаться новые данные:
server.start()
from tornado.ioloop import PeriodicCallback
pcallback = PeriodicCallback(db_obj.reload_data_async, 10 * 1e3)
pcallback.start()
server.io_loop.start()
Где db_obj
- это экземпляр класса, который заботится о функциях, связанных с БД (connect, fetch, ...).
По сути, так выглядит функция reload_data_async
:
executor = concurrent.futures.ThreadPoolExecutor(4)
# methods of the db_obj class ...
@gen.coroutine
def reload_data_async(self):
# ... first, some code to check if the data should be reloaded ...
# ...
if data_should_be_reloaded:
new_data = yield executor.submit(self.fetch_data)
def fetch_data(self):
""" fetch new data in the DB """
cursor = cx.Cursor(self.db_connection)
cursor.execute("some SQL select request that takes time (select * from ...)")
rows = cursor.fetchall()
# some more processing thereafter
# ...
В принципе, это работает. Но когда я пытаюсь прочитать данные во время их загрузки в fetch_data
(щелкнув для отображения в графическом интерфейсе), программа вылетает из-за состояния гонки (я полагаю?): Она получает доступ к данным, когда они выбираются в то же время время.
Я только что обнаружил, что tornado.concurrent.futures не являются потокобезопасными:
tornado.concurrent.Future похож на concurrent.futures.Future, но
не потокобезопасен (и поэтому быстрее для использования с однопоточным
циклы событий).
В общем, я думаю, что я должен создать новый поток, чтобы заботиться о операциях CX_Oracle. Могу ли я сделать это с помощью Tornado и продолжать использовать функцию PerodicCallback
? Как я могу преобразовать мою асинхронную операцию в потокобезопасную? Как это сделать?
PS: я использую Python 2.7
Спасибо