Scrapy + PostgreSQL - Automati c Элементы и конвейер для настраиваемого ETL (Обрезать> Вставить> Upsert> Удалить) - PullRequest
0 голосов
/ 06 января 2020

У меня уже есть несколько работающих Spiders и кода для достижения того, чего я хочу, но я искал совет, как более эффективно консолидировать вещи для проекта, над которым я работаю.

Мой текущий процесс включает в себя:

  • В Scrapy: создание элементов вручную с использованием scrapy.Item
  • В Scrapy: сканирование, вывод каждой строки элементов в файл JSON Lines (JL)

    • Текущий конвейер:
    #pipelines.py
    class MyPipeline(object):
        def process_item(self, item, spider):
            for field in item.fields:
                item.setdefault(field, None)
            return item
    
  • Снаружи Scrapy w / SQL Алхимия: truncate incoming table, bulk insert JL файл с использованием Pandas to_ sql

  • Наружная терапия с SQL Алхимия: update incoming table row_uuid column (md5 га sh всех соответствующих столбцов)
  • Наружная терапия с / SQL Алхимия: upsert (insert...on conflict...where row_uuid is distinct from) входящей таблицы данных в исходную таблицу данных
  • Снаружи Scrapy w / SQL Алхимия: delete из исходной таблицы при необходимости (404 ошибки, et c)

В идеале, я хочу выполнить все эти действия в Scrapy, используя подходящий конвейер. Я видел набор данных , упомянутый. Поможет ли какая-нибудь комбинация open_spider, process_item и close_spider? У меня есть несколько вопросов:

  • Можно ли заполнять / определять элементы Scrapy непосредственно из существующей таблицы базы данных, не перечисляя столбцы вручную?
  • Если у вас есть несколько методов в Spider ( parse, parse_detail, et c), каждый из которых имеет свой собственный элемент, сможет ли конвейер вставлять в соответствующую таблицу базы данных?
  • Можно ли выполнять массовую вставку элементов X одновременно, а не по одному Пункт за один раз?
  • Будет ли приведенный ниже потенциальный подход? Я предполагаю, что другие изменения были бы необходимы ...

    #pipelines.py
    class SqlPipeline(object):
        def __init__(self, db_conn):
            #Connect to DB here?
    
        def open_spider(self, spider):
            #Truncate specific incoming table here?
    
        def process_item(self, item, spider):
            #(Bulk) insert item row(s) into specific incoming table?
            #Where would you define the table for this?
    
        def close_spider(self, spider):
            #Update row_uuid for specific incoming table?
            #Do upsert and delete rows for specific source table?
            #Close DB connection here?
    

    Спасибо за вашу помощь!

1 Ответ

1 голос
/ 12 января 2020

Трубопроводы в Scrapy используются именно для того, что вы говорите. Отвечая на ваши вопросы:

  • Возможно ли заполнить / определить элементы Scrapy непосредственно из существующей таблицы базы данных, не перечисляя столбцы вручную?

I не понимаю "прослушивание колонок вручную". Я собираюсь догадаться, что у вас есть таблица в базе данных с кучей столбцов. Эти столбцы должны быть определены в ваших элементах, потому что они будут сопоставлены с БД. Если нет, как вы ожидаете отобразить каждое поле в столбце таблицы?

  • Если у вас есть несколько методов в Spider (parse, parse_detail и т. Д. c), каждый из которых имеет свой собственный Item, сможет ли Pipeline вставить его в соответствующую таблицу базы данных?

Да. Вы можете определить несколько конвейеров (с их весом), чтобы разделить разные процессы на элемент и правильно их разделить.

  • Возможно ли одновременное массовое добавление элементов X, а не одного элемента вовремя? Будет ли приведенный ниже потенциальный подход?

Да. Конечно! Вы должны определить это в своем конвейере. Ваш лог c может отличаться в каждом!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...