Это, пожалуй, наиболее часто встречающаяся проблема для новичков в Scrapy или асинхронном программировании в целом.
(Поэтому я постараюсь получить более полный ответ.)
То, что вы пытаетесь сделать, это:
Response -> Response -> Response
| <-----------------------'
| \-> Response
| <-----------------------'
| \-> Response
| <-----------------------'
aggregating \-> Response
V
Data out
Когда то, что вам действительно нужно делать в асинхронном программировании, это цепочка ваших ответов / обратных вызовов:
Response -> Response -> Response -> Response ::> Data out to ItemPipeline (Exporters)
\-> Response -> Response -> Response ::> Data out to ItemPipeline
\-> Response -> Response ::> Data out to ItemPipeline
\> Response ::> Error
Итак, нам нужен сдвиг парадигмы в мыслях о том, как агрегировать ваши данные.
Думайте о потоке кода как о временной шкале; Вы не можете вернуться назад во времени - или вернуть результат назад во времени - только вперед.
Вы можете получить обещание выполнения какой-либо будущей работы в то время, когда вы ее запланировали.
Таким образом, самый разумный способ - это переслать себе данные, которые вам понадобятся в тот момент в будущем.
Основная проблема, я думаю, заключается в том, что это выглядит и выглядит неловко в Python, тогда как выглядит
гораздо более естественный в таких языках, как JavaScript, хотя по сути он такой же.
(Я могу сказать это, потому что я много и подробно программировал JavaScript в течение ~ 5 лет и понимал его модель событий,
до прихода в Python и Scrapy, и у меня все еще было много времени, пытаясь приспособиться к стилю и ощущениям Twisted. И я никогда не буду естественным с этим.)
И это может быть даже больше в случае с Scrapy, потому что он пытается скрыть эту сложность Twisted deferred
s от пользователей.
Но вы должны увидеть некоторые сходства в следующих представлениях:
Случайный пример JS:
new Promise(function(resolve, reject) { // code flow
setTimeout(() => resolve(1), 1000); // |
}).then(function(result) { // v
alert(result); // |
return result * 2; // |
}).then(function(result) { // |
alert(result); // |
return result * 2; // v
});
Стиль Twisted deferred's:

(Изображение из https://twistedmatrix.com/documents/16.2.0/core/howto/defer.html#visual-explanation)
Стиль в обратных вызовах Scrapy Spider:
scrapy.Request(url,
callback=self.parse, # > go to next response callback
errback=self.erred) # > go to custom error callback
Так что же нас оставит с помощью Scrapy?
Передавайте свои данные по ходу, не копите их;)
Этого должно быть достаточно почти во всех случаях, кроме случаев, когда у вас нет выбора, кроме как объединить информацию об элементе с нескольких страниц, но когда эти запросы не могут быть сериализованы в следующую схему (подробнее об этом позже).
->- flow of data ---->---------------------->
Response -> Response
`-> Data -> Req/Response
Data `-> MoreData -> Yield Item to ItemPipeline (Exporters)
Data -> Req/Response
`-> MoreData -> Yield Item to ItemPipeline
1. Gen 2. Gen 3. Gen
Способ реализации этой модели в коде будет зависеть от вашего варианта использования.
Scrapy предоставляет поле meta
в запросах / ответах для обработки данных.
Несмотря на название, это на самом деле не «мета», а довольно существенный. Не избегай этого, привыкай.
Выполнение этого может показаться нелогичным, копирование и копирование всех этих данных в потенциально тысячи вновь порожденных запросов;
но из-за того, как Scrapy обрабатывает ссылки, это на самом деле неплохо, и старые объекты очищаются Scrapy рано.
В вышеприведенном искусстве ASCII к тому времени, когда все ваши запросы 2-го поколения будут поставлены в очередь, ответы 1-го поколения будут освобождены из памяти Scrapy и так далее.
Так что, на самом деле, это не слишком много памяти, если использовать ее правильно (и не обрабатывать много больших файлов).
Другая возможность «мета» - это переменные экземпляра (глобальные данные), чтобы хранить вещи в некоторых self.data
объект или другой, и доступ к нему в будущем из вашего следующего ответного обратного вызова.
(Никогда в старом, так как в то время еще не существовало.)
При этом всегда помните, что это глобальные общие данные; которые могут иметь "параллельные" обратные вызовы, смотрящие на это.
И, наконец, иногда можно даже использовать внешние источники, такие как Redis-Queues или сокеты, для обмена данными между Spider и хранилищем данных (например, для предварительного заполнения start_urls).
А как это может выглядеть в коде?
Вы можете написать «рекурсивные» методы разбора (на самом деле просто направить все ответы одним и тем же методом обратного вызова):
def parse(self, response):
if response.xpath('//li[@class="next"]/a/@href').extract_first():
yield scrapy.Request(response.urljoin(next_page_url)) # will "recurse" back to parse()
if 'some_data' in reponse.body:
yield { # the simplest item is a dict
'statuscode': response.body.status,
'data': response.body,
}
или вы можете разделить несколько parse
методов, каждый из которых обрабатывает определенный тип страницы / Ответ:
def parse(self, response):
if response.xpath('//li[@class="next"]/a/@href').extract_first():
request = scrapy.Request(response.urljoin(next_page_url))
request.callback = self.parse2 # will go to parse2()
request.meta['data'] = 'whatever'
yield request
def parse2(self, response):
data = response.meta.get('data')
# add some more data
data['more_data'] = response.xpath('//whatever/we/@found').extract()
# yield some more requests
for url in data['found_links']:
request = scrapy.Request(url, callback=self.parse3)
request.meta['data'] = data # and keep on passing it along
yield request
def parse3(self, response):
data = response.meta.get('data')
# ...workworkwork...
# finally, drop stuff to the item-pipelines
yield data
Или даже объединить это так:
def parse(self, response):
data = response.meta.get('data', None)
if not data: # we are on our first request
if response.xpath('//li[@class="next"]/a/@href').extract_first():
request = scrapy.Request(response.urljoin(next_page_url))
request.callback = self.parse # will "recurse" back to parse()
request.meta['data'] = 'whatever'
yield request
return # stop here
# else: we already got data, continue with something else
for url in data['found_links']:
request = scrapy.Request(url, callback=self.parse3)
request.meta['data'] = data # and keep on passing it along
yield request
Но этого ДЕЙСТВИТЕЛЬНО недостаточно для моего случая!
Наконец, можно рассмотреть эти более сложные подходы для обработки управления потоком , поэтому эти надоедливые асинхронные вызовы становятся предсказуемыми:
Принудительная сериализация взаимозависимых запросов путем изменения потока запросов:
def start_requests(self):
url = 'https://example.com/final'
request = scrapy.Request(url, callback=self.parse1)
request.meta['urls'] = [
'https://example.com/page1',
'https://example.com/page2',
'https://example.com/page3',
]
yield request
def parse1(self, response):
urls = response.meta.get('urls')
data = response.meta.get('data')
if not data:
data = {}
# process page response somehow
page = response.xpath('//body').extract()
# and remember it
data[response.url] = page
# keep unrolling urls
try:
url = urls.pop()
request = Request(url, callback=self.parse1) # recurse
request.meta['urls'] = urls # pass along
request.meta['data'] = data # to next stage
return request
except IndexError: # list is empty
# aggregate data somehow
item = {}
for url, stuff in data.items():
item[url] = stuff
return item
Другим вариантом для этого являются scrapy-inline-requests
, но также следует учитывать и недостатки (читайте проект README).
@inline_requests
def parse(self, response):
urls = [response.url]
for i in range(10):
next_url = response.urljoin('?page=%d' % i)
try:
next_resp = yield Request(next_url, meta={'handle_httpstatus_all': True})
urls.append(next_resp.url)
except Exception:
self.logger.info("Failed request %s", i, exc_info=True)
yield {'urls': urls}
Агрегирование данных в хранилище экземпляров («глобальные данные») и управление потоком с помощью одного или обоих
- Планировщик запрашивает приоритеты для обеспечения порядка или ответов, поэтому мы
можно надеяться, что к тому времени, когда последний запрос будет обработан, все нижние прио закончили.
- Custom
pydispatch
сигналы для внеполосных сигналов
уведомления. Хотя они не очень легкие, они представляют собой совершенно другой слой
для обработки событий и уведомлений.
Это простой способ использования пользовательских Запрос приоритетов :
custom_settings = {
'CONCURRENT_REQUESTS': 1,
}
data = {}
def parse1(self, response):
# prioritize these next requests over everything else
urls = response.xpath('//a/@href').extract()
for url in urls:
yield scrapy.Request(url,
priority=900,
callback=self.parse2,
meta={})
final_url = 'https://final'
yield scrapy.Request(final_url, callback=self.parse3)
def parse2(self, response):
# handle prioritized requests
data = response.xpath('//what/we[/need]/text()').extract()
self.data.update({response.url: data})
def parse3(self, response):
# collect data, other requests will have finished by now
# IF THE CONCURRENCY IS LIMITED, otherwise no guarantee
return self.data
И базовый пример использования сигналов.
При этом прослушивается внутреннее событие idle
, когда Spider сканирует все запросы и работает довольно красиво, чтобы использовать его для очистки в последнюю секунду (в данном случае, для агрегирования наших данных). Мы можем быть абсолютно уверены, что на данном этапе мы не пропустим никаких данных.
from scrapy import signals
class SignalsSpider(Spider):
data = {}
@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
spider = super(Spider, cls).from_crawler(crawler, *args, **kwargs)
crawler.signals.connect(spider.idle, signal=signals.spider_idle)
return spider
def idle(self, spider):
if self.ima_done_now:
return
self.crawler.engine.schedule(self.finalize_crawl(), spider)
raise DontCloseSpider
def finalize_crawl(self):
self.ima_done_now = True
# aggregate data and finish
item = self.data
return item
def parse(self, response):
if response.xpath('//li[@class="next"]/a/@href').extract_first():
yield scrapy.Request(response.urljoin(next_page_url), callback=self.parse2)
def parse2(self, response):
# handle requests
data = response.xpath('//what/we[/need]/text()').extract()
self.data.update({response.url: data})
Последняя возможность - использовать внешние источники, такие как очереди сообщений или redis, как уже упоминалось, для управления потоком паука извне.
И это охватывает все способы, которые я могу придумать.
После того, как Предмет получен / возвращен в Двигатель, он будет передан ItemPipeline
с (который может использовать Exporters
- не путать с FeedExporters
),
где вы можете продолжать массировать данные за пределами паука.
Пользовательская реализация ItemPipeline
может хранить элементы в базе данных или выполнять любые экзотические операции с ними.
Надеюсь, это поможет.