Для операций на моем сервере Tornado, которые, как ожидается, будут блокироваться (и не могут быть легко изменены для использования таких вещей, как асинхронный клиент HTTP-запроса Tornado), я перенес работу на разделение рабочих процессов с помощью модуля multiprocessing
.В частности, я использовал многопроцессорную обработку Pool
, поскольку она предлагает метод с именем apply_async
, который очень хорошо работает с Tornado, поскольку он принимает обратный вызов в качестве одного из аргументов.
Недавно я понял, что пул предварительно выделяетколичество процессов, поэтому, если все они становятся блокирующими, операции, которые требуют нового процесса, должны будут ждать.Я понимаю, что сервер все еще может принимать соединения, так как apply_async
работает, добавляя вещи в очередь задач, и сам по себе довольно быстро завершает свою работу, но я надеюсь вызвать n процессов для n количество блокирующих задач, которые мне нужно выполнить.
Я подумал, что мог бы использовать метод add_handler
для IOLoop моего сервера Tornado, чтобы добавить обработчик для каждого нового PID, который я создаю для этого IOLoop.Я делал нечто подобное раньше, но он использовал popen и произвольную команду.Примером такого использования этого метода является здесь .Я хотел передать аргументы в произвольную целевую функцию Python в моей области видимости, поэтому я хотел придерживаться multiprocessing
.
Однако, похоже, что-то не нравится PID, которые мои multiprocessing.Process
объекты имеют.Я получаю IOError: [Errno 9] Bad file descriptor
.Эти процессы как-то ограничены?Я знаю, что PID недоступен до тех пор, пока я действительно не начну процесс, но я do запускаю процесс.Вот исходный код примера, который я сделал, который демонстрирует эту проблему:
#!/usr/bin/env python
"""Creates a small Tornado program to demonstrate asynchronous programming.
Specifically, this demonstrates using the multiprocessing module."""
import tornado.httpserver
import tornado.ioloop
import tornado.web
import multiprocessing as mp
import random
import time
__author__ = 'Brian McFadden'
__email__ = 'brimcfadden@gmail.com'
def sleepy(queue):
"""Pushes a string to the queue after sleeping for 5 seconds.
This sleeping can be thought of as a blocking operation."""
time.sleep(5)
queue.put("Now I'm awake.")
return
def random_num():
"""Returns a string containing a random number.
This function can be used by handlers to receive text for writing which
facilitates noticing change on the webpage when it is refreshed."""
n = random.random()
return "<br />Here is a random number to show change: {0}".format(n)
class SyncHandler(tornado.web.RequestHandler):
"""Demonstrates handing a request synchronously.
It executes sleepy() before writing some more text and a random number to
the webpage. While the process is sleeping, the Tornado server cannot
handle any requests at all."""
def get(self):
q = mp.Queue()
sleepy(q)
val = q.get()
self.write(val)
self.write('<br />Brought to you by SyncHandler.')
self.write('<br />Try refreshing me and then the main page.')
self.write(random_num())
class AsyncHandler(tornado.web.RequestHandler):
"""Demonstrates handing a request asynchronously.
It executes sleepy() before writing some more text and a random number to
the webpage. It passes the sleeping function off to another process using
the multiprocessing module in order to handle more requests concurrently to
the sleeping, which is like a blocking operation."""
@tornado.web.asynchronous
def get(self):
"""Handles the original GET request (normal function delegation).
Instead of directly invoking sleepy(), it passes a reference to the
function to the multiprocessing pool."""
# Create an interprocess data structure, a queue.
q = mp.Queue()
# Create a process for the sleepy function. Provide the queue.
p = mp.Process(target=sleepy, args=(q,))
# Start it, but don't use p.join(); that would block us.
p.start()
# Add our callback function to the IOLoop. The async_callback wrapper
# makes sure that Tornado sends an HTTP 500 error to the client if an
# uncaught exception occurs in the callback.
iol = tornado.ioloop.IOLoop.instance()
print "p.pid:", p.pid
iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)
def _finish(self, q):
"""This is the callback for post-sleepy() request handling.
Operation of this function occurs in the original process."""
val = q.get()
self.write(val)
self.write('<br />Brought to you by AsyncHandler.')
self.write('<br />Try refreshing me and then the main page.')
self.write(random_num())
# Asynchronous handling must be manually finished.
self.finish()
class MainHandler(tornado.web.RequestHandler):
"""Returns a string and a random number.
Try to access this page in one window immediately after (<5 seconds of)
accessing /async or /sync in another window to see the difference between
them. Asynchronously performing the sleepy() function won't make the client
wait for data from this handler, but synchronously doing so will!"""
def get(self):
self.write('This is just responding to a simple request.')
self.write('<br />Try refreshing me after one of the other pages.')
self.write(random_num())
if __name__ == '__main__':
# Create an application using the above handlers.
application = tornado.web.Application([
(r"/", MainHandler),
(r"/sync", SyncHandler),
(r"/async", AsyncHandler),
])
# Create a single-process Tornado server from the application.
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(8888)
print 'The HTTP server is listening on port 8888.'
tornado.ioloop.IOLoop.instance().start()
Вот обратная трассировка:
Traceback (most recent call last):
File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 810, in _stack_context
yield
File "/usr/local/lib/python2.6/dist-packages/tornado/stack_context.py", line 77, in StackContext
yield
File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 827, in _execute
getattr(self, self.request.method.lower())(*args, **kwargs)
File "/usr/local/lib/python2.6/dist-packages/tornado/web.py", line 909, in wrapper
return method(self, *args, **kwargs)
File "./process_async.py", line 73, in get
iol.add_handler(p.pid, self.async_callback(self._finish, q), iol.READ)
File "/usr/local/lib/python2.6/dist-packages/tornado/ioloop.py", line 151, in add_handler
self._impl.register(fd, events | self.ERROR)
IOError: [Errno 9] Bad file descriptor
Приведенный выше код фактически модифицирован из более старого примера, которыйиспользуемые пулы процессов.Я сохранил его для справки для моих коллег и себя (отсюда большое количество комментариев) довольно долго.Я сконструировал его таким образом, чтобы можно было одновременно открывать два небольших окна браузера, чтобы продемонстрировать своему боссу, что URI / sync блокирует соединения, а / async допускает больше соединений.Для целей этого вопроса все, что вам нужно сделать, чтобы воспроизвести его, это попытаться получить доступ к обработчику / async.Это сразу же ошибки.
Что мне делать с этим?Как PID может быть «плохим»?Если вы запустите программу, вы увидите, что она распечатана на стандартный вывод.
Для записи я использую Python 2.6.5 в Ubuntu 10.04.Торнадо 1.1.