Python многопроцессорный пул apply_async словарь как проблема аргумента - PullRequest
0 голосов
/ 23 мая 2019

Среда - это Python 3.7.2 в Windows.

Я пытаюсь проанализировать элементы из списка воспроизведения m3u и обработать их с помощью многопроцессорной обработки.

Я анализирую элементы, используя одну функцию, которая выдает их как dictПосле создания пула я использую args= в Pool.apply_async(), чтобы передать dict в качестве аргумента функции, которая его использует.

Проблема в том, что даже проанализированные элементы циклически перебираются один за другим без проблем, некоторые элементы, брошенные в пул, не были обработаны, а некоторые элементы были обработаны несколько раз.

Вот мой минимум неработающего кода.

import argparse
import codecs
import multiprocessing
import re
import chardet


# parse item from m3u, read file line by line and yield them
def parse_m3u(m3u_file):
    if m3u_file.lower().endswith("m3u8"):
        charset = "utf8"
    else:
        with open(m3u_file, mode='rb') as test:
            charset = chardet.detect(test.read(1024))["encoding"]  # decide the encoding of m3u file
    with codecs.open(m3u_file, 'r', encoding=charset) as m3u:
        if not m3u.readline().startswith("#EXTM3U"):  # not start with magic word, not a valid file
            return
        expect_path = False

        entry = {'length': -1,
                 'name': "",
                 'group-title': "",
                 'tvg-name': "",
                 'tvg-language': "",
                 'tvg-id': "",
                 'tvg-logo': "",
                 'tvg-country': "",
                 'path': ""}

        for line in m3u:
            if line.startswith('#EXTINF:'):  # meta line
                expect_path = True
                length = re.search(r'#EXTINF:(-*\d+)\s', line)
                name = re.search(r'#EXTINF:.*?,(.*)', line)
                group_title = re.search(r'group-title=\"(\S*)\"', line)
                tvg_name = re.search(r'tvg-name=\"(\S*)\"', line)
                tvg_language = re.search(r'tvg-language=\"(\S*)\"', line)
                tvg_id = re.search(r'tvg-id=\"(\S*)\"', line)
                tvg_logo = re.search(r'tvg-logo=\"(\S*)\"', line)
                tvg_country = re.search(r'tvg-country=\"(\S*)\"', line)
                entry["length"] = -1 if length is None else int(length[1])
                entry["name"] = "" if name is None else name[1].strip()
                entry["group-id"] = "" if group_title is None else group_title[1].strip()
                entry["tvg-name"] = "" if tvg_name is None else tvg_name[1].strip()
                entry["tvg-language"] = "" if tvg_language is None else tvg_language[1].strip()
                entry["tvg-id"] = "" if tvg_id is None else tvg_id[1].strip()
                entry["tvg-logo"] = "" if tvg_logo is None else tvg_logo[1].strip()
                entry["tvg-country"] = "" if tvg_country is None else tvg_country[1].strip()
            elif line.startswith("#"):  # comment line
                pass
            elif not line.isspace():  # file path or url
                if expect_path:
                    expect_path = False
                    entry["path"] = line.strip()
                    yield entry
                else:  # standalone without m3u ext
                    yield {'length': -1, 'name': "", 'group-title': "", 'tvg-name': "", 'tvg-language': "",
                           'tvg-id': "", 'tvg-logo': "", 'tvg-country': "", "path": line.strip()}


def main():
    pool = multiprocessing.Pool()
    for item in parse_m3u(args.M3U):
        print("pass: " + str(item))    # everything is fine here
        pool.apply_async(print, args=(item,))    # do something using other function
    pool.close()
    pool.join()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Foo",
        epilog="Bar",
        fromfile_prefix_chars='@')
    parser.add_argument(
        "M3U",
        type=str,
        help="the M3U file",
        metavar="m3u_file")
    args = parser.parse_args()
    main()

И тестовый список воспроизведения

#EXTM3U

#EXTINF:-1 tvg-logo="http://example.com/img.jpg" group-title="abc", 测试ITEM_0
https://test-dummy.com/path/to/index0.m3u8

#EXTINF:-1 tvg-logo="http://example.com/img.jpg" group-title="def", 测试ITEM_1
https://test-dummy.com/path/to/index1.m3u8

#EXTINF:-1 tvg-logo="http://example.com/img.jpg" group-title="abc", 测试ITEM_2
https://test-dummy.com/path/to/index2.m3u8

#EXTINF:-1 tvg-logo="http://example.com/img.jpg" group-title="hij", 测试ITEM_3
https://test-dummy.com/path/to/index3.m3u8

#EXTINF:-1 tvg-logo="http://example.com/img.jpg" group-title="abc", 测试ITEM_4
https://test-dummy.com/path/to/index4.m3u8

#EXTINF:-1 tvg-logo="http://example.com/img.jpg" group-title="abc", 测试ITEM_5
https://test-dummy.com/path/to/index5.m3u8

#EXTINF:-1 tvg-logo="http://example.com/img.jpg" group-title="mno", 测试ITEM_6
https://test-dummy.com/path/to/index6.m3u8

#EXTINF:-1 tvg-logo="http://example.com/img.jpg" group-title="abc", 测试ITEM_7
https://test-dummy.com/path/to/index7.m3u8

#EXTINF:-1 tvg-logo="http://example.com/img.jpg" group-title="qrs", 测试ITEM_8
https://test-dummy.com/path/to/index8.m3u8

#EXTINF:-1 tvg-logo="http://example.com/img.jpg" group-title="abc", 测试ITEM_9
https://test-dummy.com/path/to/index9.m3u8


# stand alone entry

https://test-dummy.com/path/to/blahblah.m3u8
https://test-dummy.com/path/to/foofoofoofoofoo
https://test-dummy.com/path/to/audio_file.mp3

ftp://test-dummy.com:2121/path/to/video_file.mp4


Один типичный вывод моей программы выглядит ниже.Как видите, есть несколько дубликатов, хотя некоторые элементы вообще не обрабатываются.Выходные данные подпроцесса меняются каждый раз, но они не верны.

"C:\Program Files\Python37\python.exe" "D:/Project/m3u/tmp.py" test.m3u8
pass: {'length': -1, 'name': '测试ITEM_0', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index0.m3u8', 'group-id': 'abc'}
pass: {'length': -1, 'name': '测试ITEM_1', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index1.m3u8', 'group-id': 'def'}
pass: {'length': -1, 'name': '测试ITEM_2', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index2.m3u8', 'group-id': 'abc'}
pass: {'length': -1, 'name': '测试ITEM_3', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index3.m3u8', 'group-id': 'hij'}
pass: {'length': -1, 'name': '测试ITEM_4', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index4.m3u8', 'group-id': 'abc'}
pass: {'length': -1, 'name': '测试ITEM_5', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index5.m3u8', 'group-id': 'abc'}
pass: {'length': -1, 'name': '测试ITEM_6', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index6.m3u8', 'group-id': 'mno'}
pass: {'length': -1, 'name': '测试ITEM_7', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index7.m3u8', 'group-id': 'abc'}
pass: {'length': -1, 'name': '测试ITEM_8', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index8.m3u8', 'group-id': 'qrs'}
pass: {'length': -1, 'name': '测试ITEM_9', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index9.m3u8', 'group-id': 'abc'}
pass: {'length': -1, 'name': '', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': '', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/blahblah.m3u8'}
pass: {'length': -1, 'name': '', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': '', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/foofoofoofoofoo'}
pass: {'length': -1, 'name': '', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': '', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/audio_file.mp3'}
pass: {'length': -1, 'name': '', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': '', 'tvg-country': '', 'path': 'ftp://test-dummy.com:2121/path/to/video_file.mp4'}
{'length': -1, 'name': '测试ITEM_4', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index4.m3u8', 'group-id': 'abc'}
{'length': -1, 'name': '测试ITEM_5', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index5.m3u8', 'group-id': 'abc'}
{'length': -1, 'name': '测试ITEM_5', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index5.m3u8', 'group-id': 'abc'}
{'length': -1, 'name': '测试ITEM_5', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index5.m3u8', 'group-id': 'abc'}
{'length': -1, 'name': '测试ITEM_6', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index6.m3u8', 'group-id': 'mno'}
{'length': -1, 'name': '测试ITEM_6', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index6.m3u8', 'group-id': 'mno'}
{'length': -1, 'name': '测试ITEM_7', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index7.m3u8', 'group-id': 'abc'}
{'length': -1, 'name': '测试ITEM_9', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index9.m3u8', 'group-id': 'abc'}
{'length': -1, 'name': '测试ITEM_9', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index9.m3u8', 'group-id': 'abc'}
{'length': -1, 'name': '测试ITEM_9', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index9.m3u8', 'group-id': 'abc'}
{'length': -1, 'name': '', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': '', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/blahblah.m3u8'}
{'length': -1, 'name': '', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': '', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/foofoofoofoofoo'}
{'length': -1, 'name': '', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': '', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/audio_file.mp3'}
{'length': -1, 'name': '', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': '', 'tvg-country': '', 'path': 'ftp://test-dummy.com:2121/path/to/video_file.mp4'}

Process finished with exit code 0

Я пытался передать что-то, а не диктовку:

pool.apply_async(print, args=(item["path"],))

pool.apply_async(print, args=(str(item),))

Каждый элемент был обработан без дубликатов, как показано в выводе ниже (передача str(item) в print).

"C:\Program Files\Python37\python.exe" "D:/Project/m3u/tmp.py" test.m3u8
pass: {'length': -1, 'name': '测试ITEM_0', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index0.m3u8', 'group-id': 'abc'}
pass: {'length': -1, 'name': '测试ITEM_1', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index1.m3u8', 'group-id': 'def'}
pass: {'length': -1, 'name': '测试ITEM_2', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index2.m3u8', 'group-id': 'abc'}
pass: {'length': -1, 'name': '测试ITEM_3', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index3.m3u8', 'group-id': 'hij'}
pass: {'length': -1, 'name': '测试ITEM_4', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index4.m3u8', 'group-id': 'abc'}
pass: {'length': -1, 'name': '测试ITEM_5', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index5.m3u8', 'group-id': 'abc'}
pass: {'length': -1, 'name': '测试ITEM_6', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index6.m3u8', 'group-id': 'mno'}
pass: {'length': -1, 'name': '测试ITEM_7', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index7.m3u8', 'group-id': 'abc'}
pass: {'length': -1, 'name': '测试ITEM_8', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index8.m3u8', 'group-id': 'qrs'}
pass: {'length': -1, 'name': '测试ITEM_9', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index9.m3u8', 'group-id': 'abc'}
pass: {'length': -1, 'name': '', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': '', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/blahblah.m3u8'}
pass: {'length': -1, 'name': '', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': '', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/foofoofoofoofoo'}
pass: {'length': -1, 'name': '', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': '', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/audio_file.mp3'}
pass: {'length': -1, 'name': '', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': '', 'tvg-country': '', 'path': 'ftp://test-dummy.com:2121/path/to/video_file.mp4'}
{'length': -1, 'name': '测试ITEM_0', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index0.m3u8', 'group-id': 'abc'}
{'length': -1, 'name': '测试ITEM_1', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index1.m3u8', 'group-id': 'def'}
{'length': -1, 'name': '测试ITEM_2', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index2.m3u8', 'group-id': 'abc'}
{'length': -1, 'name': '测试ITEM_3', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index3.m3u8', 'group-id': 'hij'}
{'length': -1, 'name': '测试ITEM_4', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index4.m3u8', 'group-id': 'abc'}
{'length': -1, 'name': '测试ITEM_5', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index5.m3u8', 'group-id': 'abc'}
{'length': -1, 'name': '测试ITEM_6', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index6.m3u8', 'group-id': 'mno'}
{'length': -1, 'name': '测试ITEM_7', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index7.m3u8', 'group-id': 'abc'}
{'length': -1, 'name': '测试ITEM_8', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index8.m3u8', 'group-id': 'qrs'}
{'length': -1, 'name': '测试ITEM_9', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': 'http://example.com/img.jpg', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/index9.m3u8', 'group-id': 'abc'}
{'length': -1, 'name': '', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': '', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/blahblah.m3u8'}
{'length': -1, 'name': '', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': '', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/foofoofoofoofoo'}
{'length': -1, 'name': '', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': '', 'tvg-country': '', 'path': 'https://test-dummy.com/path/to/audio_file.mp3'}
{'length': -1, 'name': '', 'group-title': '', 'tvg-name': '', 'tvg-language': '', 'tvg-id': '', 'tvg-logo': '', 'tvg-country': '', 'path': 'ftp://test-dummy.com:2121/path/to/video_file.mp4'}

Process finished with exit code 0

Я также создал аналогичную программу для тестирования, и она работает.

import multiprocessing
import random


def gen_data():
    for i in range(0, 20):
        entry = {"id": str(i),
                 "b": random.randint(0, 1000000)}
        yield entry


def main():
    pool = multiprocessing.Pool(4)
    for item in gen_data():
        print("pass: " + item["id"])
        pool.apply_async(print, args=(item,))
    pool.close()
    pool.join()


if __name__ == '__main__':
    main()

Я действительно смущен и хочу знать, что я сделал не так.

Спасибо!

...