Python: конфликт функций с использованием многопроцессорного пакета - PullRequest
0 голосов
/ 07 марта 2020

У меня есть функция, выполняемая параллельно с использованием многопроцессорного пакета, а другая выполняется в основном процессе. Параллельная функция выполняется до тех пор, пока не закончится основная. Они оба работают, но не вместе. Обе функции имеют только один общий аргумент (глобальная переменная), и обе используют пакет BeautifulSoup. Кроме того, функции имеют внутренние локальные переменные с одинаковыми именами, но я не думаю, что это проблема. Объявление функций:

    stop_event, output = mp.Event(), mp.Queue()
    count_price_change = mp.Process(target=count, args=(stop_event, output, data_2, header_2, driver, lancer))
    count_price_change.start()
    start = time.time()

    collected = create_array_features(data_1, header_1, prices, change, changepct, volume, lancer)
    #PROBLEM IS HERE, eventually with lancer ?? 
    stop = time.time()
    time.sleep(frequency_get - (stop - start))
    stop_event.set()
    counts_changes = output.get()
    count_price_change.join()
    count_price_change.terminate()

Точная ошибка, которую я получаю:

    Traceback (most recent call last):
  File "/usr/lib/python3.6/code.py", line 91, in runcode
    exec(code, self.locals)
  File "<input>", line 6, in <module>
  File "<input>", line 80, in create_array_features
  File "/usr/local/lib/python3.6/dist-packages/bs4/__init__.py", line 300, in __init__
    markup, from_encoding, exclude_encodings=exclude_encodings)):
  File "/usr/local/lib/python3.6/dist-packages/bs4/builder/_htmlparser.py", line 240, in prepare_markup
    exclude_encodings=exclude_encodings)
  File "/usr/local/lib/python3.6/dist-packages/bs4/dammit.py", line 374, in __init__
    for encoding in self.detector.encodings:
  File "/usr/local/lib/python3.6/dist-packages/bs4/dammit.py", line 265, in encodings
    self.markup, self.is_html)
  File "/usr/local/lib/python3.6/dist-packages/bs4/dammit.py", line 323, in find_declared_encoding
    declared_encoding_match = xml_encoding_re.search(markup, endpos=xml_endpos)
TypeError: expected string or bytes-like object

Я пробовал минимально воспроизводимые примеры, но все они работают. Я потратил несколько часов на эту проблему и до сих пор не могу понять, почему я получаю ошибку. Любая помощь будет принята с благодарностью. Весь код (начать снизу):

def parse_html(names, driver):

    # need to make it wait until elements appear otherwise doesn't catch anything
    list_names = []
    for i in range(len(names)):
        html_name = names[i].get_attribute('innerHTML')
        soup = BeautifulSoup(html_name, "html.parser")
        match = soup.find('a').text
        list_names.append(match)

    lancer = datetime.datetime.now().replace(microsecond=0).isoformat(' ')
    prices = driver.find_elements_by_css_selector('div.item-field.item-price')
    change = driver.find_elements_by_css_selector('div.item-field.item-change.pn--color')
    changepct = driver.find_elements_by_css_selector('div.item-field.item-changepct.pn--color')
    volume = driver.find_elements_by_css_selector('div.item-field.item-volume.is-made')

    return list_names, lancer, prices, change, changepct, volume

def create_array_features(data, header, prices, change, changepct, volume, lancer):

    # create an array of features
    try:
        list_to_parse = data.columns.get_level_values(0).unique()
    except:
        list_to_parse = data.get_level_values(0).unique() #this is for when the dataframe is empty

    features = []
    for i in range(len(list_to_parse)):
        # prices
        html = prices[i].get_attribute('innerHTML')
        soup = BeautifulSoup(html, "html.parser")
        soup = [text for text in soup.find_all(text=True) if text.parent.name != "label"][1]
        match = prq.check_unit(soup)
        features.append(match)
        #parse some other stuff...

    collected = pd.DataFrame([features], columns=header)
    collected['time'] = pd.to_datetime(lancer)
    collected = collected.set_index('time')

    return collected

def count(stop_event, output, data_2, header_2, driver, lancer):

    try:
        list_to_parse = data_2.columns.get_level_values(0).unique()
    except:
        list_to_parse = data_2.get_level_values(0).unique() #this is for when the dataframe is empty

    change_count_prices = [0 for i in range(len(list_to_parse))]
    change_count_volume = [0 for i in range(len(list_to_parse))]
    last_prices = [None for i in range(len(list_to_parse))]
    last_volumes = [None for i in range(len(list_to_parse))]

    while not stop_event.is_set():

        WebDriverWait(driver, 60).until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, 'section.watchlist__container.expanded')))
        names_ = driver.find_elements_by_css_selector('div.item-symbol-inner')

        _, _, prices, _, _, volume = parse_html(names_, driver) #each time must update the price in order to check if has changed

        for i in range(len(list_to_parse)):
            html = prices[i].get_attribute('innerHTML')
            soup = BeautifulSoup(html, "html.parser")
            soup = [text for text in soup.find_all(text=True) if text.parent.name != "label"][1]
            match = prq.check_unit(soup)
            if match != last_prices[i]:
                change_count_prices[i] =+ 1
            last_prices[i] = match

            #do some other stuff

    change_count_prices.extend(change_count_volume)
    print("exited and will stop")
    collected = pd.DataFrame([change_count_prices], columns=header_2)
    collected['time'] = pd.to_datetime(lancer)
    collected = collected.set_index('time')
    output.put(collected) #LIFO queue with ouput that comes at the end when event is set

def collect(data_1, data_2, header_1, header_2, frequency_get, driver,
            lancer, prices, change, changepct, volume):

    stop_event, output = mp.Event(), mp.Queue()
    count_price_change = mp.Process(target=count, args=(stop_event, output, data_2, header_2, driver, lancer))
    count_price_change.start()
    start = time.time()

    collected = create_array_features(data_1, header_1, prices, change, changepct, volume, lancer)
    stop = time.time()
    time.sleep(frequency_get - (stop - start))
    stop_event.set()
    counts_changes = output.get()
    count_price_change.join()
    count_price_change.terminate()

Ответы [ 2 ]

0 голосов
/ 13 марта 2020

В конце концов я смог найти ответ. Проблема заключается в том, что Selenium не может обрабатывать несколько вкладок параллельно, имея один и тот же драйвер. Чтобы использовать Selenium в многопоточности, вы должны использовать один отдельный драйвер для каждого потока. Который не был я делал выше

0 голосов

Поскольку я не могу воссоздать вашу проблему, обычно я предлагаю не использовать общие переменные между процессами, за исключением того, что вы каждый раз знаете состояние каждого. Также вам всегда нужно иметь 2 функции или сценарии в одном каталоге, потому что многие Проблемы с ответвлениями могут отображаться как ошибки.

Таким образом, в общем случае такие проблемы можно решить, обмениваясь данными между основными и дочерними процессами, например, с помощью multyprocessing Pipe (), и в режиме реального времени сообщать о состоянии переменной:

parent_conn, child_conn = Pipe()
p = Process(target = plot_map_function,args =(self.variable1,self.variable2,child_conn))
p.daemon=True
p.start()
parent_conn.recv()
p.join()

в другой функции:

def plot_map_function(var1,var2,conn_to_parent):
    #do something with variables 
    conn_to_parent.send(variable_i_want_to_send)
    conn_to_parent.close()

Надеюсь, это поможет

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...