Поток & Очередь против Серийной производительности - PullRequest
0 голосов
/ 13 ноября 2009

Мне было бы интересно посмотреть на потоки и очереди, поэтому я написал 2 сценария: один разбивает файл и шифрует каждый кусок в потоке, а другой - последовательно. Я все еще очень плохо знаком с Python и не знаю, почему скрипт протектора занимает намного больше времени.

Сценарий с резьбой:

#!/usr/bin/env python

from Crypto.Cipher import AES
from optparse import OptionParser
import os, base64, time, sys, hashlib, pickle, threading, timeit, Queue


BLOCK_SIZE = 32 #32 = 256-bit | 16 = 128-bit
TFILE = 'mytestfile.bin'
CHUNK_SIZE = 2048 * 2048
KEY = os.urandom(32)

class DataSplit():
    def __init__(self,fileObj, chunkSize):

        self.fileObj = fileObj
        self.chunkSize = chunkSize

    def split(self):
        while True:
            data = self.fileObj.read(self.chunkSize)
            if not data:
                break
            yield data

class encThread(threading.Thread):
    def __init__(self, seg_queue,result_queue, cipher):
        threading.Thread.__init__(self)
        self.seg_queue = seg_queue
        self.result_queue = result_queue
        self.cipher = cipher

    def run(self):
        while True:
            #Grab a data segment from the queue
            data = self.seg_queue.get()
            encSegment = []           
            for lines in data:
            encSegment.append(self.cipher.encrypt(lines))
            self.result_queue.put(encSegment)
            print "Segment Encrypted"
            self.seg_queue.task_done()

start = time.time()
def main():
    seg_queue = Queue.Queue()
    result_queue = Queue.Queue()
    estSegCount = (os.path.getsize(TFILE)/CHUNK_SIZE)+1
    cipher = AES.new(KEY, AES.MODE_CFB)
    #Spawn threads (one for each segment at the moment)
    for i in range(estSegCount):
        eT = encThread(seg_queue, result_queue, cipher)
        eT.setDaemon(True)
        eT.start()
        print ("thread spawned")

    fileObj = open(TFILE, "rb")
    splitter = DataSplit(fileObj, CHUNK_SIZE)
    for data in splitter.split():
        seg_queue.put(data)
        print ("Data sent to thread")

    seg_queue.join()
    #result_queue.join()
    print ("Seg Q: {0}".format(seg_queue.qsize()))
    print ("Res Q: {0}".format(result_queue.qsize()))



main()
print ("Elapsed Time: {0}".format(time.time()-start))

Серийный скрипт:

#!/usr/bin/env python

from Crypto.Cipher import AES
from optparse import OptionParser
import os, base64, time, sys, hashlib, pickle, threading, timeit, Queue

TFILE = 'mytestfile.bin'
CHUNK_SIZE = 2048 * 2048

class EncSeries():
    def __init(self):
        pass

    def loadFile(self,path):
        openFile = open(path, "rb")
        #fileData = openFile.readlines()
        fileData = openFile.read(CHUNK_SIZE)
        openFile.close()
        return fileData

    def encryptData(self,key, data):
        cipher = AES.new(key, AES.MODE_CFB)
        newData = []
        for lines in data:
            newData.append(cipher.encrypt(lines))
        return newData


start = time.time()
def main():
    print ("Start")
    key = os.urandom(32)
    run = EncSeries()
    fileData = run.loadFile(TFILE)

    encFileData=run.encryptData(key, fileData)
    print("Finish")

main()
print ("Elapsed Time: {0}".format(time.time()-start))

Использование readlines () вместо read, по-видимому, значительно ускоряет работу и в последовательной версии, но это уже намного быстрее, чем в многопоточной.

Ответы [ 5 ]

1 голос
/ 13 ноября 2009

Я смотрел презентацию, с которой связывался Дейв Кирби, и пробовал пример счетчика, который работает в два с лишним раза больше:

import time
from threading import Thread

countmax=100000000

def count(n):
    while n>0:
        n-=1

def main1():
    count(countmax)
    count(countmax)

def main2():
    t1=Thread(target=count,args=(countmax,))
    t2=Thread(target=count,args=(countmax,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

def timeit(func):
    start = time.time()
    func()
    end=time.time()-start
    print ("Elapsed Time: {0}".format(end))

if __name__ == '__main__':
    timeit(main1)
    timeit(main2)

Выходы:

Elapsed Time: 21.5470001698
Elapsed Time: 55.3279998302

Однако, если я изменю нить на процесс:

from multiprocessing import Process

и

t1=Process(target ....

и т.д.. Я получаю этот вывод:

Elapsed Time: 20.5
Elapsed Time: 10.4059998989

Теперь, как будто мой процессор Pentium имеет два ядра, я уверен, что это гиперпоточность. Может кто-нибудь попробовать это на своей двух- или четырехъядерной машине и запустить 2 или 4 потока?

См. Документацию по Python 2.6.4 для многопроцессорной обработки

1 голос
/ 13 ноября 2009
  1. Похоже, что ваша вторая версия читает только один фрагмент, а первая версия читает весь файл - это объясняет большое ускорение. Редактировать : Другая проблема: я только что заметил, что вы запускаете for lines in data без причины - это фактически зашифровывает символы по отдельности, что намного медленнее. Вместо этого просто передайте данные в encrypt напрямую.

  2. Нет смысла запускать больше процессорных потоков, чем у вас процессорных ядер.

  3. Потоки могут работать параллельно, только если они вызывают модуль расширения, который разблокирует GIL во время работы. Я не думаю, что PyCrypto делает это, поэтому вы не получите никакой параллельной работы здесь.

  4. Если бы узким местом была производительность диска, вы все равно не заметили бы здесь значительного улучшения - в этом случае было бы лучше иметь один поток, выполняющий дисковый ввод-вывод, а другой - шифрование. GIL не будет проблемой, поскольку он освобождается при выполнении дискового ввода-вывода.

1 голос
/ 13 ноября 2009

Потоки не являются волшебным способом ускорения программ - разделение работы на потоки обычно замедляет ее, если только программа не тратит значительную часть своего времени на ожидание ввода-вывода. Каждый новый поток увеличивает накладные расходы на код при разделении работы и больше накладных расходов в ОС при переключении между потоками.

Теоретически, если вы работаете на многопроцессорном процессоре, потоки могут выполняться на разных процессорах, поэтому работа выполняется параллельно, но даже в этом случае нет смысла иметь больше потоков, чем процессоров.

На практике все обстоит иначе, по крайней мере для C-версии Python. GIL не очень хорошо работает с несколькими процессорами. Посмотрите эту презентацию Дэвида Бизли по причинам, почему. IronPython и Jython не имеют этой проблемы.

Если вы действительно хотите распараллелить работу, то лучше порождать несколько процессов и распределять работу между ними, но есть вероятность, что издержки межпроцессного взаимодействия при передаче больших блоков данных сведут на нет любую выгоду параллелизма.

0 голосов
/ 17 февраля 2011

Просто небольшое замечание по обновлению этого потока: в python 3.2 появилась новая реализация GIL, которая освобождает много накладных расходов, связанных с многопоточностью, но не устраняет блокировку. (то есть он не позволяет использовать более одного ядра, но позволяет эффективно использовать несколько потоков на этом ядре).

0 голосов
/ 13 ноября 2009

Темы имеют несколько различных применений:

  1. Они обеспечивают ускорение только в том случае, если они позволяют одновременно подключить несколько устройств к вашей проблеме, независимо от того, являются ли эти устройства ядрами процессора или головками дисков.

  2. Они позволяют отслеживать несколько последовательностей событий ввода-вывода, которые были бы намного сложнее без них, например одновременные разговоры с несколькими пользователями.

Последнее сделано не для производительности, а для ясности кода.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...