Ошибка Queue.put внутри рабочего потока - PullRequest
0 голосов
/ 20 января 2019

Внутри рабочего потока я генерирую фрейм данных. Попытка поместить это в очередь, переданную рабочему потоку, терпит неудачу. На самом деле попытка поместить какие-либо значения в очередь не удалась.

Часть кода, которая не выполняется внутри рабочего потока task1 (), приведена ниже:

  df = pd.DataFrame([[1,2,3,4],[3,4,5,6]])
    qmdlvalues.put(df)
    mdltiming = time.time() - start
    qmdlparams.put(paramval)
    qtiming.put(mdltiming)

Полный код

import threading
import queue
from sklearn.manifold import TSNE
import os
import time

def write_tsne_op(opdata,fname,header):
    with open(fname, 'w') as outfile:
        outfile.write(header)

        for data_slice in opdata:           
            np.savetxt(outfile, data_slice,delimiter=",")

def task1(qmdlvalues,qmdlparams,qtiming,paramval):
    start = time.time()
    #tmpmdl1 = TSNE(perplexity=100,early_exaggeration=1, n_components=2,random_state=0,verbose=1)
    #qmdlvalues.put(tmpmdl1.fit_transform(dense_mx))
    df = pd.DataFrame([[1,2,3,4],[3,4,5,6]])
    qmdlvalues.put(df)
    mdltiming = time.time() - start
    qmdlparams.put(paramval)
    qtiming.put(mdltiming)
    print(df)
    print(str(mdltiming))
    print(paramval)

def task2(qmdlvalues,qmdlparams,qtiming,paramval):
    start = time.time()
    #tmpmdl2 = TSNE(perplexity=100,early_exaggeration=10, n_components=2,random_state=0,verbose=1)    
    #qmdlvalues.put(tmpmdl2.fit_transform(dense_mx2))
    qmdlvalues.put(pd.DataFrame([[1,2,3,4],[3,4,5,6]]))
    qmdlparams.put(paramval)
    mdltiming = time.time() - start
    qtiming.put(mdltiming)

if __name__ == "__main__": 


    dense_mx2 = dense_mx
    dense_mx3 = dense_mx

    qmdlvl = queue.Queue()
    qmdlch = queue.Queue()
    qtme   = queue.Queue()
    mdlvalues = pd.DataFrame()

    t1 = threading.Thread(target=task1,args=(qmdlvl,qmdlch,qtme,"#perplex: 100 early exag: 1 timing:$_plex100_exag1.csv"), name='t1')                          
    t2 = threading.Thread(target=task2,args=(qmdlvl,qmdlch,qtme,"#perplex: 100 early exag: 10 timing:$_plex100_exag10.cv"), name='t2')   

    # starting threads    
    t1.start() 
    t2.start() 

    while True:
        if qmdlvl.empty():
            print("Queue closed. Exiting thread.")   
            break
        try:
            item = qmdlvl.get(timeout=.5)

        except:
            continue
            print("Got item:", item)

    # wait until all threads finish 
    t1.join() 
    t2.join() 

Ниже приведен фактический вывод, который я получаю из кода в основном

    while True:
        if qmdlvl.empty():
            print("Queue closed. Exiting thread.")   
            break
        try:
            item = qmdlvl.get(timeout=.5)

        except:
            continue
            print("Got item:", item)

ID процесса, выполняющего основную программу: 6456 Имя основного потока: MainThread Очередь закрыта. Выход из потока.

Я хочу иметь возможность поместить фрейм данных в очередь внутри рабочего потока и получить доступ к тому же фрейму данных в основном потоке.

1 Ответ

0 голосов
/ 20 января 2019

В моем предыдущем коде есть несоответствия параметров, которые были исправлены, и полный рабочий код представлен ниже.

Я сохранил вывод t-SNE непосредственно в очередь и извлек его в основнойнить.Следующим шагом будет преобразование этого в пул потоков и подклассы.

import threading
import queue
from sklearn.manifold import TSNE
import os
import time

def write_tsne_op(opdata,fname,header):

    with open(fname, 'w') as outfile:
        outfile.write(header)
        for data_slice in opdata:
            np.savetxt(outfile, data_slice,delimiter=",")

def task1(ip_matrix,qmdlvalues,qmdlparam,plex,exag,qmdltime,qmdlhrfn,hderfname):
    string=""
    start=0
    end=0
    mdltiming=0
    start = time.time()
    tmpmdl1 = TSNE(perplexity=plex,early_exaggeration=exag, n_components=2,random_state=0,verbose=1)
    qmdlvalues.put(tmpmdl1.fit_transform(ip_matrix))    
    string = str(plex)+ "$" + str(exag)
    qmdlparam.put(string)
    qmdlhrfn.put(hderfname)
    end = time.time()
    mdltimig = end - start
    print(str(mdltiming)+"time")
    qmdltime.put(mdltiming)

def task2(ip_matrix,qmdlvalues,qmdlparam,plex,exag,qmdltime,qmdlhrfn,hderfname):
    string=""
    start=0
    end=0
    mdltiming=0
    start = time.time()    
    tmpmdl2 = TSNE(perplexity=plex,early_exaggeration=exag, n_components=2,random_state=0,verbose=1)    
    qmdlvalues.put(tmpmdl2.fit_transform(ip_matrix))
    string = str(plex)+ "$" + str(exag)
    qmdlparam.put(string)
    qmdlhrfn.put(hderfname)
    end = time.time()
    mdltimig = end - start
    qmdltime.put(mdltiming)

def task3(ip_matrix,qmdlvalues,qmdlparam,plex,exag,qmdltime,qmdlhrfn,hderfname):
    string=""
    start=0
    end=0
    mdltiming=0
    start = time.time()    
    tmpmdl3 = TSNE(perplexity=plex,early_exaggeration=exag, n_components=2,random_state=0,verbose=1)    
    qmdlvalues.put(tmpmdl3.fit_transform(ip_matrix))
    string = str(plex)+ "$" + str(exag)
    qmdlparam.put(string)
    qmdlhrfn.put(hderfname)
    end = time.time()
    mdltimig = end - start
    qmdltime.put(mdltiming)

def task4(ip_matrix,qmdlvalues,qmdlparam,plex,exag,qmdltime,qmdlhrfn,hderfname):
    string=""
    start=0
    end=0
    mdltiming=0
    start = time.time()    
    tmpmdl4 = TSNE(perplexity=plex,early_exaggeration=exag, n_components=2,random_state=0,verbose=1)    
    qmdlvalues.put(tmpmdl4.fit_transform(ip_matrix))
    string = str(plex)+ "$" + str(exag)
    qmdlparam.put(string)
    qmdlhrfn.put(hderfname)
    end = time.time()
    mdltimig = end - start
    qmdltime.put(mdltiming)

if __name__ == "__main__": 

    # print ID of current process 
    print("ID of process running main program: {}".format(os.getpid())) 

    # print name of main thread 
    print("Main thread name: {}".format(threading.main_thread().name)) 

    dense_mx2 = dense_mx
    dense_mx3 = dense_mx
    dense_mx4 = dense_mx

    qmdlvl = queue.Queue()
    qmdlch = queue.Queue()
    qmdltme = queue.Queue()
    qmdlhdrfname = queue.Queue()

    perplex = 200

    # creating threads 
    exag=10
    t1 = threading.Thread(target=task1,args=(dense_mx,qmdlvl,qmdlch,perplex,exag,qmdltme,qmdlhdrfname,"#perplex: 200 early exag: 10 timing:$_plex200_exag10.csv"), name='t1')                          

    exag=30
    t2 = threading.Thread(target=task2,args=(dense_mx2,qmdlvl,qmdlch,perplex,exag,qmdltme,qmdlhdrfname,"#perplex: 200 early exag: 30 timing:$_plex200_exag30.cv"), name='t2')   

    exag=50
    t3 = threading.Thread(target=task3,args=(dense_mx3,qmdlvl,qmdlch,perplex,exag,qmdltme,qmdlhdrfname,"#perplex: 200 early exag: 50 timing:$_plex200_exag50.csv"), name='t3')                          

    exag=100
    t4 = threading.Thread(target=task4,args=(dense_mx4,qmdlvl,qmdlch,perplex,exag,qmdltme,qmdlhdrfname,"#perplex: 200 early exag: 100 timing:$_plex200_exag100.cv"), name='t4')   

    # starting threads 
    t1.start() 
    t2.start() 
    t3.start() 
    t4.start() 



    # wait until all threads finish 
    t1.join() 
    t2.join() 
    t3.join() 
    t4.join() 

    while True:
        if qmdlvl.empty():
            print("Queue closed. Exiting thread.")   
            break
        try:
            item1 = qmdlvl.get(timeout=.5)
            item2 = qmdlch.get(timeout=.5)
            item3 = qmdltme.get(timeout=.5)
            header,fname = qmdlhdrfname.get(timeout=.5).split('$')
        except:
            continue        
        write_tsne_op(item1,fname,header)


Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...