Получение быстрого перевода строковых данных, передаваемых через сокет, в объекты в Python - PullRequest
2 голосов
/ 25 февраля 2010

В настоящее время у меня есть приложение Python, в котором строки ASCII с завершающей новой строкой передаются мне через сокет TCP / IP. У меня высокая скорость передачи данных этих строк, и мне нужно проанализировать их как можно быстрее. В настоящее время строки передаются в формате CSV, и если скорость передачи данных достаточно высока, мое приложение на Python начинает отставать от скорости ввода данных (вероятно, это не удивительно).

Строки выглядят примерно так:

chan,2007-07-13T23:24:40.143,0,0188878425-079,0,0,True,S-4001,UNSIGNED_INT,name1,module1,...

У меня есть соответствующий объект, который будет анализировать эти строки и сохранять все данные в объекте. В настоящее время объект выглядит примерно так:

class ChanVal(object):
    def __init__(self, csvString=None,**kwargs):

        if csvString is not None:
            self.parseFromCsv(csvString)

        for key in kwargs:
                setattr(self,key,kwargs[key])

    def parseFromCsv(self, csvString):

        lst = csvString.split(',')

        self.eventTime=lst[1]
        self.eventTimeExact=long(lst[2])
        self.other_clock=lst[3]
        ...

Чтобы прочитать данные из сокета, я использую базовый «socket.socket (socket.AF_INET, socket.SOCK_STREAM)» (мое приложение - это сокет сервера), а затем я использую «select». poll () "объект из модуля" select "для постоянного опроса сокета на предмет нового ввода, используя метод" poll (...) ".

У меня есть некоторый контроль над процессом отправки данных (имеется в виду, что я могу заставить отправителя изменить формат), но было бы очень удобно, если бы мы могли достаточно ускорить обработку ASCII, чтобы не использовать фиксированную ширину или двоичные форматы для данных.

Итак, до сих пор вот вещи, которые я пробовал и которые на самом деле не имели большого значения:

  1. Использование строкового метода «split» и последующее непосредственное индексирование списка результатов (см. Выше), но «split» кажется очень медленным.
  2. Использование объекта "reader" в модуле "csv" для разбора строк
  3. Изменение строк, отправляемых в формат строки, который я могу использовать для непосредственного создания экземпляра объекта через «eval» (например, отправка чего-то вроде «ChanVal (eventTime = '2007-07-13T23: 24: 40.143', eventTimeExact = 0 , ...) ")

Я стараюсь избегать перехода к фиксированному или двоичному формату, хотя я понимаю, что в конечном итоге это будет гораздо быстрее.

В конечном счете, я открыт для предложений о лучших способах опроса сокета, лучших способах форматирования / синтаксического анализа данных (хотя мы надеемся, что мы можем придерживаться ASCII) или обо всем, что вы можете придумать.

Спасибо!

Ответы [ 2 ]

3 голосов
/ 25 февраля 2010

Вы не можете сделать Python быстрее. Но вы можете сделать ваше приложение Python быстрее.

Принцип 1: Делай меньше.

Вы не можете сделать меньше разбора ввода по всем , но вы можете сделать меньше разбора ввода в процессе, который также читает сокет и делает все остальное с данными.

Как правило, сделайте это.

Разбейте ваше приложение на набор отдельных шагов.

  1. Чтение сокета, разбиение на поля, создание именованного кортежа, запись кортежа в канал с чем-то вроде pickle.

  2. Считать канал (с pickle) для создания именованного кортежа, выполнить некоторую обработку, записать в другой канал.

  3. Чтение канала, некоторая обработка, запись в файл или что-то.

Каждый из этих трех процессов, связанных с каналами ОС, выполняется одновременно. Это означает, что первый процесс читает сокет и создает кортежи, в то время как второй процесс использует кортежи и выполняет вычисления, в то время как третий процесс выполняет вычисления и пишет файл.

Этот тип конвейера максимально увеличивает возможности вашего процессора. Без особых мучительных трюков.

Чтение и запись в каналы является тривиальным, поскольку linux заверяет вас, что sys.stdin и sys.stdout будут каналами, когда оболочка создает конвейер.

Прежде чем делать что-либо еще, разбейте вашу программу на этапы конвейера.

proc1.py

import cPickle
from collections import namedtuple

ChanVal= namedtuple( 'ChanVal', ['eventTime','eventTimeExact', 'other_clock', ... ] )
for line socket:
    c= ChanVal( **line.split(',') )
    cPickle.dump( sys.stdout )

proc2.py

import cPickle
from collections import namedtuple
ChanVal= namedtuple( 'ChanVal', ['eventTime','eventTimeExact', 'other_clock', ... ] )
while True:
    item = cPickle.load( sys.stdin )
    # processing
    cPickle.dump( sys.stdout )

Эта идея обработки именованных кортежей через конвейер очень масштабируема.

python proc1.py | python proc2.py
0 голосов
/ 25 февраля 2010

Вам нужно профилировать свой код, чтобы узнать, где время тратится.

Это не обязательно означает использование профилировщика Python

Например, вы можете просто попробовать проанализировать тот же CSVСтрока 1000000 раз разными методами.Выберите самый быстрый метод - разделите на 1000000, теперь вы знаете, сколько процессорного времени требуется для разбора строки

Попробуйте разбить программу на части и выяснить, какие ресурсы действительно требуются для каждой части.

Части, которые требуют наибольшего количества ресурсов процессора на строку ввода, - это «горлышки бутылки»

На моем компьютере, программа ниже выдает это

ChanVal0 took 0.210402965546 seconds
ChanVal1 took 0.350302934647 seconds
ChanVal2 took 0.558166980743 seconds
ChanVal3 took 0.691503047943 seconds

Итак, вы видите, что примерно половина временизанято parseFromCsv.Но также и то, что на извлечение значений и их сохранение в классе уходит довольно много времени.

Если класс не используется сразу, может быть быстрее сохранить необработанные данные и использовать свойства для анализаcsvString по запросу.

from time import time
import re

class ChanVal0(object):
    def __init__(self, csvString=None,**kwargs):
        self.csvString=csvString
        for key in kwargs:
            setattr(self,key,kwargs[key])

class ChanVal1(object):
    def __init__(self, csvString=None,**kwargs):
        if csvString is not None:
            self.parseFromCsv(csvString)
        for key in kwargs:
                setattr(self,key,kwargs[key])

    def parseFromCsv(self, csvString):
        self.lst = csvString.split(',')

class ChanVal2(object):
    def __init__(self, csvString=None,**kwargs):
        if csvString is not None:
            self.parseFromCsv(csvString)
        for key in kwargs:
                setattr(self,key,kwargs[key])

    def parseFromCsv(self, csvString):
        lst = csvString.split(',')
        self.eventTime=lst[1]
        self.eventTimeExact=long(lst[2])
        self.other_clock=lst[3]


class ChanVal3(object):
    splitter=re.compile("[^,]*,(?P<eventTime>[^,]*),(?P<eventTimeExact>[^,]*),(?P<other_clock>[^,]*)")
    def __init__(self, csvString=None,**kwargs):
        if csvString is not None:
            self.parseFromCsv(csvString)
        self.__dict__.update(kwargs)

    def parseFromCsv(self, csvString):
        self.__dict__.update(self.splitter.match(csvString).groupdict())


s="chan,2007-07-13T23:24:40.143,0,0188878425-079,0,0,True,S-4001,UNSIGNED_INT,name1,module1"
RUNS=100000

for cls in ChanVal0, ChanVal1, ChanVal2, ChanVal3:
    start_time = time()
    for i in xrange(RUNS):
        cls(s)
    print "%s took %s seconds"%(cls.__name__, time()-start_time) 
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...