сброс таймера в отдельной go рутине - PullRequest
0 голосов
/ 29 февраля 2020

В следующем сценарии сетевой объект всегда ждет TimeOutTime секунд, прежде чем выполнить определенную задачу X. Предположим, это время как TimerT. В течение этого ожидания, равного TimeOutTime секундам, если объект получает набор внешних сообщений, он должен снова сбросить те же значения TimerT до TimeOutTime. Если внешние сообщения не получены, ожидаемое поведение выглядит следующим образом:

  1. Срок действия таймера истек
  2. Выполните задачу X
  3. Сбросьте таймер еще раз до TimeOutTime

(под reset Я имею в виду, остановите таймер и начните все сначала)

enter image description here

Для имитации сценария, который я написал следующий код в Go.

package main

import (
    "log"
    "math/rand"
    "sync"
    "time"
)

const TimeOutTime = 3
const MeanArrivalTime = 4

func main() {
    rand.Seed(time.Now().UTC().UnixNano())
    var wg sync.WaitGroup
    t := time.NewTimer(time.Second * time.Duration(TimeOutTime))
    wg.Add(1)
    // go routine for doing timeout event
    go func() {
        defer wg.Done()
        for {
            t1 := time.Now()
            <-t.C
            t2 := time.Now()
            // Do.. task X .. on timeout...
            log.Println("Timeout after ", t2.Sub(t1))
            t.Reset(time.Second * time.Duration(TimeOutTime))
        }
    }()

    // go routine to simulate incoming messages ...
    // second go routine
    go func() {
        for {
            // simulates a incoming message at any time
            time.Sleep(time.Second * time.Duration(rand.Intn(MeanArrivalTime)))

            // once any message is received reset the timer to TimeOutTime seconds again
            t.Reset(time.Second * time.Duration(TimeOutTime))
        }
    }()

    wg.Wait()
}

После запуска этой программы с использованием флага -race она показывает DATA_RACE:

==================
WARNING: DATA RACE
Write at 0x00c0000c2068 by goroutine 8:
  time.(*Timer).Reset()
      /usr/local/go/src/time/sleep.go:125 +0x98
  main.main.func1()
      /home/deka/Academic/go/src/main/test.go:29 +0x18f

Previous write at 0x00c0000c2068 by goroutine 9:
  time.(*Timer).Reset()
      /usr/local/go/src/time/sleep.go:125 +0x98
  main.main.func2()
      /home/deka/Academic/go/src/main/test.go:42 +0x80

Goroutine 8 (running) created at:
  main.main()
      /home/deka/Academic/go/src/main/test.go:20 +0x1d3

Goroutine 9 (running) created at:
  main.main()
      /home/deka/Academic/go/src/main/test.go:35 +0x1f5
==================

Затем я использовал Mutex, чтобы обернуть Reset() вызов внутри Mutex.

основной пакет

import (
    "log"
    "math/rand"
    "sync"
    "time"
)

const TimeOutTime = 3
const MeanArrivalTime = 4

func main() {
    rand.Seed(time.Now().UTC().UnixNano())
    var wg sync.WaitGroup
    t := time.NewTimer(time.Second * time.Duration(TimeOutTime))
    wg.Add(1)
    var mu sync.Mutex
    // go routine for doing timeout event
    go func() {
        defer wg.Done()
        for {
            t1 := time.Now()
            <-t.C
            t2 := time.Now()
            // Do.. task X .. on timeout...
            log.Println("Timeout after ", t2.Sub(t1))
            mu.Lock()
            t.Reset(time.Second * time.Duration(TimeOutTime))
            mu.Unlock()
        }
    }()

    // go routine to simulate incoming messages ...
    // second go routine
    go func() {
        for {
            // simulates a incoming message at any time
            time.Sleep(time.Second * time.Duration(rand.Intn(MeanArrivalTime)))

            // once any message is received reset the timer to TimeOutTime seconds again
            mu.Lock()
            t.Reset(time.Second * time.Duration(TimeOutTime))
            mu.Unlock()
        }
    }()

    wg.Wait()
}

После этого код, кажется, работает нормально, исходя из следующих наблюдений.

Если я заменю строку

time.Sleep(time.Second * time.Duration(rand.Intn(MeanArrivalTime)))

во второй программе go с постоянным временем сна 4 seconds и TimeOutTime постоянным при 3 seconds.

Выходной сигнал Программа:

2020/02/29 20:10:11 Timeout after  3.000160828s
2020/02/29 20:10:15 Timeout after  4.000444017s
2020/02/29 20:10:19 Timeout after  4.000454657s
2020/02/29 20:10:23 Timeout after  4.000304877s

В приведенном выше выполнении подпрограмма 2nd go сбрасывает active timer после того, как таймер провел первоначальную одну секунду. Из-за чего срок действия timer истекает через 4 секунд со второго отпечатка и далее.

Теперь, когда я проверил документацию Reset(), я обнаружил следующее:

// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.



// Reset changes the timer to expire after duration d.
// It returns true if the timer had been active, false if the timer had
// expired or been stopped.
//
// Reset should be invoked only on stopped or expired timers with drained channels.
// If a program has already received a value from t.C, the timer is known
// to have expired and the channel drained, so t.Reset can be used directly.
// If a program has not yet received a value from t.C, however,
// the timer must be stopped and—if Stop reports that the timer expired
// before being stopped—the channel explicitly drained:
//
//  if !t.Stop() {
//      <-t.C
//  }
//  t.Reset(d)
//
// This should not be done concurrent to other receives from the Timer's
// channel.
//
// Note that it is not possible to use Reset's return value correctly, as there
// is a race condition between draining the channel and the new timer expiring.
// Reset should always be invoked on stopped or expired channels, as described above.
// The return value exists to preserve compatibility with existing programs.

Я нашел эту диаграмму: (ссылка: https://blogtitle.github.io/go-advanced-concurrency-patterns-part-2-timers/)

Time Diagram GoLang

Имея в виду биграмму, кажется, что мне нужно использовать,

if !t.Stop() {
    <-t.C
}
t.Reset(d)

в 2nd go рутина. В этом случае мне также необходимо выполнить правильную блокировку в обеих подпрограммах go, чтобы избежать бесконечного ожидания на канале.

Я не понимаю сценарий, при котором следует выполнять t.Stop() + draining of the channel (<-t.C). В каком случае это требуется? В моем примере я не использую значения чтения канала. Могу ли я вызвать Reset () без вызова Stop ()?

Ответы [ 3 ]

1 голос
/ 29 февраля 2020

Я упростил код с помощью функции time.After:

package main

import (
    "log"
    "math/rand"
    "time"
)

const TimeOutTime = 3
const MeanArrivalTime = 4

func main() {
    const interval = time.Second * TimeOutTime
    // channel for incoming messages
    var incomeCh = make(chan struct{})

    go func() {
        for {
            // On each iteration new timer is created
            select {
            case <-time.After(interval):
                time.Sleep(time.Second)
                log.Println("Do task")
            case <-incomeCh:
                log.Println("Handle income message and move to the next iteration")
            }
        }
    }()

    go func() {
        for {
            time.Sleep(time.Duration(rand.Intn(MeanArrivalTime)) * time.Second)
            // generate incoming message
            incomeCh <- struct{}{}
        }
    }()

    // prevent main to stop for a while
    <-time.After(10 * time.Second)
}

Обратите внимание, что:

After ожидает истечения длительности, а затем отправляет текущее время на возвращенный канал. Это эквивалентно NewTimer(d).C. Базовый таймер не восстанавливается сборщиком мусора, пока не сработает таймер. Если важна эффективность, вместо этого используйте NewTimer и вызовите Timer.Stop, если таймер больше не нужен.

1 голос
/ 01 марта 2020

Вы можете рассмотреть другой общий дизайн.

Предположим, например, что мы пишем подпрограмму или интерфейс под названием Deadliner - он может стать вашим собственным пакетом, если хотите, или просто интерфейсом, и мы будем увидеть довольно сильное сходство с чем-то, что Go уже имеет - чья работа / контракт описывается следующим образом:

  • Пользователь Deadliner создает Deadline, когда ему нравится.
  • Deadliner ждет, пока не наступит крайний срок, затем помечает крайний срок как наступивший.
  • Deadliner может быть отменен любой подпрограммой Go в любое время. Это помечает мертвую линию как отмененную, так что любой, кто ее ожидает, перестанет ждать и может сказать, что причина, по которой он остановился, была "отменена" (не "истекла"). Он также помогает очистить ресурсы для g c, если вы создаете много Deadliners, а затем отбрасываете их до того, как истечет время их ожидания.

Теперь на вашем верхнем уровне, прежде чем вы начнете ждать сообщение, вы просто установите крайний срок. Это не таймер (даже если он может использовать его внутри), это просто экземпляр Deadliner. Затем вы ждете одно из двух событий:

d, cancel = newDeadline(when)
for {
    select {
    case <-d.Done():
          // Deadline expired.
          // ... handle it ...
          d, cancel = newDeadline(when) // if/as appropriate
    case m := <-msgC:
          // got message - cancel existing deadline and get new one
          cancel()
          d, cancel = newDeadline(when)
          // ... handle the message
    }
}

Теперь мы просто отметим, что Go уже имеет это: оно в пакете context. d является контекстом; newDeadline - это context.WithDeadline или context.WithTimeout (в зависимости от того, хотите ли вы самостоятельно рассчитать время предельного срока или код тайм-аута добавляет продолжительность к «сейчас»).

Не нужно возиться с таймерами и временными тиковыми каналами, и вам не нужно выделять свои собственные отдельные подпрограммы.

Если срок не сбрасывается для отдельного сообщения, а для определенной комбинации сообщений, вы просто пишете это в ваш case <-msgChan раздел. Если сообщения в настоящее время не принимаются по каналам, сделайте это, поместив сообщения в канал, чтобы вы могли использовать этот очень простой шаблон ожидания для крайнего срока или сообщения.

1 голос
/ 29 февраля 2020

Предположим, у вас есть:

t.Stop()
t.Reset()

Если таймер остановлен и истощен перед вызовом Stop, это работает нормально. Проблема проявляется, если Stop останавливает таймер и тики таймера одновременно. Тогда у вас может получиться остановленный таймер с программой, ожидающей записи на канал t.C. Таким образом, Stop возвращает false, если все еще существует программа, ожидающая записи в t.C, и вам нужно прочитать с нее. В противном случае у вас будет эта процедура, ожидающая там бесконечно.

Итак, как вы уже заметили, вы должны сделать:

if !t.Stop() {
    <-t.C
}
t.Reset(d)

Однако, даже с этим, я думаю, ваше решение недостатки из-за использования асинхронных сбросов. Вместо этого попробуйте использовать новый таймер для каждого смоделированного события.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...