Scrapy Custom экспортер - PullRequest
       9

Scrapy Custom экспортер

7 голосов
/ 18 января 2012

Я определяю экспортер элементов, который помещает элементы в очередь сообщений.Ниже приведен код.

from scrapy.contrib.exporter import JsonLinesItemExporter
from scrapy.utils.serialize import ScrapyJSONEncoder
from scrapy import log

from scrapy.conf import settings

from carrot.connection import BrokerConnection, Exchange
from carrot.messaging import Publisher

log.start()


class QueueItemExporter(JsonLinesItemExporter):

    def __init__(self, **kwargs):

        log.msg("Initialising queue exporter", level=log.DEBUG)

        self._configure(kwargs)

        host_name = settings.get('BROKER_HOST', 'localhost')
        port = settings.get('BROKER_PORT', 5672)
        userid = settings.get('BROKER_USERID', "guest")
        password = settings.get('BROKER_PASSWORD', "guest")
        virtual_host = settings.get('BROKER_VIRTUAL_HOST', "/")

        self.encoder = settings.get('MESSAGE_Q_SERIALIZER', ScrapyJSONEncoder)(**kwargs)

        log.msg("Connecting to broker", level=log.DEBUG)
        self.q_connection = BrokerConnection(hostname=host_name, port=port,
                        userid=userid, password=password,
                        virtual_host=virtual_host)
        self.exchange = Exchange("scrapers", type="topic")
        log.msg("Connected", level=log.DEBUG)

    def start_exporting(self):
        spider_name = "test"
        log.msg("Initialising publisher", level=log.DEBUG)
        self.publisher = Publisher(connection=self.q_connection,
                        exchange=self.exchange, routing_key="scrapy.spider.%s" % spider_name)
        log.msg("done", level=log.DEBUG)

    def finish_exporting(self):
        self.publisher.close()

    def export_item(self, item):
        log.msg("In export item", level=log.DEBUG)
        itemdict = dict(self._get_serialized_fields(item))
        self.publisher.send({"scraped_data": self.encoder.encode(itemdict)})
        log.msg("sent to queue - scrapy.spider.naukri", level=log.DEBUG)

У меня есть несколько проблем.Элементы не отправляются в очередь.Я добавил следующее в мои настройки:

FEED_EXPORTERS = {
    "queue": 'scrapers.exporters.QueueItemExporter'
}

FEED_FORMAT = "queue"

LOG_STDOUT = True

Код не вызывает никаких ошибок, и при этом я не вижу ни одного из сообщений регистрации.Я нахожусь в моем уме о том, как отладить это.

Любая помощь будет высоко ценится.

Ответы [ 3 ]

5 голосов
/ 26 февраля 2012

«Экспортеры каналов» - это быстрые (и несколько грязные) ярлыки для вызова некоторых «стандартных» экспортеров элементов. Вместо того, чтобы настраивать экспортер каналов из настроек, жестко подключите ваш собственный экспортер элементов к вашему пользовательскому конвейеру, как описано здесь http://doc.scrapy.org/en/0.14/topics/exporters.html#using-item-exporters:

from scrapy.xlib.pydispatch import dispatcher
from scrapy import signals
from scrapy.contrib.exporter import XmlItemExporter

class MyPipeline(object):

    def __init__(self):
        ...
        dispatcher.connect(self.spider_opened, signals.spider_opened)
        dispatcher.connect(self.spider_closed, signals.spider_closed)
        ...

    def spider_opened(self, spider):
        self.exporter = QueueItemExporter()
        self.exporter.start_exporting()

    def spider_closed(self, spider):
        self.exporter.finish_exporting()

    def process_item(self, item, spider):
        # YOUR STUFF HERE
        ...
        self.exporter.export_item(item)
        return item
1 голос
/ 01 февраля 2017

Совет : Хороший пример для начала - Запись элементов в MongoDb из официальных документов.

Я сделал похожую вещь. Я создал конвейер, который помещает каждый элемент в S3-подобный сервис (я использую Minio здесь, но вы поняли идею). Он создает новое ведро для каждого паука и помещает каждый предмет в объект со случайным именем. Полный исходный код можно найти в моем репо .

Начиная с простых цитат паука из учебника:

import scrapy

class QuotesSpider(scrapy.Spider):
    name = "quotes"
    start_urls = ['http://quotes.toscrape.com/page/1/',
                    'http://quotes.toscrape.com/page/1/']

    def parse(self, response):
        for quote in response.css('div.quote'):
            yield {
                    'text':quote.css('span.text::text').extract_first(),
                    'author':quote.css('span small::text').extract_first(),
                    'tags':quote.css('div.tags a.tag::text').extract()
                    }
        next_page = response.css('li.next a::attr(href)').extract_first()
        if next_page is not None:
            next_page = response.urljoin(next_page)
            yield scrapy.Request(next_page, callback=self.parse)

В settings.py:

ITEM_PIPELINES = {
    'scrapy_quotes.pipelines.ScrapyQuotesPipeline': 300,
}

В scrapy_quotes/pipelines.py создать конвейер и экспортер предметов:

import uuid
from StringIO import StringIO
from scrapy.contrib.exporter import BaseItemExporter
from scrapy.conf import settings
from scrapy import signals
from scrapy.xlib.pydispatch import dispatcher
from scrapy import log
from scrapy.utils.python import to_bytes
from scrapy.utils.serialize import ScrapyJSONEncoder

class S3ItemExporter(BaseItemExporter):
    def __init__(self, bucket, **kwargs):
        self._configure(kwargs)
        self.bucket = bucket
        kwargs.setdefault('ensure_ascii', not self.encoding)
        self.encoder = ScrapyJSONEncoder(**kwargs)

    def start_exporting(self):
        self.client = connect()
        create_bucket(self.client, self.bucket)

    def finish_exporting(self):
        log.msg("Done from S3 item exporter", level=log.DEBUG)

    def export_item(self, item):
        log.msg("S3 item exporter got item: %s" % item, level=log.DEBUG)
        itemdict = dict(self._get_serialized_fields(item))
        data = self.encoder.encode(itemdict)
        size = len(data) 
        object_data = StringIO(data)
        name = str(uuid.uuid4())
        put_object(self.client, self.bucket, name, object_data, size)  


class ScrapyQuotesPipeline(object):
    """Export scraped items, to different buckets,
    one per spider"""
    @classmethod
    def from_crawler(cls, crawler):
        pipeline = cls()
        crawler.signals.connect(pipeline.spider_opened, signals.spider_opened)
        crawler.signals.connect(pipeline.spider_closed, signals.spider_closed)
        return pipeline

    def spider_opened(self, spider):
        self.exporter = S3ItemExporter(spider.name)
        self.exporter.start_exporting()

    def spider_closed(self, spider):
        self.exporter.finish_exporting()

    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item

# S3 Related
from minio import Minio
import os
from minio.error import ResponseError
def connect():
    return Minio('192.168.1.111:9000',
            access_key='0M6PYKBBAVQVQGVWVZKQ',
            secret_key='H6vPxz0aHSMZPgagZ3G0lJ6CbhN8RlTtD78SPsL8',
            secure=False)

def create_bucket(client, name):
    client.make_bucket(name)

def put_object(client, bucket_name, object_name, object_data, size):
    client.put_object(bucket_name, object_name, object_data, size)
0 голосов
/ 19 июня 2017

Я столкнулся с той же проблемой, хотя, вероятно, будущие обновления версий впереди. Маленькая деталь, которая решила это для меня, была установка FEED_URI = "something" в settings.py. Без этого запись в FEED_EXPORTERS вообще не соблюдалась.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...