Потоки независимых функций и ожидание их завершения - PullRequest
0 голосов
/ 17 мая 2018

Если я запускаю поток с start_new_thread и у меня есть n независимых функций, которые я хочу запускать одновременно, я бы сделал следующее:

def foo1(x):
    print "foo1"
    time.sleep(5)

def foo2(x):
    print "foo2"
    time.sleep(1)

func_list = [foo1,foo2]

for k,j in enumerate(func_list):
    thread.start_new_thread(func_list[k],(1 ,))

Обе функции имеют один и тот же код, но обе функции также независимы в том, что они отправляют сообщения в независимые сокеты ZMQ (которые, в свою очередь, ждут ответа от внешнего API, прежде чем отправить обратно сообщение для обработки в foo) .

foo1 может потребоваться 5 секунд для завершения обработки в зависимости от времени возобновления API и размера полезной нагрузки, поэтому проблема заключается в том, что если я попытаюсь запустить его снова в новом потоке, пока он еще обрабатывает , сокет ZMQ выдает исключение (уже видел git, это не ошибка)

Так что, если foo1 занят, foo2 доступен, если foo2 занят, foo (n) может быть доступен (до foo15), так что есть много рабочих. Но как узнать, какая функция занята, и если она занята, дождаться ее завершения или, если доступны другие работники, использовать их вместо этого?

Помните, что я не могу просто спулировать 15 потоков одной и той же функции, потому что для всех целей и целей они независимы.

Может ли кто-нибудь помочь, это очень запутанная проблема, которую я создал для себя. Спасибо.

РЕДАКТИРОВАТЬ @ martineau -

У меня есть список сокетов, которые я импортирую, я бы не хотел этого делать, но используемый мной API не ограничивает http-соединения (в пределах разумного), но ограничивает количество запросов, которые может обрабатывать каждый. Следовательно, больше соединений - единственный путь к большей скорости.

Слежение является настройкой для задания - я обрабатываю 10 записей одновременно, что соответствует 10 соединениям, которые я поддерживаю с помощью API. Я просто пошел с пулами потоков и откажусь от призрака при запуске другого потока, если он занят (это слишком сложно), поэтому, если один поток занимает 5 секунд, он задержит следующий пакет из 10. Это компромисс.

import socket_handler_a, socket_handler_b ...

def multi_call(reduce_kp, exe_func):

        def trd_call_a(x,y):
                exe_func(socket_handler_a(x),y)

        def trd_call_b(x,y):
                exe_func(socket_handler_b(x),y)

        def trd_call_c(x,y):
                exe_func(socket_handler_c(x),y)

        def trd_call_d(x,y):
                exe_func(socket_handler_d(x),y)

        def trd_call_e(x,y):
                exe_func(socket_handler_e(x),y)

        def trd_call_f(x,y):
                exe_func(socket_handler_f(x),y)

        def trd_call_g(x,y):
                exe_func(socket_handler_g(x),y)

        def trd_call_h(x,y):
                exe_func(socket_handler_h(x),y)

        def trd_call_i(x,y):
                exe_func(socket_handler_i(x),y)

        def trd_call_j(x,y):
                exe_func(socket_handler_j(x),y)

        func_list = [trd_call_a, trd_call_b,
                     trd_call_c, trd_call_d,
                     trd_call_e, trd_call_f,
                     trd_call_g, trd_call_h,
                     trd_call_i, trd_call_j]

        def chunks_(l, n):
                for i in range(0, len(l), n):
                    yield l[i:i+n]

        threads = []
        for query_lst in chunks_([i for i in reduce_kp], 10):
                for k, j in enumerate(query_lst):

                    thread1 = threading.Thread(target=func_list[k], args=(j[0] ,j[1]))
                    thread1.start()
                    threads.append(thread1)

                for thread in threads: thread.join()

И это называется так:

def test_case(q_list):

        reduce_kp   = []
        for k in q_list: 
                reduce_kp.append([{'QTE':'EUR_USD'}, [k,'BAL'] ])
        multi_call(reduce_kp, test_case_resp)

А ответы вызываются из тем, т.е.

def test_case_resp(resp,x):
   #process resp
...