Go: использование gob с zmq4 - PullRequest
       13

Go: использование gob с zmq4

1 голос
/ 25 апреля 2020

Я пытаюсь использовать gob с zmq4 сокетом (из pebbe's zmq4). Сокет zmq4 не имеет устройства ввода-вывода, что делает невозможным прямое чтение / запись гобом:

Я не могу использовать &client ( type **zmq4.Socket ) в качестве типа io.Writer в аргументе gob.NewEncoder: zmq4.Socket не реализует io.Writer (отсутствует метод записи)

Одна zmq4 функция отправки, SendMessage(), принимает interface{}, поэтому я использую ее для отправки.

На стороне сервера zmq4 функции получения возвращают либо string, []byte, []string, либо [][]byte. Я использую RecvMessage(), который возвращает []string.

Было бы нормально записать в bytes.Buffer, отправить этот буфер, прочитать его как []string, затем взять часть содержимого сообщения для обработки с gob. Хотя текущая проблема заключается в преобразовании этого []string в bytes.Buffer, чтобы гоб смог получить от него io.Read. Звучит как базис c, но я пока пробовал много подходов без успеха. Вот текущий. Очевидно, проблема в том, что gob выдает « дополнительные данные в буфере », когда данные перед отправкой и после получения кажутся одинаковыми, как показывают операторы print.

Есть ли проще, более go -об этом? Если у вас zmq4, приведенный ниже код является автономным и должен выполняться.

package main

import (
    "bytes"
    "encoding/gob"
    "fmt"
    "sync"
    "time"

    zmq "github.com/pebbe/zmq4"
)

type LogEntry struct {
    ErrID  int
    Name   string
    Level  string
    LogStr string
}

// The client task
// --------------------------------------------------------------
func client_task(s string) {
    var mu sync.Mutex
    client, _ := zmq.NewSocket(zmq.DEALER)
    defer client.Close()
    client.SetIdentity(s)
    client.Connect("tcp://localhost:5570")

    go func() {

        aLogEntry := &LogEntry{
            ErrID:  1,
            Name:   "Client",
            LogStr: "Log msg sent",
        }

        for request_nbr := 1; true; request_nbr++ {

            var network bytes.Buffer
            enc := gob.NewEncoder(&network)
            err := enc.Encode(aLogEntry)
            if err != nil {
                fmt.Println("encode error:", err)
            }

            // Early decode test - this will influence subsequent gob
            // behaviour so leave commented when caring about the sent
            // data
            // dec := gob.NewDecoder(&network)
            // var aLogEntry2 *LogEntry
            // err = dec.Decode(&aLogEntry2)
            // if err != nil {
            //  fmt.Printf("client_task(DECODE ERROR) : %s\n\n", err)
            // }
            // fmt.Printf("client_task(TEST DECODE) %+v\n\n", aLogEntry)

            mu.Lock()
            // Replaced length by bytes Buffer method: 91
            fmt.Printf("client_task(len) : %d\n\n", network.Len())
            fmt.Printf("client_task(network) : %v\n\n", network)

            client.SendMessage(network, 0)
            mu.Unlock()
            time.Sleep(5 * time.Second)
        }
    }()

    // pause to allow server
    for {
        time.Sleep(100 * time.Millisecond)
    }
}

// The server task
// --------------------------------------------------------------
func server_task() {

    frontend, _ := zmq.NewSocket(zmq.ROUTER)
    defer frontend.Close()
    frontend.Bind("tcp://*:5570")

    for {
        msg, _ := frontend.RecvMessage(0)
        // Added error reporting - does not report any error
        // err does never get filled in here, never an error could get reported here
        if err != nil {
            fmt.Printf("RECV ERROR: %s", err)
        }

        // using WriteString to write the content portion of the
        // received message to the bytes.Buffer for gob to process
        var network bytes.Buffer
        dec := gob.NewDecoder(&network)
        network.WriteString(msg[1])

        // Added length of the bytes Buffer: 285
        // Before sending the bytes Buffer is: 91
        // More than just msg[1] is written into the buffer ?
        fmt.Printf("server_task(len): %d\n\n", network.Len())
        fmt.Printf("server_task(msg[1]) : %s\n\n", msg[1])

        var aLogEntry *LogEntry
        err := dec.Decode(&aLogEntry)
        if err != nil {
            fmt.Printf("server_task(DECODE ERROR) : %s\n\n", err)
        }

        fmt.Printf("server_task(aLogEntry) %+v\n\n", aLogEntry)
    }
}

func main() {
    defer fmt.Println("main() done")

    go client_task("1")
    go server_task()

    //  Run for 5 seconds then quit
    time.Sleep(5 * time.Second)
}

Операторы печати показывают на стороне клиента:

client_task(network) : {[62 255 129 3 1 1 8 76 111 103 69 110 116 114 121 1 255 130 0 1 4 1 5 69 114 114 73 68 1 4 0 1 4 78 97 109 101 1 12 0 1 5 76 101 118 101 108 1 12 0 1 6 76 111 103 83 116 114 1 12 0 0 0 27 255 130 1 2 1 6 67 108 105 101 110 116 2 12 76 111 103 32 109 115 103 32 115 101 110 116 0] 0 0}

На сервере сторона:

server_task(msg[1]) : {[62 255 129 3 1 1 8 76 111 103 69 110 116 114 121 1 255 130 0 1 4 1 5 69 114 114 73 68 1 4 0 1 4 78 97 109 101 1 12 0 1 5 76 101 118 101 108 1 12 0 1 6 76 111 103 83 116 114 1 12 0 0 0 27 255 130 1 2 1 6 67 108 105 101 110 116 2 12 76 111 103 32 109 115 103 32 115 101 110 116 0] 0 0}

Что выглядит примерно так же.

Результаты:

server_task(DECODE ERROR) : extra data in buffer
server_task(aLogEntry) <nil>

1 Ответ

0 голосов
/ 26 апреля 2020

Добро пожаловать в Искусство Zen-of-Zero:


В случае, если вы никогда не работали с ZeroMQ ,
, здесь можно насладиться первым посмотрите "ZeroMQ Принципы менее чем за Пять секунд "
, прежде чем углубляться в дальнейшие детали



НАБЛЮДЕНИЕ:

Собственный API ZeroMQ определяет это свойство:

При получении сообщений сокет ZMQ_ROUTER должен предварять часть сообщения содержит идентификатор маршрутизации инициирующего однорангового узла для сообщения перед передачей его приложению .


РЕШЕНИЕ:

Любой
может начать использовать достаточно правильный PUSH/PULL Масштабируемый формальный архетип вместо (слишком сложного для вашего варианта использования) DEALER/ROUTER,
или
может полагаться на ваше предположение ROUTER -узел никогда не будет .RecvMessage( 0 ) с другой внутренней multipart-структурой, кроме единственной, соответствующей шаблону
[ <routing_id> | <[network]-payload> [ | ... ] ]
w который не может быть надежно гарантирован, не так ли?

Пока не уверен, как pebbe s zmq4 go -оболочка для ZeroMQ пытается или не реализует весь нативный API функции и / или обрабатывает потенциальные различия в (автоматизировано без вмешательства пользователя на уровне приложения? ) чтение всех компонентов, состоящих из нескольких частей, и обработка строк с однократной и / или NULL-обработкой, которые могут конфликтовать внутри .Decode() -метода.

Последнее, но не менее важное, если ваш код полагается на msg[1], чтобы иметь действительную полезную нагрузку, соответствующую всем соглашениям, внутри, если я не пропускаю какой-то низкоуровневый хак, я не знаю, я не вижу явной обработки случаев, когда инициирующая сторона (DEALER ) не доставил ни одного такого нового сообщения на сторону потребителя (ROUTER), но .RecvMessage( 0 ) -метод заполняет msg и переходит (с пустым msg) к методу .Decode(), где он должен по понятным причинам потерпеть неудачу на пустом или плохо сформированном msg, не так ли?

Я бы определенно начал с замены PUSH/PULL, он не будет вставлять предварительно вставленную, теперь многокадровую композицию на сторона доставки, с routing_id и связанными с этим рисками.

Режим без блокировки возвращается из .RecvMessage() -метод все равно должен конфликтовать при заполнении msg пустыми данными, если ожидающие сообщения не ожидают внутри t Он PULL -RxQueue-buffers, который по-прежнему должен обрабатывать c .Decode() -метод.

В случае, если вызов .RecvMessage( 0 ) -метода фактически показывает прием в режиме блокировки, следует принять больше внимания уделяется обнаружению и обработке состояния ошибки, если ZeroMQ должен быть полностью исключен из анализа причины ошибки root. Более самозащитные .setsockopt() -установки (ZMQ_LINGER и многие другие) для всех развернутых ресурсов ZeroMQ также улучшат надежность и уровень подверженности ошибкам, а именно в случаях, когда сбойное приложение может причинить какой-либо вред production.

Вы можете попытаться воспроизвести ошибку: здесь IDE, к сожалению, пропускает часть ZeroMQ.

Golang .org site не удалось либо:

go: finding module for package github.com/pebbe/zmq4
go: downloading github.com/pebbe/zmq4 v1.2.0
go: found github.com/pebbe/zmq4 in github.com/pebbe/zmq4 v1.2.0
# pkg-config --cflags  -- libzmq
pkg-config: exec: "pkg-config": executable file not found in $PATH

Go build failed.
...