Пул выполняет только один поток вместо 4, и как мне сделать его бесконечным? - PullRequest
0 голосов
/ 14 февраля 2019

Итак, я работаю над небольшим инструментом Python для стресс-тестирования API приложения.

У меня есть довольно хороший скрипт с использованием Threading, но потом я прочитал, что для его поддержки потребуется ручное кодирование.количество одновременных потоков (имеется в виду, начиная новые, как только старые заканчиваются), и предложение здесь: Как начать новый поток, когда старый заканчивается? это использовать ThreadPool, я попытался сделать следующее:

def test_post():
    print "Executing in " + threading.currentThread().getName() + "\n"
    time.sleep(randint(1, 3))
    return randint(1, 5), "Message"


if args.send:
    code, content = post()
    print (code, "\n")
    print (content)
elif args.test:
    # Create new threads
    print threads
    results_list = []
    pool = ThreadPool(processes=threads)
    results = pool.apply_async(test_post())
    pool.close()  # Done adding tasks.
    pool.join()  # Wait for all tasks to complete.
    # results = list(pool.imap_unordered(
    #     test_post(), ()
    # ))
    # thread_list = []
    # while threading.activeCount() <= threads:
    #     thread = LoadTesting(threadID=free_threads, name="Thread-" + str(threading.activeCount()), counter=1)
    #     thread.start()
    #     thread_list.append(thread)
    print "Exiting Main Thread" + "\n"
else:
    print ("cant get here!")

Когда я вызываю скрипт, я получаю согласованный вывод, такой как:

4

Выполнение в MainThread

Выход из основного потока

Я не уверен, почему ... как вы видите в закомментированном блоке, я пробовал разные способы, и он все еще делает это только один раз.

Моя цель - заставить скрипт работать вцикл, всегда работает n потоков в любое время.функции test_post (и соответственно post) возвращают код ответа HTTP и содержимое - я хотел бы позже использовать его для печати / остановки, когда код ответа НЕ 200 OK.

1 Ответ

0 голосов
/ 15 февраля 2019

Ваша первая проблема заключается в том, что вы уже вызывали свою функцию в MainThread с вызовом:

pool.apply_async(test_post())

... вместо передачи test_post в качестве аргумента для вызова, который будет выполнен вworker-thread with:

pool.apply_async(test_post)

OP: У меня есть довольно хороший скрипт, использующий Threading, но потом я прочитал, что ему потребуется ручное кодирование для поддержки n числа одновременных потоков(имеется в виду, начиная новые, как только старые закончат) ...

Необходимо различать единицу работы (задание, задание) и поток.Весь смысл использования пула в первую очередь заключается в повторном использовании исполнителей, будь то потоки или процессы.Рабочие уже созданы, когда создается экземпляр пула, и пока вы не закроете пул, все начальные потоки останутся живыми.Таким образом, вы не заботитесь о воссоздании потоков, вы просто вызываете методы пула существующего пула так часто, как у вас есть работа, которую вы хотите распределить.Пул принимает эти задания (вызов метода пула) и создает из него задачи.Эти задачи помещаются в неограниченную очередь.Всякий раз, когда рабочий завершает задание, он будет блокировать get() новую задачу из такого inqueue.


OP: Пул выполняет только один поток вместо 4... Я пробовал разные способы, и он все еще делает это только один раз.

pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)

... - это задание, выполняемое одним вызовом и выполняющее одну задачу.Если вам нужно более одного исполнения func, вам нужно либо вызывать pool.apply_async() несколько раз, либо вы используете метод пула отображения, такой как

pool.map(func, iterable, chunksize=None)

..., который отображает одну функциюнад итеративным.pool.apply_async не блокирует, поэтому это "асинхронный".Он немедленно возвращает AsyncResult -объект, который вы можете (блокирующе) вызвать .wait() или .get() после.


Из комментариев стало ясно, что вы хотите бесконечный и немедленные замены для завершенных задач (самодельный поток ввода) ... и программа должна остановиться на KeyboardInterrupt или когда результат не имеет определенного значения.

Вы можете использовать callback -параметр apply_async, чтобы планировать новые задачи, как только любые старых завершены.Сложность заключается в том, что делать с MainThread, чтобы предотвратить преждевременное завершение всего скрипта при сохранении его отзывчивости для KeyboardInterrupt.Разрешение сна MainThread в цикле позволяет ему немедленно реагировать на KeyboardInterrupt, предотвращая преждевременный выход.В случае, если результат должен остановить программу, вы можете позволить обратному вызову завершить пул.Затем MainThread просто должен включить проверку состояния пула в своем цикле сна.

import time
from random import randint, choice
from itertools import count
from datetime import datetime
from threading import current_thread
from multiprocessing.pool import ThreadPool


def test_post(post_id):
    time.sleep(randint(1, 3))
    status_code = choice([200] * 9 + [404])
    return "{} {} Message no.{}: {}".format(
        datetime.now(), current_thread().name, post_id, status_code
    ), status_code


def handle_result(result):
    msg, code = result
    print(msg)
    if code != 200:
        print("terminating")
        pool.terminate()
    else:
        pool.apply_async(
            test_post, args=(next(post_cnt),), callback=handle_result
        )


if __name__ == '__main__':

    N_WORKERS = 4

    post_cnt = count()

    pool = ThreadPool(N_WORKERS)

    # initial distribution
    for _ in range(N_WORKERS):
        pool.apply_async(
            test_post, args=(next(post_cnt),), callback=handle_result
        )

    try:
        while pool._state == 0:  # check if pool is still alive
            time.sleep(1)
    except KeyboardInterrupt:
        print(" got interrupt")

Пример вывода с KeyboardInterrupt:

$> python2 scratch.py
2019-02-15 18:46:11.724203 Thread-4 Message no.3: 200
2019-02-15 18:46:12.724713 Thread-2 Message no.1: 200
2019-02-15 18:46:13.726107 Thread-1 Message no.0: 200
2019-02-15 18:46:13.726292 Thread-3 Message no.2: 200
2019-02-15 18:46:14.724537 Thread-4 Message no.4: 200
2019-02-15 18:46:14.726881 Thread-2 Message no.5: 200
2019-02-15 18:46:14.727071 Thread-1 Message no.6: 200
^C got interrupt

Пример Вывод с завершением из-за нежелательноговозвращаемое значение:

$> python2 scratch.py
2019-02-15 18:44:19.966387 Thread-3 Message no.0: 200
2019-02-15 18:44:19.966491 Thread-4 Message no.1: 200
2019-02-15 18:44:19.966582 Thread-1 Message no.3: 200
2019-02-15 18:44:20.967555 Thread-2 Message no.2: 200
2019-02-15 18:44:20.968562 Thread-3 Message no.4: 404
terminating

Обратите внимание, что в вашем сценарии вы также можете вызывать apply_async чаще, чем N_WORKERS, чтобы ваш первоначальный дистрибутив имел некоторый буфер для уменьшения задержки.

...