Проблема производителя-потребителя - попытка сохранить в CSV-файл - PullRequest
0 голосов
/ 07 марта 2020

, так что эта, казалось бы, простая проблема заставляет меня задуматься.

У меня есть набор данных (datas), и я выполняю некоторую обработку (это не проблема, хотя это занимает время из-за размер набора данных) для создания нескольких строк, которые будут сохранены в файл CSV. Тем не менее, очень сложно создать строку, затем сохранить ее в CSV, затем создать строку, а затем сохранить ее и т. Д. c.

Так что я пытаюсь реализовать потоки производителей и потребителей - производители будут создайте каждую строку данных (чтобы ускорить процесс), сохраните в очереди, и один потребитель затем добавит в мой файл CSV.

Мои попытки, приведенные ниже, иногда приводят к успеху (данные сохраняются правильно) или в других случаях данные «обрезаются» (либо целая строка, либо ее часть).

Что я делаю не так?


from threading import Thread
from queue import Queue
import csv

q = Queue()

def producer():
    datas = [["hello","world"],["test","hey"],["my","away"],["your","gone"],["bye","hat"]]
    for data in datas:
        q.put(data)


def consumer():
    while True:
        local = q.get()
        file = open('dataset.csv','a')
        with file as fd:
            writer = csv.writer(fd)
            writer.writerow(local)
        file.close()

        q.task_done()


for i in range(10):
    t = Thread(target=consumer)
    t.daemon = True
    t.start()

producer()

q.join()

1 Ответ

2 голосов
/ 07 марта 2020

Я думаю, это похоже на то, что вы пытаетесь сделать. Для целей тестирования он добавляет к каждой строке данных в файле CSV префикс «идентификатор производителя», чтобы источник данных можно было увидеть в результатах.

Как вы сможете увидеть из CSV созданный файл, все полученные данные помещаются в него.

import csv
import random
from queue import Queue
from threading import Thread
import time

SENTINEL = object()

def producer(q, id):
    data = (("hello", "world"), ("test", "hey"), ("my", "away"), ("your", "gone"),
            ("bye", "hat"))
    for datum in data:
        q.put((id,) + datum)  # Prefix producer ID to datum for testing.
        time.sleep(random.random())  # Vary thread speed for testing.


class Consumer(Thread):
    def __init__(self, q):
        super().__init__()
        self.q = q

    def run(self):
        with open('dataset.csv', 'w', newline='') as file:
            writer = csv.writer(file, delimiter=',')
            while True:
                datum = self.q.get()
                if datum is SENTINEL:
                    break
                writer.writerow(datum)

def main():
    NUM_PRODUCERS = 10
    queue = Queue()

    # Create producer threads.
    threads = []
    for id in range(NUM_PRODUCERS):
        t = Thread(target=producer, args=(queue, id+1,))
        t.start()
        threads.append(t)

    # Create Consumer thread.
    consumer = Consumer(queue)
    consumer.start()

    # Wait for all producer threads to finish.
    while threads:
        threads = [thread for thread in threads if thread.is_alive()]

    queue.put(SENTINEL)  # Indicate to consumer thread no more data.
    consumer.join()
    print('Done')

if __name__ == '__main__':
    main()

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...