Добавление запроса из Twisted `enterprise.adbapi` в реакторный цикл, созданный демоном` twistd` - PullRequest
4 голосов
/ 25 октября 2011

Я использую twisted.enterprise.adbapi внутри плагина Twisted .tac и обнаруживаю, что отложенный объект, возвращаемый для таких функций, как aConnectionPool.runQuery(sqlQuery), не запускается, если не вызывается reactor.(run). Как я могу добавить запрос в цикл реактора, созданный twistd вместо вызова reactor.run()? Это общая процедура или что-то специфическое для API асинхронной базы данных?

изменить - прикрепил код:

from twisted.application import internet, service
from zope.interface import implements
from twisted.web.iweb import IBodyProducer

from twisted.internet import defer, protocol, reactor
from twisted.internet.defer import succeed
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

import json
import base64
from twisted.enterprise import adbapi

class StringProducer(object):
    implements(IBodyProducer)

    def __init__(self, body):
        self.body = body
        self.length = len(body)

    def startProducing(self, consumer):
        consumer.write(self.body)
        return succeed(None)

    def pauseProducing(self):
        pass

    def stopProducing(self):
        pass

def httpRequest(url, values, headers={}, method='POST'):

    agent = Agent(reactor)
    d = agent.request(method,
                      url,
                      Headers(headers),
                      StringProducer(values)
                      )

    def handle_response(response):
        if response.code == 204:
            d = defer.succeed('')
        else:
            class SimpleReceiver(protocol.Protocol):
                def __init__(s, d):
                    s.buf = ''; s.d = d
                def dataReceived(s, data):
                    s.buf += data
                    response = json.loads(data)

                    receipt = response[u'receipt']
                    if receipt[u'product_id'] == "com.domain_name.app_name.a_product_id":
                        transactionID = receipt[u'original_transaction_id']
                        date = receipt[u'original_purchase_date']
                        purchaseDate = date.strip(' Etc/GMT')
                        print transactionID
                        print purchaseDate

                        dbpool = adbapi.ConnectionPool('MySQLdb', db='mydb', user='user',  passwd='passwd')
                        dOperation = dbpool.runOperation("insert into users(name, original_transaction_id, date_joined) values(%s, %s, %s)", ('testuser', transactionID, purchaseDate))

                        def finishInsert(dObject, pool):
                            print 'inserted!'
                            pool.close()
                        dOperation.addCallback(finishInsert, dbpool)

                        def insertError(dObject):
                            print 'insert error!'
                        dOperation.addErrback(insertError)

                def connectionLost(s, reason):
                    s.d.callback(s.buf)

            d = defer.Deferred()
            response.deliverBody(SimpleReceiver(d))
        return d

    d.addCallback(handle_response)

class StoreServer(protocol.Protocol):

    def dataReceived(self, data):
        a = data.split(':delimiter:')

        if a[0] == 'addToUserList':
            receiptBase64 = base64.standard_b64encode(a[1])
            jsonReceipt = json.dumps({'receipt-data':receiptBase64})

            httpRequest(
                        "https://buy.itunes.apple.com/verifyReceipt",
                        jsonReceipt,
                        {'Content-Type': ['application/x-www-form-urlencoded']}
                        )

application = service.Application("My Server")
storeFactory = protocol.Factory()
storeFactory.protocol = StoreServer
tcpStoreServer = internet.TCPServer(30000, storeFactory)
tcpStoreServer.setServiceParent(application)

1 Ответ

2 голосов
/ 11 декабря 2011

Ваш код создает новый ConnectionPool для каждого запроса. Новый ConnectionPool создает собственный новый пул потоков для выполнения запросов и должен установить новое соединение с базой данных.

Это означает, что у вас фактически нет пула соединений. У вас просто есть много соединений, которые вы создаете и используете один раз. Кроме того, ошибка, insertError, не закрывает пул.

В совокупности эти вещи означают, что не существует ограничения на количество потоков / соединений, которые могут быть созданы одновременно, за исключением ограничения, налагаемого вашей системой на то, сколько памяти вы можете выделить, или сколько сокетов вы можете открыть. Когда вы столкнетесь с одним из этих ограничений, все будет не очень красиво.

Это также означает, что каждая ошибка запроса приводит к утечке нескольких потоков и соединений (ConnectionPool устанавливает 3 потока / соединения при запуске). После достаточного количества ошибок вы не сможете создавать больше потоков или соединений, поэтому вы больше не сможете запрашивать вашу базу данных. Ваш запрос прост, и вы можете подумать, что ошибки не очень вероятны, но MySQL имеет тенденцию отключать клиентов несколько случайным образом (и, возможно, вы хотя бы в какой-то степени это осознавали, поскольку вы добавили добавление ошибки сообщить об ошибке).

Предполагаемое использование ConnectionPool состоит в том, чтобы создать одно (или два, или какое-то другое небольшое фиксированное число), а затем повторно использовать его для всех ваших запросов. Я не знаю, связаны ли эти проблемы с теми, которые вы изначально наблюдали, или нет, но, вероятно, это проблемы, которые вы должны решить.

...