Как вы опрашиваете результат от функции сельдерея с колбой и сокето? - PullRequest
0 голосов
/ 22 июня 2019

У меня запущено приложение фляги (python 3.5), которое позволяет выполнять параллельные запросы.Это было достигнуто с помощью резьбового асинхронного режима на socketio.Поэтому многие пользователи могут запросить api фляги, которая в свою очередь вызывает функцию сельдерея с именем get_output ().В зависимости от ввода пользователя функция get_output () будет принимать переменное время.От 0 до 2 с при макс.

Функция get_output () (это фиктивная функция, но функциональность аналогична):

@celery.task 
def get_output(var_1,var_2,var_3,var_4): 
    if(var_1 == 1): 
       #this is just a mock demo but the process will take 2s if the 
       #user inputs a specific value to the api.  
       time.sleep(2) 
       return {"result":1} 
    else: 
       return {"result":0} 

Подход 1:

@app.route('/run')
def run: 
     z1 = get_output.delay(var_1,var_2,var_3,var_4)
     if(z1.get()["result"] == 1): 
         return True 
     else: 
         return False 

Однако это не удается,поскольку он просто проверяет один раз, а затем возвращает значение «Ложь», в результате None

Подход 2:

lock = threading.lock()
@app.route('/run')
def run: 
     lock.acquire() 
     z1 = get_output.delay(var_1,var_2,var_3,var_4)

     z = z1.get()["result"] 
     lock.release() 
     while(z is None): 
           time.sleep(.5) 
           lock.acquire()
           z = get_output.delay(var_1,var_2,var_3,var_4).get()["result"]
           lock.release()  
     if(z == 1): 
         return True 
     else: 
         return False

Однако при сбое 2 одновременных запросов происходит сбой: возникает ошибка времени выполнения:

   z = z1.get()["result"]
   on_message=on_message,
  File "/.local/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 188, in wait_for_pending
    for _ in self._wait_for_pending(result, **kwargs):
  File "/.local/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 255, in _wait_for_pending
    on_interval=on_interval):
  File "/.local/lib/python3.5/site-packages/celery/backends/asynchronous.py", line 56, in drain_events_until
    yield self.wait_for(p, wait, timeout=1)
  events = self.read_poller.poll(timeout)
  RuntimeError: concurrent poll() invocation

Я исследовал и нашел людей с такой же проблемой, но ни одно из решений не помогло.Пожалуйста помоги.Я использую сервер Debian в облачной платформе Google

...