Как предотвратить переключение между потоками при запуске какой-либо функции.питон - PullRequest
0 голосов
/ 06 февраля 2019

Среда:
Ubuntu 18.04
Python 3.6.6

Вот пример кода:

import threading
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(3)


def thread_run():
    i = 0
    for x in range(10):
        i += 1
        print(x, threading.get_ident())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    for x in range(2):
        loop.run_in_executor(executor, thread_run)
    loop.run_forever()

Вывод:

0 140522512643840
1 140522512643840
2 140522512643840
3 140522512643840
4 140522512643840
5 140522512643840
6 140522512643840
7 140522512643840
0 140522504251136
1 140522504251136
2 140522504251136
3 140522504251136
4 140522504251136
5 140522504251136
6 140522504251136
7 140522504251136
8 140522504251136
9 140522504251136
8 140522512643840
9 140522512643840

Вопросы:
Как можно предотвратить переключение «контекста» для функции thread_run?
Как сделать некоторыефункция «атомарная»?
Ожидаемый результат (с сохранением нескольких потоков):

0 140522512643840
...
9 140522512643840
0 140522504251136
...
9 140522504251136

PS: требуется, чтобы сохранить метод вызова thread_run (loop.run_in_executor).Это просто упрощенный пример.Я спрашиваю только о случае, который описан в примере.Я знаю, что есть много способов изменить код и избавиться от loop.run_in_executor, но я пытаюсь найти решение в этом конкретном случае.

PSS: такое же поведение в Windows 10 (циклыувеличено до 100)

67 8704
16 14712
68 8704
69 8704
70 8704
17 14712
71 8704

Обновление: # 1 Я пытался использовать декокатор: (с этот ответ )

def synchronized(wrapped):
    lock = threading.Lock()
    @functools.wraps(wrapped)
    def _wrap(*args, **kwargs):
        with lock:
            result = wrapped(*args, **kwargs)
            return result

Но он не работает для нескольких функций:

import threading
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(3)


def synchronized(wrapped):
    lock = threading.Lock()
    @functools.wraps(wrapped)
    def _wrap(*args, **kwargs):
        with lock:
            result = wrapped(*args, **kwargs)
            return result

    return _wrap


@synchronized
def thread_run():
    i = 0
    for x in range(5):
        i += 1
        print(x, "thread_run", threading.get_ident())


@synchronized
def thread_run2():
    i = 0
    for x in range(5):
        i += 1
        print(x, "thread_run2", threading.get_ident())


def not_important():
    i = 0
    for x in range(5):
        i += 1
        print(x, "not_important", threading.get_ident())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_in_executor(executor, thread_run)
    loop.run_in_executor(executor, thread_run2)
    loop.run_in_executor(executor, not_important)
    loop.run_forever()

Вывод:

0 thread_run 140039310980864
0 thread_run2 140039302588160
0 not_important 140039220623104
1 not_important 140039220623104
2 not_important 140039220623104
1 thread_run2 140039302588160
2 thread_run2 140039302588160
3 thread_run2 140039302588160
4 thread_run2 140039302588160
3 not_important 140039220623104
4 not_important 140039220623104
1 thread_run 140039310980864
2 thread_run 140039310980864
3 thread_run 140039310980864
4 thread_run 140039310980864

Ожидается: Каждая функция (кроме not_important) выполняется в последовательности,Не параллельно.

Обновление № 2: Я добавил ответ с «половинным» решением.Но это не решает проблему, когда вы хотите «макр» одну функцию, которая не должна прерываться ни одной другой функцией.

Ответы [ 2 ]

0 голосов
/ 06 февраля 2019

run_in_executor возвращает будущее, которое вы можете ожидать для архивного последовательного выполнения

import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(3)

def thread_run():
    i = 0
    for x in range(10):
        i += 1
        print(x, threading.get_ident())

async def run(loop):
    for _ in range(2):
        await loop.run_in_executor(executor, thread_run)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(run(loop))
    loop.run_forever()
0 голосов
/ 06 февраля 2019

Текущее решение (не полностью решено) основано на комментарии @shmee

import threading
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(3)


def synchronized(wrapped):
    @functools.wraps(wrapped)
    def _wrap(*args, **kwargs):
        with lock:
            result = wrapped(*args, **kwargs)
            return result

    return _wrap


@synchronized
def thread_run():
    i = 0
    for x in range(10):
        i += 1
        print(x, "thread_run", threading.get_ident())


@synchronized
def thread_run2():
    i = 0
    for x in range(10):
        i += 1
        print(x, "thread_run2", threading.get_ident())


def not_important():
    i = 0
    for x in range(10):
        i += 1
        print(x, "not_important", threading.get_ident())


if __name__ == '__main__':
    lock = threading.Lock()  # global lock

    loop = asyncio.get_event_loop()
    for x in range(5):
        loop.run_in_executor(executor, thread_run)
        loop.run_in_executor(executor, thread_run2)
        loop.run_in_executor(executor, not_important)
    loop.run_forever()

Вывод :

0 thread_run 140192637171456
1 thread_run 140192637171456
2 thread_run 140192637171456
3 thread_run 140192637171456
4 thread_run 140192637171456
5 thread_run 140192637171456
6 thread_run 140192637171456
7 thread_run 140192637171456
8 thread_run 140192637171456
9 thread_run 140192637171456
0 thread_run2 140192637171456
1 thread_run2 140192637171456
2 thread_run2 140192637171456
3 thread_run2 140192637171456
4 thread_run2 140192637171456
5 thread_run2 140192637171456
6 thread_run2 140192637171456
7 thread_run2 140192637171456
8 thread_run2 140192637171456
9 thread_run2 140192637171456
0 not_important 140192620386048
1 not_important 140192620386048
2 not_important 140192620386048
3 not_important 140192620386048
0 thread_run 140192637171456
1 thread_run 140192637171456
2 thread_run 140192637171456
3 thread_run 140192637171456
4 thread_run 140192637171456
5 thread_run 140192637171456
6 thread_run 140192637171456
7 thread_run 140192637171456
8 thread_run 140192637171456
9 thread_run 140192637171456
0 thread_run2 140192637171456
1 thread_run2 140192637171456
2 thread_run2 140192637171456
3 thread_run2 140192637171456
4 thread_run2 140192637171456
4 not_important 140192620386048
5 not_important 140192620386048
6 not_important 140192620386048
7 not_important 140192620386048
8 not_important 140192620386048
9 not_important 140192620386048
0 not_important 140192620386048
1 not_important 140192620386048
2 not_important 140192620386048
3 not_important 140192620386048
4 not_important 140192620386048
5 not_important 140192620386048
6 not_important 140192620386048
7 not_important 140192620386048
8 not_important 140192620386048
9 not_important 140192620386048
5 thread_run2 140192637171456
6 thread_run2 140192637171456
7 thread_run2 140192637171456
8 thread_run2 140192637171456
9 thread_run2 140192637171456
0 not_important 140192637171456
1 not_important 140192637171456
2 not_important 140192637171456
3 not_important 140192637171456
4 not_important 140192637171456
5 not_important 140192637171456
0 thread_run 140192620386048
1 thread_run 140192620386048
6 not_important 140192637171456
7 not_important 140192637171456
8 not_important 140192637171456
9 not_important 140192637171456
2 thread_run 140192620386048
3 thread_run 140192620386048
4 thread_run 140192620386048
5 thread_run 140192620386048
6 thread_run 140192620386048
7 thread_run 140192620386048
8 thread_run 140192620386048
9 thread_run 140192620386048
0 thread_run2 140192628778752
1 thread_run2 140192628778752
2 thread_run2 140192628778752
3 thread_run2 140192628778752
4 thread_run2 140192628778752
5 thread_run2 140192628778752
6 thread_run2 140192628778752
7 thread_run2 140192628778752
8 thread_run2 140192628778752
9 thread_run2 140192628778752
0 thread_run 140192637171456
1 thread_run 140192637171456
2 thread_run 140192637171456
3 thread_run 140192637171456
4 thread_run 140192637171456
5 thread_run 140192637171456
6 thread_run 140192637171456
7 thread_run 140192637171456
0 not_important 140192628778752
1 not_important 140192628778752
2 not_important 140192628778752
3 not_important 140192628778752
4 not_important 140192628778752
5 not_important 140192628778752
6 not_important 140192628778752
7 not_important 140192628778752
8 not_important 140192628778752
9 not_important 140192628778752
8 thread_run 140192637171456
9 thread_run 140192637171456
0 thread_run2 140192620386048
1 thread_run2 140192620386048
2 thread_run2 140192620386048
3 thread_run2 140192620386048
4 thread_run2 140192620386048
5 thread_run2 140192620386048
6 thread_run2 140192620386048
7 thread_run2 140192620386048
8 thread_run2 140192620386048
9 thread_run2 140192620386048
0 thread_run 140192628778752
1 thread_run 140192628778752
2 thread_run 140192628778752
3 thread_run 140192628778752
4 thread_run 140192628778752
5 thread_run 140192628778752
6 thread_run 140192628778752
7 thread_run 140192628778752
8 thread_run 140192628778752
9 thread_run 140192628778752
0 not_important 140192637171456
1 not_important 140192637171456
2 not_important 140192637171456
3 not_important 140192637171456
4 not_important 140192637171456
5 not_important 140192637171456
0 thread_run2 140192620386048
1 thread_run2 140192620386048
2 thread_run2 140192620386048
3 thread_run2 140192620386048
4 thread_run2 140192620386048
5 thread_run2 140192620386048
6 thread_run2 140192620386048
7 thread_run2 140192620386048
8 thread_run2 140192620386048
9 thread_run2 140192620386048
6 not_important 140192637171456
7 not_important 140192637171456
8 not_important 140192637171456
9 not_important 140192637171456

Текущий результат :
Функции thread_run и thread_run2 не конфликтуют.Но они могут быть прерваны функцией not_important.

Так что это частично решит мой вопрос.Я хотел бы найти способ предотвратить прерывание функций thread_run и thread_run2 любыми другими функциями.Потому что в реальном проекте есть много других методов / функций, и я не могу добавить synchronized для всех них.

...