Где установить замки в пандах с многопоточностью? - PullRequest
0 голосов
/ 02 ноября 2018

Я пытаюсь асинхронно читать и писать из pandas df с помощью функции apply. Для этого я использую пакет multithreading.dummy. Поскольку я выполняю чтение и запись одновременно (многопоточно) на моем df, я использую multiprocessing.Lock(), чтобы не более одного thread могло редактировать df в данный момент времени. Однако я немного смущен тем, куда мне следует добавить lock.acquire() и lock.release() с функцией apply в pandas. Я попытался сделать, как описано ниже, однако, кажется, что при этом весь процесс становится synchronous, так что он наносит ущерб всей цели многопоточности.

self._lock.acquire()
to_df[col_name] = to_df.apply(lambda row: getattr(Object(row['col_1'], 
                                                           row['col_2'],
                                                           row['col_3']), 
                                                          someattribute), axis=1)
self._lock.release()

Примечание: в моем случае я должен делать getattr. someattribute - это просто @property в Object. Объект принимает 3 аргумента, некоторые из строк 1,2,3 из моего df.

1 Ответ

0 голосов
/ 02 ноября 2018

Есть 2 возможных решения. 1 - замки. 2 - очереди. Приведенный ниже код является просто скелетом, он может содержать опечатки / ошибки и не может быть использован как есть.

Во-первых. Замки там, где они действительно нужны:

def method_to_process_url(df):

    lock.acquire()
    url = df.loc[some_idx, some_col]
    lock.release()

    info = process_url(url)

    lock.acquire()
    # add info to df
    lock.release()

Во-вторых. Очереди вместо замков:

def method_to_process_url(df, url_queue, info_queue):
    for url in url_queue.get():
        info = process_url(url)
        info_queue.put(info)

url_queue = queue.Queue()
# add all urls to process to the url_queue

info_queue = queue.Queue()

# working_thread_1
threading.Thread(
    target=method_to_process_url,
    kwargs={'url_queue': url_queue, 'info_queue': info_queue},
    daemon=True).start()


# more working threads

counter = 0
while counter < amount_of_urls:
    info = info_queue.get():
    # add info to df
    counter += 1

Во втором случае вы можете даже запустить отдельный поток для каждого URL без url_queue (разумно, если количество URL-адресов порядка тысяч или меньше). counter - это простой способ остановить программу после обработки всех URL.

Я бы использовал второй подход, если вы спросите меня. На мой взгляд, он более гибкий.

...