Быстрая связь IPC / Socket в Java / Python - PullRequest
7 голосов
/ 12 февраля 2012

Два приложения (Java и Python) должны взаимодействовать в моем приложении. Я заметил, что связь через сокет занимает 93% времени выполнения. Почему общение так медленно? Стоит ли искать альтернативы сокетной коммуникации или это можно сделать быстрее?

Обновление: я обнаружил простое исправление. Похоже, что выходной поток с буферизацией на самом деле не буферизован по неизвестной причине. Итак, теперь я помещаю все данные в строковые буферы в обоих клиент-серверных процессах. Я пишу это в сокет в методе сброса.

Мне все еще интересен пример использования разделяемой памяти для быстрого обмена данными между процессами.

Некоторая дополнительная информация:

  1. Размер сообщения в приложении в большинстве случаев не превышает 64 КБ.
  2. Сервер на Java, клиент написан на Python.
  3. Сокет IPC реализован ниже: для отправки 200 байтов требуется 50 циклов! Это должно быть слишком высоко. Если я отправлю 2 байта за 5000 циклов, это займет намного меньше времени.
  4. Оба процесса выполняются на одном компьютере с Linux.
  5. В реальном приложении каждый цикл совершает около 10 вызовов iFid.write () клиента.
  6. Это делается в системе Linux.

Это сторона сервера:

public class FastIPC{
    public PrintWriter out;
    BufferedReader in;
    Socket socket = null;
    ServerSocket serverSocket = null;


    public FastIPC(int port) throws Exception{
        serverSocket = new ServerSocket(port);
        socket = serverSocket.accept();
        out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())), true);
        in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    }

    public void send(String msg){
        out.println(msg); // send price update to socket
    }

    public void flush(){
        out.flush();
    }

    public String recv() throws Exception{
        return in.readLine();
    }

    public static void main(String[] args){
        int port = 32000;
        try{
            FastIPC fip = new FastIPC(port);
            long start = new Date().getTime();
            System.out.println("Connected.");
            for (int i=0; i<50; i++){
                for(int j=0; j<100; j++)
                    fip.send("+");
                fip.send(".");
                fip.flush();
                String msg = fip.recv();
            }
            long stop = new Date().getTime();
            System.out.println((double)(stop - start)/1000.);
        }catch(Exception e){
            System.exit(1);
        }
    }
}

А на стороне клиента:

import sys
import socket

class IPC(object):
    def __init__(self):
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.connect(("localhost", 32000))
        self.fid = self.s.makefile() # file wrapper to read lines
        self.listenLoop() # wait listening for updates from server

    def listenLoop(self):
        fid = self.fid
        print "connected"
        while True:
            while True:
                line = fid.readline()
                if line[0]=='.':
                    break
            fid.write('.\n')
            fid.flush()

if __name__ == '__main__':
    st = IPC()

Ответы [ 2 ]

11 голосов
/ 12 февраля 2012

У вас есть несколько вариантов.Поскольку вы используете Linux, вы можете использовать доменные сокеты UNIX.Или вы можете сериализовать данные как ASCII или JSon или какой-либо другой формат и передать их через канал, SHM (сегмент совместно используемой памяти), очередь сообщений, DBUS или подобное.Стоит подумать о том, какие у вас есть данные, так как эти механизмы IPC имеют разные характеристики производительности.Есть черновик документа USENIX с хорошим анализом различных компромиссов, который стоит прочитать.

Поскольку вы говорите (в комментариях к этому ответу), что предпочитаете использовать SHM,Вот несколько примеров кода, с которых можно начать.Использование библиотеки Python posix_ipc :

import posix_ipc # POSIX-specific IPC
import mmap      # From Python stdlib

class SharedMemory(object):
    """Python interface to shared memory. 
    The create argument tells the object to create a new SHM object,
    rather than attaching to an existing one.
    """

    def __init__(self, name, size=posix_ipc.PAGE_SIZE, create=True):
        self.name = name
        self.size = size
        if create:
            memory = posix_ipc.SharedMemory(self.name, posix_ipc.O_CREX,
                                            size=self.size)
        else:
            memory = posix_ipc.SharedMemory(self.name)
        self.mapfile = mmap.mmap(memory.fd, memory.size)
        os.close(memory.fd)
        return

    def put(self, item):
        """Put item in shared memory.
        """
        # TODO: Deal with the case where len(item) > size(self.mapfile)
        # TODO: Guard this method with a named semaphore
        self.mapfile.seek(0)
        pickle.dump(item, self.mapfile, protocol=2)
        return

    def get(self):
        """Get a Python object from shared memory.
        """
        # TODO: Deal with the case where len(item) > size(self.mapfile)
        # TODO: Guard this method with a named semaphore
        self.mapfile.seek(0)
        return pickle.load(self.mapfile)

    def __del__(self):
        try:
            self.mapfile.close()
            memory = posix_ipc.SharedMemory(self.name)
            memory.unlink()
        except:
            pass
        return    

Для стороны Java вы хотите создать тот же класс, несмотря на то, что я сказал в комментариях JTux , кажется, обеспечиваетЭквивалентная функциональность и необходимый вам API находятся в UPosixIPC классе.

Приведенный ниже код представляет собой схему того, что вам нужно реализовать.Однако, здесь не хватает нескольких вещей - обработка исключений очевидна, также есть некоторые флаги (найдите их в UConstant ), и вы захотите добавить семафор для защиты put / get методы.Однако это должно поставить вас на правильный путь.Помните, что mmap или отображенный в памяти файл - это файловый интерфейс с сегментом ОЗУ.Таким образом, вы можете использовать его файловый дескриптор, как если бы он был fd обычного файла.

import jtux.*;

class SHM {

    private String name;
    private int size;
    private long semaphore;
    private long mapfile; // File descriptor for mmap file

    /* Lookup flags and perms in your system docs */
    public SHM(String name, int size, boolean create, int flags, int perms) {
        this.name = name;
        this.size = size;
        int shm;
        if (create) {
            flags = flags | UConstant.O_CREAT;
            shm = UPosixIPC.shm_open(name, flags, UConstant.O_RDWR);
        } else {
            shm = UPosixIPC.shm_open(name, flags, UConstant.O_RDWR);
        }
        this.mapfile = UPosixIPC.mmap(..., this.size, ..., flags, shm, 0);
        return;
    }


    public void put(String item) {
        UFile.lseek(this.mapfile(this.mapfile, 0, 0));
        UFile.write(item.getBytes(), this.mapfile);
        return;
    }


    public String get() {    
        UFile.lseek(this.mapfile(this.mapfile, 0, 0));
        byte[] buffer = new byte[this.size];
        UFile.read(this.mapfile, buffer, buffer.length);
        return new String(buffer);
    }


    public void finalize() {
        UPosix.shm_unlink(this.name);
        UPosix.munmap(this.mapfile, this.size);
    }

}
1 голос
/ 13 февраля 2012

Некоторые мысли

  • Сервер на Java, клиент написан на Python.

Странная комбинация, но есть ли причина, по которой один не может вызвать другойотправка через стандартный ввод, стандартный вывод?

  • Сокет IPC реализован ниже: требуется 50 циклов отправки 200 байтов!Это должно быть слишком высоко.Если я отправлю 2 байта за 5000 циклов, это займет намного меньше времени.

Любой вызов ОС будет относительно медленным (задержка).Использование разделяемой памяти может обойти ядро.Если у вас проблема с пропускной способностью, я обнаружил, что вы можете достичь 1-2 ГБ / с с помощью сокетов, если задержка не является для вас проблемой.

  • Оба процесса выполняются на одном компьютере с Linux.

Идеально подходит для совместно используемой памяти.

  • В реальном приложении каждый цикл выполняет около 10 вызовов iFid.write () клиента.

Неуверен, почему это так.Почему бы не создать единую структуру / буфер и написать ее один раз.Я бы использовал прямой буфер для минимизации задержек.Использование перевода символов довольно дорого, особенно если вам нужен только ASCII.

  • Это делается в системе Linux.

Должно быть легко оптимизировать.

Я использую общую память через файлы с отображением в памяти.Это потому, что мне нужно записывать каждое сообщение для целей аудита.Я получаю среднюю задержку около 180 нс в обоих направлениях, поддерживаемую для миллионов сообщений, и около 490 нс в реальном приложении.

Одним из преимуществ этого подхода является то, что при небольших задержках читатель может наверстать упущенноеочень быстро с писателем.Он также легко поддерживает перезапуск и репликацию.

Это реализовано только в Java, но принцип достаточно прост, и я уверен, что он будет работать и в python.

https://github.com/peter-lawrey/Java-Chronicle

...