Как организовать проект на основе парсинга скриптов и вставки данных в postgres? - PullRequest
0 голосов
/ 10 апреля 2019

У меня есть несколько вопросов о моей логике проекта. Мой проект основан на скриптах, которые анализируют данные с тысяч сайтов (отдельный скрипт для сайта). На самом деле у меня есть ~ 40 сценариев, но я хочу организовать их для больших данных в будущем. После очистки скрипты имеют метод insert_data, который вставляет данные (список словарей) в мою базу данных POSTGRESQL. В скриптах для получения данных я использую библиотеки: запросы, urllib, иногда селен (страницы с JS) и BeautifulSoup4 для анализа загруженных данных. Ниже логика моих скриптов:

class Main():
# initialize variables

def __init__():

def get_url(self, url):
    requests.get(url)
    return soup(url)

# data_insert is list of dicts
def insert_data(self, data_insert):
    cursor.executemany("INSERT INTO table1 (url, title, etc...) VALUES (% (url)s, %(title)s, data_insert)

def __main__(self):
    # 1 looping
    for auction in auctions:
        list_auctions.append(get_href_auction)

    # 2 looping
    for row in list_auctions:
        get_url(row)
        grab some data
        record = {"var1":"value", "var2":"value" etc}
        final_list.append(record)

    # after 2 looping insert data to POSTGRESQL
    insert_data(final_list)

В базе данных у меня есть таблица для каждого веб-сайта и таблица 'data_all'. Скрипты, вставляющие данные в таблицу для каждого веб-сайта, и у меня есть триггер после вставки, который загружает данные из этих таблиц в основную таблицу «data_all». Ниже код моего триггера:

CREATE TRIGGER trigger_insert_data_to_main_table
  AFTER INSERT
  ON data_table1
  EXECUTE PROCEDURE insert_data_to_main_table();

CREATE TRIGGER trigger_insert_data_to_main_table
  AFTER INSERT
  ON data_table2
  EXECUTE PROCEDURE insert_data_to_main_table();

и т. Д. ... для всех моих столов.

CREATE OR REPLACE FUNCTION insert_data_to_main_table()
  RETURNS trigger AS
$BODY$
BEGIN
 INSERT INTO data_all
 SELECT t1.*
 FROM data_table1 t1
 LEFT JOIN data_all d ON d.url = t1.url
 WHERE d.url IS NULL
 UNION
 SELECT t2.*
 FROM data_table2 t2
 LEFT JOIN data_all d ON d.url = t2.url
 WHERE d.url IS NULL;
 RETURN NULL;
 END;
 SELECT t3.*
 FROM data_table3 t3 

и т. Д. ... для всех моих столов.

Эта функция позволяет мне игнорировать дубликаты URL-адресов (которые являются УНИКАЛЬНЫМИ на строку) в основной таблице «data_all».

  1. Хорошая логика для скриптования скриптов? Для страниц без JS работает нормально (быстро). Иногда у меня есть только 1 цикл для сбора данных с главной страницы (без итераций для аукциона).
  2. Есть ли хороший способ вставить данные таким образом? В будущем мне придется создать основной скрипт с очередью скриптов очистки
  3. Как обезопасить основной скрипт, чтобы в случае ошибки он возвращал мне сообщение и продолжал работать?
  4. Я недавно читал о многопроцессорности. Это хорошая идея использовать его в этом проекте для повышения эффективности?
  5. У кого-нибудь есть идея получить данные со страниц JS? Я нахожу решение на основе request.post. Это единственная альтернатива селену в питоне?

Большое спасибо за прочитанное сообщение до конца и надеюсь, что вы можете мне помочь. С уважением:)

1 Ответ

0 голосов
/ 13 апреля 2019

вы можете ускорить страницы селеном, стратегически прекратив загрузку страницы, как только вы получите нужные данные или появится определенный элемент. Насколько db советует, я использовал simpledb, но в прошлый раз я использовал его с странной проблемой с многопоточностью, поэтому я написал свою собственную локальную простую БД на основе словарей и json, я использую queue.queue () q.get () как взломать, чтобы заблокировать, а затем освободить разделы моего кода БД, чтобы я мог использовать в нескольких потоках

Я советую вам помещать надлежащие функции wait () между запросами к одному и тому же домену из уважения ко всем остальным агрегаторам данных, если вы злоупотребляете веб-сайтом, они усложнят для всех остальных. выяснить хорошую логику, основанную на IP, куки и URL / доменных именах, которые пропускают или ждут. я написал свой собственный IP-менеджер, чтобы справиться с этим, у меня даже есть минимальное время ожидания для URL, хотя я думаю, что большинство сайтов проверяют по домену, а не по URL в основном я проверяю IP, домен и конкретный URL в базе данных, чтобы увидеть, ждать ли количество секунд X на основе минимальных значений по умолчанию. все запросы http проходят через мой ipmanager перед отправкой запросов

Если вы используете многопоточность или многопроцессорность, вы хотите разделить функции, выполнение которых занимает больше всего времени, обычно это HTTP-запросы. Время ожидания ввода db минимально, я не разделяю его по разным потокам, я просто позволяю определенным операциям происходить по одному потоку за раз, как, например, функции update в db. читать функции не имеют блокировки

вот класс QStack, в основном я добавляю запросы в стек который возвращает идентификатор, я использую этот идентификатор в моем другом коде для обработки ответа Я могу предоставить более глубокое понимание кода, если вы хотите объединиться, я пытался выяснить, почему в последнее время у меня возникают проблемы с подключением к прокси-серверу, я попытался включить проверку в ложь, и я прочитал некоторые странные вещи в urllib3 в запросах

class QStack:

def getproxy(self,fp=0):
    #print 'wait for queue'
    #z=self.qq.get()
    while True:
        if fp:
            p=self.fpxo.get()
        else:
            p=self.pxo.get()
        ipo=ip(p)

        #print 'looking for ip'
        if ipo not in self.inuse:
            self.inuse.append(ipo)
            break
        time.sleep(1)

    #print 'put bob in queue'
    #self.qq.put('bob')
    return p


def start(self):
    self.ts1=time.time()
    self.qc=0
    self.hari2=1
    t = threading.Thread(
    target=self.manage_threads
            )
    t.start()




def stop(self):
    self.hari2=0

def __init__(self,ipm,pxo,
      mthreads=1,fpxo=None):

    self.fpxo=fpxo
    self.hari=1
    self.mthreads=mthreads
    self.ipm=ipm
    self.pxo=pxo

    self.rqs = Queue.Queue()
    self.rps = Queue.Queue()

    self.b = Queue.Queue()
    self.c = Queue.Queue()
    self.c.put('bob')
    self.b.put('bob')
    self.inuse=[]
    self.mc=-1
    self.athreads=0

    self.idc=-1
    self.ct=0

def manage_threads(self):
    while self.hari2:
        if self.athreads<self.mthreads:
            t = threading.Thread(
                target=self.do
            )
            t.start()
            #print 'added thread'*20
        #print 'current threads'*20,self.athreads
        time.sleep(1)

def do(self):
    if self.rqs.empty():
        return -1

    self.athreads+=1
    q=self.rqs.get()
    s = http_incognito(self.ipm)
    s.timeout=self.pxo.timeout
    hari = True

    while hari:
        #print 'hi'
        if q.ua == 'auto':
            s.incognito()
        else:
            s.useragent=q.ua

        s.cookies=q.cookies
        s.referer = q.ref

        fp=0
        self.mc+=1
        if self.mc >= self.mthreads:
            self.mc=0
            fp=1

        p = self.getproxy(fp)
        ipo=ip(p)

        q.p=p
        q.fp=fp
        #print'lalaalla'

        try:


            then=time.time()
            a=s.get(q.url,proxy=p)
            hari=0
            #print 'done'
            #ff=random.choice(xrange(1,10))
            #print 'sleep',ff
            #time.sleep(ff)


            #print'did reqeust'
        except Exception as e:
            print 'HTTP ERROR'
            print str(e)
            self.inuse.remove(ipo)

            if fp:
                self.fpxo.update(p,0)
            else:
                self.pxo.update(p,0)
            self.athreads-=1
            continue


        q.rp=a
        #print 'okayyyy'

        #no blank response
        if not q.rp:
            print 'blank response'*20
            print q.url,q.p


            if fp:
                self.fpxo.update(p,0)
            else:
                self.pxo.update(p,0)

            hari=1
            self.inuse.remove(ipo)
            continue

        if not q.rcheck(q.v()):
            print 'NONONONONONO'

            if fp:
                self.fpxo.update(p,0)
            else:
                self.pxo.update(p,0)


            print 'robot', p
            self.inuse.remove(ipo)
            hari=1
            self.athreads-=1
            continue
        #print 'horehorehore'
        #print 'passed rcheck'

    #print 'lalala'
    self.rps.put(q)
    del s  #remove
    z= q.id


    self.inuse.remove(ipo)
    self.athreads-=1
    return z

def readd(self,q):
    self.rqs.put(
       q)

def add(self,
 url,
 ref=None,
 cookies=None,
 ua='auto',
 headers=None,
 rcheck=None):





    a=self.b.get()


    self.ct +=1

    q = QQuery(url,
        ref,cookies,
         ua,
         headers,
          rcheck=rcheck)

    self.idc += 1
    q.id=self.idc
    self.rqs.put(
       q)


    self.b.put('fred')
    return q.id


def get(self,ide=None):
    a=self.c.get()

    q = self.rps.get()
    if ide != None:
        while q.id != ide:
            self.rps.put(q)
            q=self.rps.get()

    self.c.put('bob')
    return q

используется следующим образом:

qinfo={}
pxo=proxymanager()
ipm=ipmanager()
fpxo=myip()

qs=QStack(10,pxo,ipm,fpxo)
for u in urllist:
    ide = qs.add(u)
    qinfo[ide]='type, more info'
qs.start()


while True:
    q=qs.get() 
    info=qinfo[q.id]
    #process query in different ways
    #based on info, id, etc
    find more urls
     qs.add(u)
...