Как обрабатывать буферизованные потоки чтения-записи для одноранговых узлов в golang с использованием libp2p? - PullRequest
1 голос
/ 25 марта 2019

Я следую этому уроку:

https://github.com/libp2p/go-libp2p-examples/tree/master/chat-with-mdns

В краткой форме это:

  1. настраивает хост p2p
  2. устанавливает функцию обработчика по умолчанию для входящих соединений (3. не обязательно)
  3. и открывает поток для подключающихся пиров:

stream, err := host.NewStream(ctx, peer.ID, protocol.ID(cfg.ProtocolID))

После этого создается переменная потока буфера / чтения-записи:

rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))

Теперь этот поток используется для отправки и получения данных между узлами. Это делается с помощью двух функций goroutine, которые имеют rw в качестве ввода:

go writeData(rw) go readData(rw)

Мои проблемы:

  1. Я хочу отправить данные своим коллегам и хочу получить от них обратную связь: например в rw есть вопрос и они должны ответить да / нет. Как я могу вернуть этот ответ и обработать его (включить некоторое взаимодействие)?

  2. Данные, которые я хочу отправить в rw, не всегда одинаковы. Иногда это строка, содержащая только имя, иногда строка, содержащая целый блок и т. Д. Как я могу различить?

Я думал об этих решениях. Но я новичок в Голанге, так что, может быть, у вас есть лучший:

  • нужен ли мне новый поток для каждого другого контента: stream, err := host.NewStream(ctx, peer.ID, protocol.ID(cfg.ProtocolID))

  • нужно ли мне открывать больше буферизованных вариабельных значений для каждого различного контента: rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))

  • есть ли другие решения?

Спасибо за любую помощь, чтобы решить эту проблему!

1 Ответ

0 голосов
/ 25 марта 2019

Вот что readData делает из вашего туто:

func readData(rw *bufio.ReadWriter) {
    for {
        str, err := rw.ReadString('\n')
        if err != nil {
            fmt.Println("Error reading from buffer")
            panic(err)
        }

        if str == "" {
            return
        }
        if str != "\n" {
            // Green console colour:    \x1b[32m
            // Reset console colour:    \x1b[0m
            fmt.Printf("\x1b[32m%s\x1b[0m> ", str)
        }

    }
}

Он в основном читает поток, пока не находит \n, который является символом новой строки, и выводит его на стандартный вывод.

writeData:

func writeData(rw *bufio.ReadWriter) {
    stdReader := bufio.NewReader(os.Stdin)

    for {
        fmt.Print("> ")
        sendData, err := stdReader.ReadString('\n')
        if err != nil {
            fmt.Println("Error reading from stdin")
            panic(err)
        }

        _, err = rw.WriteString(fmt.Sprintf("%s\n", sendData))
        if err != nil {
            fmt.Println("Error writing to buffer")
            panic(err)
        }
        err = rw.Flush()
        if err != nil {
            fmt.Println("Error flushing buffer")
            panic(err)
        }
    }
}

Он читает данные из стандартного ввода, поэтому вы можете набирать сообщения, записывать их в rw и сбрасывать их. Этот вид позволяет своего рода tty чат. Если он работает правильно, вы сможете запустить хотя бы двух пиров и общаться через стандартный ввод.

Не следует создавать новый rw для нового контента. Вы можете повторно использовать существующий, пока не закроете его. Из кода туто, новый rw создается для каждого нового пира.


Теперь поток tcp не работает как HTTP-запрос с запросом и ответом, соответствующим этому запросу. Поэтому, если вы хотите что-то отправить и получить ответ на этот конкретный вопрос, вы можете отправить сообщение следующего формата:

[8 bytes unique ID][content of the message]\n

И когда вы получаете его, вы анализируете его, подготавливаете ответ и отправляете его в том же формате, чтобы вы могли сопоставлять сообщения, создавая своего рода связь запрос / ответ.

Вы можете сделать что-то подобное:

func sendMsg(rw *bufio.ReadWriter, id int64, content []byte) error {
        // allocate our slice of bytes with the correct size 4 + size of the message + 1
        msg := make([]byte, 4 + len(content) + 1)

        // write id 
        binary.LittleEndian.PutUint64(msg, uint64(id))

        // add content to msg
        copy(msg[13:], content)

        // add new line at the end
        msg[len(msg)-1] = '\n'

        // write msg to stream
        _, err = rw.Write(msg)
        if err != nil {
            fmt.Println("Error writing to buffer")
            return err
        }
        err = rw.Flush()
        if err != nil {
            fmt.Println("Error flushing buffer")
            return err
        }
        return nil
}

func readMsg(rw *bufio.ReadWriter) {
    for {
        // read bytes until new line
        msg, err := rw.ReadBytes('\n')
        if err != nil {
            fmt.Println("Error reading from buffer")
            continue
        }

        // get the id
        id := int64(binary.LittleEndian.Uint64(msg[0:8]))

        // get the content, last index is len(msg)-1 to remove the new line char
        content := string(msg[8:len(msg)-1])

        if content != "" {
            // we print [message ID] content
            fmt.Printf("[%d] %s", id, content)
        }

        // here you could parse your message
        // and prepare a response
        response, err := prepareResponse(content)
        if err != nil {
            fmt.Println("Err while preparing response: ", err)
            continue
        }

        if err := s.sendMsg(rw, id, response); err != nil {
            fmt.Println("Err while sending response: ", err)
            continue
        }
    }
}

Надеюсь, это поможет.

...