Обработка потоковых данных большого объема с использованием Twisted или с использованием потоков, очередь в Python - PullRequest
2 голосов
/ 05 июля 2010

Я очень быстро получаю твиты от долгоживущего соединения с потоковым сервером Twitter API. Я продолжаю выполнять тяжелую обработку текста и сохраняю твиты в своей базе данных.

Я использую PyCurl для функции соединения и обратного вызова, которая заботится об обработке и сохранении текста в БД. Ниже смотрите мой подход, который не работает должным образом.

Я не знаком с сетевым программированием, поэтому хотел бы знать: Как использовать Threads, Queue или Twisted Frameworks для решения этой проблемы?

def process_tweet():
    # do some heaving text processing


def open_stream_connection():
    connect = pycurl.Curl()
    connect.setopt(pycurl.URL, STREAMURL)
    connect.setopt(pycurl.WRITEFUNCTION, process_tweet)
    connect.setopt(pycurl.USERPWD, "%s:%s" % (TWITTER_USER, TWITTER_PASS))
    connect.perform()

Ответы [ 3 ]

1 голос
/ 18 июля 2010

У вас должно быть несколько потоков, получающих сообщения по мере их поступления. Это число, вероятно, должно быть 1, если вы используете pycurl, но должно быть больше, если вы используете httplib - идея в том, что вы хотите иметь возможность иметь более одного запроса в API Twitter одновременно, поэтому требуется постоянная обработка.

Когда приходит каждый твит, он помещается в очередь. Очередь гарантирует, что в сообщениях есть безопасность потоков - каждый твит будет обрабатываться только одним рабочим потоком.

Пул рабочих потоков отвечает за чтение из очереди и работу с твитами. Только интересные твиты должны быть добавлены в базу данных.

Поскольку база данных, вероятно, является узким местом, существует ограничение на количество потоков в пуле, которые стоит добавить - больше потоков не заставит его работать быстрее, это просто будет означать, что в очереди ожидает больше потоков для доступа к базе данных.

Это довольно распространенная идиома Python. Эта архитектура будет масштабироваться только до определенной степени - то есть, что может обрабатывать одна машина.

0 голосов
/ 18 июля 2010

Вот простая настройка, если вы в порядке с использованием одной машины.

1 поток принимает соединения.После того, как соединение принято, оно передает принятое соединение другому потоку для обработки.

Вы, конечно, можете использовать процессы (например, multiprocessing) вместо потоков, но я не знаком сmultiprocessing дать совет.Настройка будет такой же: 1 процесс принимает соединения, а затем передает их подпроцессам.

Если вам нужно разделить обработку на нескольких машинах, тогда проще всего было бы вставить сообщение в базу данных.затем уведомите работников о новой записи (это потребует некоторой координации / блокировки между работниками).Если вы хотите избежать попадания в базу данных, вам придется передавать сообщения от вашего сетевого процесса работникам (и я не достаточно разбираюсь в низкоуровневых сетях, чтобы рассказать вам, как это сделать:))

0 голосов
/ 14 июля 2010

Я предлагаю эту организацию:

  • один процесс читает твиттер, добавляет твиты в базу данных
  • один или несколько процессов считывают базу данных, обрабатывают каждый, вставляют в новую базу данных. Оригинальные твиты либо удалены, либо помечены как обработанные.

То есть у вас есть еще два процесса / потока. База данных твитов может рассматриваться как очередь работы. Несколько рабочих процессов извлекают задания (твиты) из очереди и создают данные во второй базе данных.

...