Двойные вставки в базу данных с использованием sqlite, sqlalchemy, python - PullRequest
0 голосов
/ 01 октября 2009

Я изучаю Python и, благодаря онлайн-ресурсам и людям на этом сайте, осваиваю его. В этом моем первом скрипте, в котором я анализирую записи RSS-канала Twitter и вставляю результаты в базу данных, есть одна проблема, которую я не могу исправить. А именно, в одну из таблиц вставляются повторяющиеся записи.

В качестве предыстории я первоначально нашел на HalOtis.com базовый скрипт для загрузки RSS-каналов, а затем изменил его несколькими способами: 1) изменил учет учетных особенностей в RSS-каналах Twitter (он не разделен на контент, заголовок) , URL и т. Д.); 2) добавлены таблицы для «хэштегов» и для отношения «многие ко многим» (таблица entry_tag); 3) изменил настройку таблицы на sqlalchemy; 4) внесли некоторые специальные изменения, чтобы учесть возникающие странные проблемы с юникодом. В результате код местами уродлив, но он был хорошим опытом обучения и теперь работает - за исключением того, что он продолжает вставлять дубликаты в таблицу «записей».

Поскольку я не уверен, что было бы наиболее полезным для людей, я вставил весь код ниже с некоторыми комментариями в нескольких местах, чтобы указать, что я считаю наиболее важным.

Я был бы очень признателен за любую помощь в этом. Спасибо!

Редактировать: Кто-то предложил мне предоставить схему для базы данных. Я никогда не делал этого раньше, поэтому, если я не делаю это правильно, терпите меня. Я настраиваю четыре стола:

  1. RSSFeeds, который содержит список RSS-каналов Twitter
  2. RSSEntries, который содержит список отдельных записей, загруженных (после синтаксического анализа) из каждого канала (со столбцами для содержимого, хэштегами, датой, URL)
  3. Теги, которые содержат список всех хэштегов, найденных в отдельных записях (твиты)
  4. entry_tag, который содержит столбцы, позволяющие сопоставлять теги с записями.

Короче говоря, приведенный ниже скрипт захватывает пять тестовых RSS-каналов из таблицы RSS-каналов, загружает 20 последних записей / твитов из каждого канала, анализирует записи и помещает информацию в таблицы RSS Entries, Tags и entry_tag. ,

#!/usr/local/bin/python

import sqlite3
import threading
import time
import Queue
from time import strftime
import re       
from string import split 
import feedparser 
from django.utils.encoding import smart_str, smart_unicode      
from sqlalchemy import schema, types, ForeignKey, select, orm
from sqlalchemy import create_engine

engine = create_engine('sqlite:///test98.sqlite', echo=True)
metadata = schema.MetaData(engine)   
metadata.bind = engine

def now():
    return datetime.datetime.now()


#set up four tables, with many-to-many relationship
RSSFeeds = schema.Table('feeds', metadata,
    schema.Column('id', types.Integer, 
        schema.Sequence('feeds_seq_id', optional=True), primary_key=True),
    schema.Column('url', types.VARCHAR(1000), default=u''),
)


RSSEntries = schema.Table('entries', metadata,
    schema.Column('id', types.Integer, 
        schema.Sequence('entries_seq_id', optional=True), primary_key=True),
    schema.Column('feed_id', types.Integer, schema.ForeignKey('feeds.id')),
    schema.Column('short_url', types.VARCHAR(1000), default=u''),
    schema.Column('content', types.Text(), nullable=False),
    schema.Column('hashtags', types.Unicode(255)),
    schema.Column('date', types.String()),  
)


tag_table = schema.Table('tag', metadata,
    schema.Column('id', types.Integer,
       schema.Sequence('tag_seq_id', optional=True), primary_key=True),
    schema.Column('tagname', types.Unicode(20), nullable=False, unique=True),
)


entrytag_table = schema.Table('entrytag', metadata,
    schema.Column('id', types.Integer,
        schema.Sequence('entrytag_seq_id', optional=True), primary_key=True),
    schema.Column('entryid', types.Integer, schema.ForeignKey('entries.id')),
    schema.Column('tagid', types.Integer, schema.ForeignKey('tag.id')),
)


metadata.create_all(bind=engine, checkfirst=True)


# Insert test set of Twitter RSS feeds
stmt = RSSFeeds.insert()
stmt.execute(
    {'url': 'http://twitter.com/statuses/user_timeline/14908909.rss'},
    {'url': 'http://twitter.com/statuses/user_timeline/52903246.rss'},
    {'url': 'http://twitter.com/statuses/user_timeline/41902319.rss'},
    {'url': 'http://twitter.com/statuses/user_timeline/29950404.rss'},
    {'url': 'http://twitter.com/statuses/user_timeline/35699859.rss'},
)



#These 3 lines for threading process (see HalOtis.com for example) 
THREAD_LIMIT = 20
jobs = Queue.Queue(0)
rss_to_process = Queue.Queue(THREAD_LIMIT)


#connect to sqlite database and grab the 5 test RSS feeds
conn = engine.connect()
feeds = conn.execute('SELECT id, url FROM feeds').fetchall()

#This block contains all the parsing and DB insertion 
def store_feed_items(id, items):
    """ Takes a feed_id and a list of items and stores them in the DB """
    for entry in items:
        conn.execute('SELECT id from entries WHERE short_url=?', (entry.link,))
        #note: entry.summary contains entire feed entry for Twitter, 
                    #i.e., not separated into content, etc.
        s = unicode(entry.summary) 
        test = s.split()
        tinyurl2 = [i for i in test if i.startswith('http://')]
        hashtags2 = [i for i in s.split() if i.startswith('#')]
        content2 = ' '.join(i for i in s.split() if i not in tinyurl2+hashtags2)
        content = unicode(content2)
        tinyurl = unicode(tinyurl2)
        hashtags = unicode (hashtags2)
        print hashtags
        date = strftime("%Y-%m-%d %H:%M:%S",entry.updated_parsed)


        #Insert parsed feed data into entries table 
                    #THIS IS WHERE DUPLICATES OCCUR
        result = conn.execute(RSSEntries.insert(), {'feed_id': id, 'short_url': tinyurl,
            'content': content, 'hashtags': hashtags, 'date': date})
        entry_id = result.last_inserted_ids()[0]


        #Look up tag identifiers and create any that don't exist:
        tags = tag_table
        tag_id_query = select([tags.c.tagname, tags.c.id], tags.c.tagname.in_(hashtags2))
        tag_ids = dict(conn.execute(tag_id_query).fetchall())
        for tag in hashtags2:
            if tag not in tag_ids:
                result = conn.execute(tags.insert(), {'tagname': tag})
                tag_ids[tag] = result.last_inserted_ids()[0]

        #insert data into entrytag table 
        if hashtags2: conn.execute(entrytag_table.insert(),
            [{'entryid': entry_id, 'tagid': tag_ids[tag]} for tag in hashtags2])


#Rest of file completes the threading process     
def thread():
    while True:
        try:
            id, feed_url = jobs.get(False) # False = Don't wait
        except Queue.Empty:
            return

        entries = feedparser.parse(feed_url).entries
        rss_to_process.put((id, entries), True) # This will block if full

for info in feeds: # Queue them up
    jobs.put([info['id'], info['url']])

for n in xrange(THREAD_LIMIT):
    t = threading.Thread(target=thread)
    t.start()

while threading.activeCount() > 1 or not rss_to_process.empty():
    # That condition means we want to do this loop if there are threads
    # running OR there's stuff to process
    try:
        id, entries = rss_to_process.get(False, 1) # Wait for up to a second
    except Queue.Empty:
        continue

    store_feed_items(id, entries)

1 Ответ

2 голосов
/ 02 октября 2009

Похоже, вы включили SQLAlchemy в ранее существующий скрипт, который не использовал SQLAlchemy. Здесь слишком много движущихся частей, которые никто из нас не понимает достаточно хорошо.

Я бы рекомендовал начать с нуля. Не используйте потоки. Не используйте sqlalchemy. Для начала, возможно, даже не используйте базу данных SQL. Напишите скрипт, который собирает информацию, которую вы хотите, простым способом в простую структуру данных, используя простые циклы и, возможно, time.sleep (). Затем, когда это сработает, вы можете добавить хранилище в базу данных SQL, и я действительно не думаю, что непосредственно писать операторы SQL намного сложнее, чем использовать ORM, и легче отлаживать IMHO. Существует большая вероятность, что вам никогда не понадобится добавлять потоки.

«Если вы думаете, что достаточно умен, чтобы писать многопоточные программы, это не так». - Джеймс Алстрем.

...