Запустите Goroutines на отдельных процессах (многопроцессорность) - PullRequest
0 голосов
/ 29 июня 2018

В настоящее время у меня есть код MQTT, который может подписаться на тему, распечатать полученные сообщения, а затем опубликовать дальнейшие инструкции для новой темы. подписка / печать завершается в одном Goroutine, а публикация выполняется в другом Goroutine. Вот мой код:

var wg, pg sync.WaitGroup
// All messages are handled here - printing published messages and publishing new messages
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {

wg.Add(1)
pg.Add(1)
    go func() {
        defer wg.Done()
        fmt.Printf("%s\n", msg.Payload())
        //fmt.Println(os.Getpid())
    }()
go func(){
    defer pg.Done()
    message := ""
    //Changing configurations
    if strings.Contains(string(msg.Payload()), "arduinoLED") == true {
        message = fmt.Sprintf("change configuration")
    }
    if  strings.Contains(string(msg.Payload()), "NAME CHANGED") == true{
        message = fmt.Sprintf("change back")
    }
    // Publish further instructions to "sensor/instruction"
    token := client.Publish("sensor/instruction", 0, false, message)
    //fmt.Println(os.Getpid())
    token.Wait()

}()
}

func main() {

    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)

    opts := MQTT.NewClientOptions().AddBroker("tcp://test.mosquitto.org:1883")

    opts.SetDefaultPublishHandler(f)
    // Topic to subscribe to for sensor data
    topic := "sensor/data"

    opts.OnConnect = func(c MQTT.Client) {
        if token := c.Subscribe(topic, 0, f); token.Wait() && token.Error() != nil {
            panic(token.Error())
        }
    }
    // Creating new client
    client := MQTT.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    } else {
        fmt.Printf("Connected to server\n")
    }
    wg.Wait()
    pg.Wait()
    <-c
}

Закомментированная строка os.Getpid() должна проверить, на каком процессе я запускаю тот Goroutine. Прямо сейчас они отображают один и тот же номер (что означает, что оба работают в одном и том же процессе?).

Мой вопрос: Как я могу запустить эти две подпрограммы в отдельных процессах ? Есть ли способ?

Редактировать: Если это невозможно, я хочу написать этот код, используя channel . Вот код, который у меня есть для этого:

var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
    sensorData := make(chan []byte)
wg.Add(1)
pg.Add(1)
    go func() {
        defer wg.Done()
        //fmt.Printf("%s\n", msg.Payload())
        sensorData <- string(msg.Payload())
        fmt.Println(<-sensorData) //currently not printing anything
    }()
go func(){
    defer pg.Done()
    message := ""
    //Changing configurations
    if strings.Contains(<-sensorData, "arduinoLED") == true{
        message = fmt.Sprintf("change configuration")
    }
    if strings.Contains(<-sensorData, "NAME CHANGED") == true{
        message = fmt.Sprintf("change back")
    }
    // Publish further instructions to "sensor/instruction"
    token := client.Publish("sensor/instruction", 0, false, message)
    token.Wait()

}()

}

Однако я не могу распечатать какие-либо данные с использованием каналов. Что я делаю не так?

Ответы [ 2 ]

0 голосов
/ 29 июня 2018

Возможно, вы пришли из Python, верно? ; -)

имеет модуль с именем multiprocessing в его stdlib, и это вполне может объяснить, почему вы использовали это имя в заголовке вашего вопроса и почему вы, по-видимому испытывают затруднения при интерпретации того, что @JimB имел в виду, говоря

Если вам нужен отдельный процесс, вам нужно выполнить его самостоятельно

«Многопроцессорная обработка» в Python

Дело в том, что Python multiprocessing довольно высокого уровня вещь, которая скрывает под капотом много вещей. Когда вы создаете multiprocessing.Process и запускаете его функция, что действительно происходит, это:

  1. Интерпретатор Python создает другую операционную систему процесс (используя fork(2) в Unix-подобных системах или CreateProcess на Windows) и организует чтобы он тоже выполнял интерпретатор Python.

    Важнейшим моментом является то, что теперь у вас будет два процесса запуск двух интерпретаторов Python.

  2. Это устроено для этого интерпретатора Python в дочерний процесс имеет способ общаться с Python интерпретатор в родительском процессе.

    Эта "линия связи" обязательно включает в себя некоторую форму IPC @JimB. Другого способа передачи данных и действий просто нет между отдельными процессами именно потому, что товар современная ОС обеспечивает строгое разделение процессов.

  3. Когда вы обмениваетесь объектами Python между процессами , два взаимодействующих Python переводчики сериализуют и десериализуют их за вашей спиной перед отправкой по IPC-ссылке и после получения их оттуда соответственно. Это реализовано с использованием модуля pickle.

Вернуться к ходу

Go не имеет прямого решения, которое бы соответствует Python multiprocessing, и я действительно сомневаюсь, что это могло были разумно реализованы.

Основная причина этого в основном связана с тем, что Go это более низкий уровень, чем Python, и, следовательно, это не так есть роскошь Python делать чисто предположения о типы ценностей, которыми он управляет, и он также стремится иметь как можно меньше скрытых затрат в его конструкции.

Go также старается держаться подальше от подходов в стиле фреймворка решать проблемы и использовать «библиотечные» решения, когда возможный. (Хорошее краткое изложение "рамки против библиотеки" дается, например, здесь .) Go имеет все в стандартной библиотеке для реализации что-то похожее на Python multiprocessing, но нет готовое frakework-y решение для этого.

Итак, что вы могли бы сделать для этого, это прокатиться по следующим направлениям:

  1. Используйте os/exec для запуска другой копии вашего собственного процесса.

    • Убедитесь, что порожденный процесс "знает", что он запущен в особом «рабском» режиме - действовать соответственно.
    • Используйте любую форму IPC для связи с новым процессом. Обмен данными через стандартные потоки ввода / вывода детского процесса предположительно самый простой способ бросить (кроме случаев, когда вам нужно обменять открытые файлы, но это хадерная тема, поэтому не будем отвлекаться).
  2. Использовать любой пакет в иерархии encoding/ для сериализации и десериализовать данные при обмене.

    Предполагаемое решение для перехода - encoding/gob.

  3. Придумайте и реализуйте простой протокол, чтобы сообщить дочерний процесс, что делать и с какими данными, и как сообщить результаты мастеру.

Неужели это стоит того?

Я бы сказал, что нет, это не так по ряду причин:

  • Go не имеет ничего общего с ужасом GIL , так что нет необходимости обходить его, чтобы достичь реального параллелизма когда это естественно возможно.

  • Безопасность памяти - все в ваших руках, и достижение этого - не так уж сложно, когда вы покорно подчиняетесь принципу тот то, что отправлено по каналу, теперь принадлежит получатель. Другими словами, отправка значений по каналу также передача прав собственности на эти ценности.

  • В набор инструментов Go встроен детектор гонки, поэтому вы может запустить ваш набор тестов с флагом -race и создать оценку сборки вашей программы, использующие go build -race для того же цель: когда программа, инструментированная таким образом, запускается, детектор расы разбивает его, как только обнаруживает несинхронизированный доступ к памяти чтения / записи. Распечатка, полученная в результате этого сбоя, включает пояснительные сообщения о том, что, и где пошло не так, со следами стека.

  • IPC медленный, поэтому выигрыш вполне может быть компенсирован потерями.

В общем, я не вижу реальной причины отделять процессы, если только вы пишете что-то вроде сервера обработки электронной почты где эта концепция приходит естественно.

0 голосов
/ 29 июня 2018

Канал используется для связи между программами, вы не должны использовать его в той же программе, как этот код:

sensorData <- string(msg.Payload())
fmt.Println(<-sensorData) //currently not printing anything

Если вы хотите проверить печать по каналу, вы можете использовать буферизованный канал в той же программе, чтобы избежать блокировки, например:

sensorData := make(chan []byte, 1)

Приветствия

...