Как мне параллельно загрузить большой список URL-адресов в pyspark? - PullRequest
1 голос
/ 28 сентября 2019

У меня есть СДР, содержащий 10000 URL-адресов для извлечения.

list = ['http://SDFKHSKHGKLHSKLJHGSDFKSJH.com',
        'http://google.com',
        'http://twitter.com']
urls = sc.parallelize(list)

Мне нужно проверить, какие URL-адреса нарушены, и предпочтительно получить результаты в соответствующий СДР в Python.Я пробовал это:

import asyncio
import concurrent.futures
import requests

async def get(url):

    with concurrent.futures.ThreadPoolExecutor() as executor:

        loop = asyncio.get_event_loop()
        futures = [
            loop.run_in_executor(
                executor, 
                requests.get, 
                i
            )
            for i in url
        ]
        return futures

async def get_response(futures):
    response = await asyncio.gather(futures,return_exceptions=True)
    return(response)

tasks = urls.map(lambda query: get(query)) # Method returns http call response as a Future[String]

results = tasks.map(lambda task: get_response(task) )
results = results.map(lambda response:'ERR' if isinstance(response, Exception) else 'OK' )
results.collect()

Я получаю следующий вывод, который, очевидно, не так:

['OK', 'OK', 'OK']

Я также пробовал это:

import asyncio
import concurrent.futures
import requests

async def get():

    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:

        loop = asyncio.get_event_loop()
        futures = [
            loop.run_in_executor(
                executor, 
                requests.get, 
                i
            )
            for i in urls.toLocalIterator()
        ]
        for response in await asyncio.gather(*futures,return_exceptions=True):
            print('{}: {}'.format(response, 'ERR' if isinstance(response, Exception) else 'OK'))
            pass


loop = asyncio.get_event_loop()
loop.run_until_complete(get())

Я получаюследующий вывод:

HTTPConnectionPool(host='SDFKHSKHGKLHSKLJHGSDFKSJH.com', port=80): Max retries exceeded with url: / (Caused by 
NewConnectionError('<urllib3.connection.HTTPConnection object at 0x12c834210>: Failed to establish a new connection: [Errno 8] nodename nor servname provided, or not known')): ERR
<Response [200]>: OK
<Response [200]>: OK

Желаемый вывод будет выглядеть примерно так:

http://SDFKHSKHGKLHSKLJHGSDFKSJH.com : ERR
http://google.com : OK
http://twitter.com : OK

Но проблема второго подхода заключается в использовании списков для хранения будущих объектов.Я считаю, что лучше использовать RDD, поскольку количество URL может быть в миллионах или миллиардах, и ни одна отдельная машина не может справиться с этим.Также мне не понятно, как извлечь URL из ответов.

1 Ответ

1 голос
/ 28 сентября 2019

Если вы используете concurrent.futures, вам вообще не понадобится asyncio (это не даст вам никаких преимуществ, так как вы все равно работаете в нескольких потоках).Вы можете использовать concurrent.futures.wait() для параллельного ожидания нескольких фьючерсов.

Я не могу проверить ваши данные, но он должен работать с таким кодом:

import concurrent.futures, requests

def get_one(url):
    resp = requests.get(url)
    resp.raise_for_status()
    return resp.text

def get_all():
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = [executor.submit(get_one, url)
                   for url in urls.toLocalIterator()]
    # the end of the "with" block will automatically wait
    # for all of the executor's tasks to complete

    for fut in futures:
        if fut.exception() is not None:
            print('{}: {}'.format(fut.exception(), 'ERR')
        else:
            print('{}: {}'.format(fut.result(), 'OK')

Чтобы сделать то же самоечто-то с asyncio, вы должны использовать aiohttp .

...