Не могу читать / писать в файлы, используя многопоточность в Python - PullRequest
0 голосов
/ 01 марта 2019

У меня есть входной файл, который будет содержать длинный список URL.Предположим, это в mylines.txt:

https://yahoo.com
https://google.com
https://facebook.com
https://twitter.com

Что мне нужно сделать, это:

1) Считать строку из входного файла mylines.txt

2)Выполнить функцию myFun.Который будет выполнять некоторые задачи.И вывод продукции состоит из строки.В моем реальном коде это сложнее.Но что-то вроде этого в концепции.

3) Запишите вывод в файл results.txt

Так как у меня большой ввод.Мне нужно использовать многопоточность Python.Я посмотрел на этот хороший пост здесь .Но, к сожалению, он предполагает ввод в виде простого списка и не предполагает, что я хочу записать вывод функции в файл.

Мне нужно убедиться, что вывод каждого входа записан в одной строке (т.е. опасность, если многопотоковые записи пишут в одну и ту же строку, поэтому я получаю неверные данные).

Я пытался скучать.Но безуспешно.Раньше я не использовал многопоточность Python, но пришло время учиться, поскольку в моем случае это неизбежно.У меня очень длинный список, который не может закончиться за разумное время без многопоточности.Моя функция не будет выполнять эту простую задачу, но больше операций, которые не нужны для концепции.

Вот моя попытка.Пожалуйста, исправьте меня (в самом коде):

import threading
import requests
from multiprocessing.dummy import Pool as ThreadPool
import Queue

def myFunc(url):
        response = requests.get(url, verify=False ,timeout=(2, 5))
        results = open("myresults","a") # "a" to append results
        results.write("url is:",url, ", response is:", response.url)
        results.close()

worker_data = open("mylines.txt","r") # open my input file.

#load up a queue with your data, this will handle locking
q = Queue.Queue()

for url in worker_data:
    q.put(url)

# make the Pool of workers
pool = ThreadPool(4)
results = pool.map(myFunc, q)

# close the pool and wait for the work to finish
pool.close()
pool.join()

В: Как исправить приведенный выше код (пожалуйста, будьте лаконичны и помогите мне в самом коде), чтобы прочитать строку из входного файла, выполнитьФункция, напишите результат, связанный с вводом, в строке, используя многопоточность Python для одновременного выполнения requests, чтобы я мог закончить свой список в разумные сроки.

ОБНОВЛЕНИЕ:

На основании ответа код становится:

import threading
import requests
from multiprocessing.dummy import Pool as ThreadPool
import queue
from multiprocessing import Queue

def myFunc(url):
    response = requests.get(url, verify=False ,timeout=(2, 5))
    return "url is:" + url + ", response is:" + response.url

worker_data = open("mylines.txt","r") # open my input file.

#load up a queue with your data, this will handle locking
q = queue.Queue(4)
with open("mylines.txt","r") as f: # open my input file.
    for url in f:
        q.put(url)

# make the Pool of workers
pool = ThreadPool(4)
results = pool.map(myFunc, q)

with open("myresults","w") as f:
    for line in results:
        f.write(line + '\n')

mylines.txt содержит:

https://yahoo.com
https://www.google.com
https://facebook.com
https://twitter.com

Обратите внимание, что я впервые использовал:

import Queue

И: q = Queue.Queue (4)

Но получил сообщение об ошибке:

Traceback (most recent call last):
  File "test3.py", line 4, in <module>
    import Queue
ModuleNotFoundError: No module named 'Queue'

На основании какого-либо поиска я изменяюсь на:

import queue

И что касается строки: q = queue.Queue (4)

Я также добавил:

from multiprocessing import Queue

Но ничего не работает. Может ли какой-нибудь специалист по многопоточности Python помочь?

Ответы [ 2 ]

0 голосов
/ 01 марта 2019

Вместо того, чтобы потоки рабочего пула распечатывали результат, который не гарантирует правильную буферизацию вывода, вместо этого создайте еще один поток, который считывает результаты за секунду Queue и печатает их.

Я изменил ваше решение, чтобы оно создавало собственный поток рабочих потоков.Нет смысла придавать очереди бесконечную длину, так как основной поток будет блокироваться, когда очередь достигает максимального размера: вам нужно только, чтобы он был достаточно длинным, чтобы гарантировать, что рабочие потоки всегда будут обрабатываться - основной поток заблокируети разблокировать при увеличении и уменьшении размера очереди.

Он также идентифицирует поток, ответственный за каждый элемент в выходной очереди, что должно дать вам некоторую уверенность в том, что работает многопоточный подход, и печатает код ответа изсервер.Я обнаружил, что должен был убрать переводы строк с URL-адресов.

Поскольку теперь только один поток пишет в файл, записи всегда идеально синхронизированы, и нет никакой возможности, чтобы они мешали друг другу.

import threading
import requests
import queue
POOL_SIZE = 4

def myFunc(inq, outq):  # worker thread deals only with queues
    while True:
        url = inq.get()  # Blocks until something available
        if url is None:
            break
        response = requests.get(url.strip(), timeout=(2, 5))
        outq.put((url, response, threading.currentThread().name))


class Writer(threading.Thread):
    def __init__(self, q):
        super().__init__()
        self.results = open("myresults","a") # "a" to append results
        self.queue = q
    def run(self):
        while True:
            url, response, threadname = self.queue.get()
            if response is None:
                self.results.close()
                break
            print("****url is:",url, ", response is:", response.status_code, response.url, "thread", threadname, file=self.results)

#load up a queue with your data, this will handle locking
inq = queue.Queue()  # could usefully limit queue size here
outq = queue.Queue()

# start the Writer
writer = Writer(outq)
writer.start()

# make the Pool of workers
threads = []
for i in range(POOL_SIZE):
    thread = threading.Thread(target=myFunc, name=f"worker{i}", args=(inq, outq))
    thread.start()
    threads.append(thread)

# push the work onto the queues
with open("mylines.txt","r") as worker_data: # open my input file.
    for url in worker_data:
        inq.put(url.strip())
for thread in threads:
    inq.put(None)

# close the pool and wait for the workers to finish
for thread in threads:
    thread.join()

# Terminate the writer
outq.put((None, None, None))
writer.join()

Используя данные, приведенные в mylines.txt, я вижу следующий вывод:

****url is: https://www.google.com , response is: 200 https://www.google.com/ thread worker1
****url is: https://twitter.com , response is: 200 https://twitter.com/ thread worker2
****url is: https://facebook.com , response is: 200 https://www.facebook.com/ thread worker0
****url is: https://www.censys.io , response is: 200 https://censys.io/ thread worker1
****url is: https://yahoo.com , response is: 200 https://uk.yahoo.com/?p=us thread worker3
0 голосов
/ 01 марта 2019

Вы должны изменить свою функцию, чтобы она возвращала строку:

def myFunc(url):
    response = requests.get(url, verify=False ,timeout=(2, 5))
    return "url is:" + url + ", response is:" + response.url

и записывала эти строки в файл позже:

results = pool.map(myFunc, q)

with open("myresults","w") as f:
    for line in results:
        f.write(line + '\n')

Это поддерживает многопоточность для requests.get, но сериализует запись результатов в выходной файл.

Обновление:

И вы также должны использовать with для чтения входного файла:

#load up a queue with your data, this will handle locking
q = Queue.Queue()

with open("mylines.txt","r") as f: # open my input file.
    for url in f:
        q.put(url)
...