Twisted приложение теряет событие подключения MySQL с возможностью повторного подключения - PullRequest
0 голосов
/ 26 сентября 2018

Я использовал скрученный в респираторе IoT TCP-сервер, и теперь я перенес код для обслуживания проекта с подключенным замком.Эти два проекта выполняются на одном облачном сервере, подключается к одному и тому же серверу MySQL с двумя изолированными учетными записями MySQL.Однако последний иногда выдает ошибку для пропавшего соединения MySQL.

Вот (упрощенный) код.

#!/usr/bin/env python
# coding: utf-8

from twisted.internet import defer
from twisted.internet import protocol, reactor
from twisted.internet.protocol import Factory, ClientCreator
from twisted.enterprise import adbapi
from twisted.python import log
from twisted.protocols.policies import TimeoutMixin

import txredisapi as redis

sys.path.append("..")   # Add package path, will be installed into python path later
from connector.aescrypt import aescrypt
from connector.epic_config import EpicConf

conf = EpicConf().loadConf()
dbid = string.atoi(conf['Redisdbid'])
rcs = redis.lazyConnection(password=conf['RedisPassword'], dbid=dbid, reconnect=True)
dbpool = adbapi.ConnectionPool("MySQLdb",db=conf['DbName'],user=conf['DbAccount'],\
        passwd=conf['DbPassword'],host=conf['DbHost'],\
        use_unicode=True,charset=conf['DbCharset'],cp_reconnect=True)
# cp_reconnect = True is enabled.

class EpicState(object):
    pass

class AesCryptor(object):
    pass

class PlainTCP(protocol.Protocol, TimeoutMixin):
    def __init__(self):
        pass

    def dataReceived(self, data):
        self.attackFilter(data)
        self.dataDecoded(data)

    def dataDecoded(self, data):
        if "HELO" in data:
            self.onHello(data)
        else:
            print("Invalid Command")

@defer.inlineCallbacks
def onHello(self, data):
    global rcs
    self.epicstate = EpicState.HELO
    try:
        _, self.protocolVersion, self.devId = data.split(',')
    except ValueError, e:
        error = "onHello_err: {}".format(e)
        print error
        self.write(error)
        self.transport.loseConnection()
        defer.returnValue(False)

    if self.protocolVersion<1:
        # replace "NACK,-1"
        self.transport.write(b"HELO,{},{},##############\r\n".format(0xFF,0xFF))

    ctx = "%s@%s"%(self.devId, self.peer.host)
    print "[dbg] isDevConnectedAlready(%s)"%(ctx)
    isConnected = yield rcs.sismember(self.factory.onlineDevices, ctx)
    print "isConnected={}".format(isConnected)

    if isConnected:
        e = "Duplicate connections"
        error = "onHello_err: {}".format(e)
        print error
        # replace "NACK,-2"
        self.transport.write(b"HELO,{},{}\r\n".format(0xFE,0xFE))
        # Reopen for a moment
        self.devId = None
        self.transport.loseConnection()
        defer.returnValue(False)
    else:
        self.appendDevToDbCache(rcs)

    query = "SELECT `uid`,`deviceWriteKey`,`serverAccessKey`,`aesKey`,`aesIV`,`productId` FROM `device` WHERE `id`={} LIMIT 1".format(self.devId)
    row = yield dbpool.runQuery(query)
    if row:
        self.uid = row[0][0]
        self.writekey = row[0][1]
        self.serverkey = row[0][2]
        self.aes_key = row[0][3]
        self.aes_iv = row[0][4]
        self.productId = row[0][5]
        self.start = int(time.time())
        self.nonce = uuid.uuid4().hex.upper()[:16]
        self.transport.write(b"HELO,{},{}\r\n".format(self.start, self.nonce))
        self.epicstate = EpicState.IDLE
    else:
        print "Device {} No Match from DB".format(self.devId)
        self.transport.write(b"NACK\r\n") 
        self.transport.loseConnection()

    query = "SELECT `productId`, `version`,`url` FROM `firmware` WHERE `productId`={} ORDER BY `id` DESC LIMIT 1".format(self.productId)
    row = yield dbpool.runQuery(query)
    if row:
        self.otafile = row[0][2]

Это связано с тем, что если я использую симулятор Python для тестированиямой сервер, соединение MySQL всегда работает хорошо, никогда не теряя соединение.Но если я использую физическое устройство, подключающееся к моему серверу, иногда я вижу такую ​​ошибку.

2018-09-25 14:21:54+0800 Peer: 73.202.44.233:1769 Conn+: 1:3
2018-09-25 14:21:54+0800 ASC[9]:HELO,1,16
2018-09-25 14:21:54+0800 [dbg] isDevConnectedAlready(16@73.202.44.233)
2018-09-25 14:21:54+0800 isConnected=False
2018-09-25 14:21:54+0800 [dbg] appendDevToDbCache(16@73.202.44.233)
2018-09-25 14:21:54+0800 Rollback failed
    Traceback (most recent call last):
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/python/threadpool.py", line 196, in _worker
        result = context.call(ctx, function, *args, **kwargs)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/python/context.py", line 118, in callWithContext
        return self.currentContext().callWithContext(ctx, func, *args, **kw)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/python/context.py", line 81, in callWithContext
        return func(*args,**kw)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/enterprise/adbapi.py", line 455, in _runInteraction
        conn.rollback()
    --- <exception caught here> ---
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/enterprise/adbapi.py", line 56, in rollback
        self._connection.rollback()
    _mysql_exceptions.OperationalError: (2006, 'MySQL server has gone away')

2018-09-25 14:21:54+0800 Rollback failed
    Traceback (most recent call last):
      File "/usr/lib/python2.7/threading.py", line 504, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/python/threadpool.py", line 196, in _worker
        result = context.call(ctx, function, *args, **kwargs)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/python/context.py", line 118, in callWithContext
        return self.currentContext().callWithContext(ctx, func, *args, **kw)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/python/context.py", line 81, in callWithContext
        return func(*args,**kw)
    --- <exception caught here> ---
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/enterprise/adbapi.py", line 455, in _runInteraction
        conn.rollback()
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/enterprise/adbapi.py", line 70, in rollback
        raise ConnectionLost()
    twisted.enterprise.adbapi.ConnectionLost: 

2018-09-25 14:21:54+0800 Unhandled error in Deferred:
2018-09-25 14:21:54+0800 Unhandled Error
    Traceback (most recent call last):
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 424, in errback
        self._startRunCallbacks(fail)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 491, in _startRunCallbacks
        self._runCallbacks()
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 578, in _runCallbacks
        current.result = callback(current.result, *args, **kw)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 1163, in gotResult
        _inlineCallbacks(r, g, deferred)
    --- <exception caught here> ---
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 1105, in _inlineCallbacks
        result = result.throwExceptionIntoGenerator(g)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/python/failure.py", line 389, in throwExceptionIntoGenerator
        return g.throw(self.type, self.value, self.tb)
      File "/home/allankliu/Epic_Connector/GLINK_plus/secure_server/EpicGlinkTcpServer.py", line 414, in onHello
        row = yield dbpool.runQuery(query)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/python/threadpool.py", line 196, in _worker
        result = context.call(ctx, function, *args, **kwargs)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/python/context.py", line 118, in callWithContext
        return self.currentContext().callWithContext(ctx, func, *args, **kw)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/python/context.py", line 81, in callWithContext
        return func(*args,**kw)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/enterprise/adbapi.py", line 448, in _runInteraction
        result = interaction(trans, *args, **kw)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-15.0.0-py2.7-linux-i686.egg/twisted/enterprise/adbapi.py", line 462, in _runQuery
        trans.execute(*args, **kw)
      File "/usr/lib/python2.7/dist-packages/MySQLdb/cursors.py", line 174, in execute
        self.errorhandler(self, exc, value)
      File "/usr/lib/python2.7/dist-packages/MySQLdb/connections.py", line 36, in defaulterrorhandler
        raise errorclass, errorvalue
    _mysql_exceptions.OperationalError: (2006, 'MySQL server has gone away')

2018-09-25 14:21:54+0800 [DBG] devId=None, peer=None, cancel checkCommand

Из приведенного выше файла журнала система выдает исключения для

  • _mysql_exceptions.OperationalError: (2006, «сервер MySQL ушел»)
  • twisted.enterprise.adbapi.ConnectionLost
  • исключение, связанное с откатом для тех же ошибок MySQL 2006.

все они вызваны

row = yield dbpool.runQuery(query)

, который используется для загрузки всех метаданных (ключ AES / IV / Access / другие ключи), относящихся к существующему устройству, из RDBS, большинство из которых используются для последующей взаимной аутентификации.

Чтобы зафиксировать исключения, вызванные одним оператором.Я изменил свой код следующим образом:

@defer.inlineCallbacks
def onHello(self, data):
    global rcs
    self.epicstate = EpicState.HELO
    try:
        _, self.protocolVersion, self.devId = data.split(',')
    except ValueError, e:
        error = "onHello_err: {}".format(e)
        print error
        self.write(error)
        self.transport.loseConnection()
        defer.returnValue(False)

    if self.protocolVersion<1:
        # replace "NACK,-1"
        self.transport.write(b"HELO,{},{},##############\r\n".format(0xFF,0xFF))

    ctx = "%s@%s"%(self.devId, self.peer.host)
    print "[dbg] isDevConnectedAlready(%s)"%(ctx)
    isConnected = yield rcs.sismember(self.factory.onlineDevices, ctx)
    print "isConnected={}".format(isConnected)

    if isConnected:
        e = "Duplicate connections"
        error = "onHello_err: {}".format(e)
        print error
        # replace "NACK,-2"
        self.transport.write(b"HELO,{},{}\r\n".format(0xFE,0xFE))
        # Reopen for a moment
        self.devId = None
        self.transport.loseConnection()
        defer.returnValue(False)
    else:
        self.appendDevToDbCache(rcs)

    query = "SELECT `uid`,`deviceWriteKey`,`serverAccessKey`,`aesKey`,`aesIV`,`productId` FROM `device` WHERE `id`={} LIMIT 1".format(self.devId)
    try:
        row = yield dbpool.runQuery(query)
    except:
        self.transport.write(b"MySQL error\r\n")
        self.transport.loseConnection()
        defer.returnValue(False)

    if row:
        self.uid = row[0][0]
        self.writekey = row[0][1]
        self.serverkey = row[0][2]
        self.aes_key = row[0][3]
        self.aes_iv = row[0][4]
        self.productId = row[0][5]
        self.start = int(time.time())
        self.nonce = uuid.uuid4().hex.upper()[:16]
        self.transport.write(b"HELO,{},{}\r\n".format(self.start, self.nonce))
        self.epicstate = EpicState.IDLE
    else:
        print "Device {} No Match from DB".format(self.devId)
        self.transport.write(b"NACK\r\n")
        self.transport.loseConnection()
        defer.returnValue(False)

    query = "SELECT `productId`, `version`,`url` FROM `firmware` WHERE `productId`={} ORDER BY `id` DESC LIMIT 1".format(self.productId)
    try:
        row = yield dbpool.runQuery(query)
    except:
        self.transport.write(b"MySQL error\r\n")
        self.transport.loseConnection()
        defer.returnValue(False)

    if row:
        self.otafile = row[0][2]

Но я все еще не уверен в параметрах переподключения для компонента adbapi.

Почему он не работает так же, как и в предыдущей версиина том же облачном сервере, тот же код подключения MySQL? Я дважды проверил у своего поставщика облачных услуг, у соединения нет никаких ограничений вообще.Любая подсказка, чтобы отладить такую ​​проблему?

Достаточно ли моей попытки, кроме кода, достаточно для захвата таких комбинированных исключений?

...