У меня есть СДР, содержащий 10000 URL-адресов для извлечения.
list = ['http://SDFKHSKHGKLHSKLJHGSDFKSJH.com',
'http://google.com',
'http://twitter.com']
urls = sc.parallelize(list)
Мне нужно проверить, какие URL-адреса нарушены, и предпочтительно получить результаты в соответствующий СДР в Python.Я пробовал это:
import asyncio
import concurrent.futures
import requests
async def get(url):
with concurrent.futures.ThreadPoolExecutor() as executor:
loop = asyncio.get_event_loop()
futures = [
loop.run_in_executor(
executor,
requests.get,
i
)
for i in url
]
return futures
async def get_response(futures):
response = await asyncio.gather(futures,return_exceptions=True)
return(response)
tasks = urls.map(lambda query: get(query)) # Method returns http call response as a Future[String]
results = tasks.map(lambda task: get_response(task) )
results = results.map(lambda response:'ERR' if isinstance(response, Exception) else 'OK' )
results.collect()
Я получаю следующий вывод, который, очевидно, не так:
['OK', 'OK', 'OK']
Я также пробовал это:
import asyncio
import concurrent.futures
import requests
async def get():
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
loop = asyncio.get_event_loop()
futures = [
loop.run_in_executor(
executor,
requests.get,
i
)
for i in urls.toLocalIterator()
]
for response in await asyncio.gather(*futures,return_exceptions=True):
print('{}: {}'.format(response, 'ERR' if isinstance(response, Exception) else 'OK'))
pass
loop = asyncio.get_event_loop()
loop.run_until_complete(get())
Я получаюследующий вывод:
HTTPConnectionPool(host='SDFKHSKHGKLHSKLJHGSDFKSJH.com', port=80): Max retries exceeded with url: / (Caused by
NewConnectionError('<urllib3.connection.HTTPConnection object at 0x12c834210>: Failed to establish a new connection: [Errno 8] nodename nor servname provided, or not known')): ERR
<Response [200]>: OK
<Response [200]>: OK
Желаемый вывод будет выглядеть примерно так:
http://SDFKHSKHGKLHSKLJHGSDFKSJH.com : ERR
http://google.com : OK
http://twitter.com : OK
Но проблема второго подхода заключается в использовании списков для хранения будущих объектов.Я считаю, что лучше использовать RDD, поскольку количество URL может быть в миллионах или миллиардах, и ни одна отдельная машина не может справиться с этим.Также мне не понятно, как извлечь URL из ответов.