Краткая программа Python для многоядерных CSV, требуется совет / помощь - PullRequest
0 голосов
/ 10 октября 2018

Я хобби-программист, начинающий с AHK, затем немного java, и теперь я пытаюсь изучать Python. Я искал и нашел несколько советов, но пока не смог внедрить их в свой собственный код. Надеюсь, кто-то здесь может мне помочь, это очень короткая программа.Я использую базу данных .txt csv с ";"в качестве разделителя.ПРИМЕР БАЗЫ ДАННЫХ:

Какого цвета обычно является кошкой?; Черный

Какого роста был самый длинный человек на земле?; 272 см

Земля круглая?; Да

База данных теперь состоит из 20 000 строк, что заставляет программу «замедляться», используя только 25% ЦП (1 ядро).

Если я смогу использовать все 4 ядра (100%)Я думаю, что это будет выполнять задачу намного быстрее.Задача состоит в том, чтобы сравнить CLIPBOARD с базой данных и, если есть совпадение, он должен дать мне ответ в качестве возврата.Возможно также, я могу разделить базу данных на 4 части?

Код сейчас выглядит так!Не более чем 65 строк и это делает свою работу (но медленно).Советы о том, как я могу превратить этот процесс в многоядерный.

    import time
    import pyperclip as pp
    import pandas as pd
    import pymsgbox as pmb
    from fuzzywuzzy import fuzz
    import numpy


    ratio_threshold = 90
    fall_back_time = 1
    db_file_path = 'database.txt'
    db_separator = ';'
    db_encoding = 'latin-1'

    def load_db():
        while True:
            try:
                # Read and create database
                db = pd.read_csv(db_file_path, sep=db_separator, encoding=db_encoding)
                db = db.drop_duplicates()
                return db
            except:
                print("Error in load_db(). Will sleep for %i seconds..." % fall_back_time)
        time.sleep(fall_back_time)


    def top_answers(db, question):
        db['ratio'] = db['question'].apply(lambda q: fuzz.ratio(q, question))
        db_sorted = db.sort_values(by='ratio', ascending=False)
        db_sorted = db_sorted[db_sorted['ratio'] >= ratio_threshold]
        return db_sorted


    def write_txt(top):
        result = top.apply(lambda row: "%s" % (row['answer']), axis=1).tolist()
        result = '\n'.join(result)
        fileHandle = open("svar.txt", "w")
        fileHandle.write(result)
        fileHandle.close()
        pp.copy("")


    def main():
        try:
            db = load_db()
            last_db_reload = time.time()

            while True:
                # Get contents of clipboard
                question = pp.paste()

                # Rank answer
                top = top_answers(db, question)

                # If answer was found, show results
                if len(top) > 0:
                    write_txt(top)
                time.sleep(fall_back_time)
        except:
            print("Error in main(). Will sleep for %i seconds..." % fall_back_time)
            time.sleep(fall_back_time)


   if name == 'main':
       main()'

Ответы [ 2 ]

0 голосов
/ 10 октября 2018

Решение с многопроцессорной обработкой:

import time
import pyperclip as pp
import pandas as pd
#import pymsgbox as pmb
from fuzzywuzzy import fuzz
import numpy as np

# pathos uses better pickle to tranfer more complicated objects
from pathos.multiprocessing import Pool
from functools import reduce

import sys
import os
from contextlib import closing

ratio_threshold = 70
fall_back_time = 1
db_file_path = 'database.txt'
db_separator = ';'
db_encoding = 'latin-1'

chunked_db = []
NUM_PROCESSES = os.cpu_count()

def load_db():
    while True:
        try:
            # Read and create database
            db = pd.read_csv(db_file_path, sep=db_separator, encoding=db_encoding)
            db.columns = ['question', 'answer']
            #db = db.drop_duplicates() # i drop it for experiment
            break
        except:
            print("Error in load_db(). Will sleep for %i seconds..." % fall_back_time)
    time.sleep(fall_back_time)
    # split database into equal chunks:
    # (if you have a lot of RAM, otherwise you 
    # need to compute ranges in db, something like
    # chunk_size = len(db)//NUM_PROCESSES
    # ranges[i] = (i*chunk_size, (i+1)*cjunk_size)
    # and pass ranges in original db to processes
    chunked_db = np.split(db, [NUM_PROCESSES], axis=0)
    return chunked_db




def top_answers_multiprocessed(question, chunked_db):

    # on unix, python uses 'fork' mode by default
    # so the process has 'copy-on-change' access to all global variables
    # i.e. if process will change something in db, it will be copied to it
    # with a lot of overhead
    # Unfortunately, I'fe heard that on Windows only 'spawn' mode with full 
    # copy of everything is used

    # Process pipeline uses pickle, it's quite slow.
    # so on small database you may not have benefit from multiprocessing
    # If you are going to transfer big objects in or out, look
    # in the direction of multiprocessing.Array

    # this solution is not fully efficient,
    # as pool is recreated each time

    # You can create daemon processes which will monitor
    # Queue for incoming questions, but it's harder to implement
    def top_answers(idx):
        # question is in the scope of parent function, 
        chunked_db[idx]['ratio'] = chunked_db[idx]['question'].apply(lambda q: fuzz.ratio(q, question))
        db_sorted = chunked_db[idx].sort_values(by='ratio', ascending=False)
        db_sorted = db_sorted[db_sorted['ratio'] >= ratio_threshold]
        return db_sorted



    with closing(Pool(processes=NUM_PROCESSES)) as pool:
        # chunked_db is a list of databases
        # they are in global scope, we send only index beacause
        # all the data set is pickled
        num_chunks = len(chunked_db)
        # apply function top_answers across generator range(num_chunks)
        res = pool.imap_unordered(top_answers, range(num_chunks))
        res = list(res) 
        # now res is list of dataframes, let's join it
        res_final = reduce(lambda left,right: pd.merge(left,right,on='ratio'), res)
    return res_final





def write_txt(top):
    result = top.apply(lambda row: "%s" % (row['answer']), axis=1).tolist()
    result = '\n'.join(result)
    fileHandle = open("svar.txt", "w")
    fileHandle.write(result)
    fileHandle.close()
    pp.copy("")


def mainfunc():
   global chunked_db
   chunked_db = load_db()
   last_db_reload = time.time()
   print('db loaded')

   last_clip = ""
   while True:
       # Get contents of clipboard
       try:
           new_clip = pp.paste()
       except:
           continue

       if (new_clip != last_clip) and (len(new_clip)> 0):
           print(new_clip)
           last_clip = new_clip

           question = new_clip.strip()
       else:
           continue


       # Rank answer
       top = top_answers_multiprocessed(question, chunked_db)

       # If answer was found, show results
       if len(top) > 0:
            #write_txt(top)
            print(top)


if __name__ == '__main__':
    mainfunc()
0 голосов
/ 10 октября 2018

Если бы вы могли разделить БД на четыре одинаково больших, вы могли бы обрабатывать их параллельно следующим образом:

import time
import pyperclip as pp
import pandas as pd
import pymsgbox as pmb
from fuzzywuzzy import fuzz
import numpy
import threading

ratio_threshold = 90
fall_back_time = 1
db_file_path = 'database.txt'
db_separator = ';'
db_encoding = 'latin-1'


def worker(thread_id, question):
    thread_id = str(thread_id)
    db = pd.read_csv(db_file_path + thread_id, sep=db_separator,    encoding=db_encoding)
    db = db.drop_duplicates()
    db['ratio'] = db['question'].apply(lambda q: fuzz.ratio(q, question))
    db_sorted = db.sort_values(by='ratio', ascending=False)
    db_sorted = db_sorted[db_sorted['ratio'] >= ratio_threshold]
    top = db_sorted
    result = top.apply(lambda row: "%s" % (row['answer']), axis=1).tolist()
    result = '\n'.join(result)
    fileHandle = open("svar" + thread_id + ".txt", "w")
    fileHandle.write(result)
    fileHandle.close()
    pp.copy("")
    return


def main():
    question = pp.paste()
    for i in range(1, 4):
        t = threading.Thread(target=worker, args=(i, question))
        t.start()
        t.join()


if name == 'main':
    main()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...