Обработка очереди Redis с использованием BLPOP вызывает состояние гонки в модульных тестах? - PullRequest
1 голос
/ 31 марта 2020

Я пытаюсь реализовать очередь задач «первым пришел - первым вышел», как описано в главе 6.4.1 электронной книги Redis в Go. В целях тестирования я передаю CommandExecutor интерфейс функции 'работника' следующим образом:

package service

import (
    "context"

    "github.com/gomodule/redigo/redis"
    "github.com/pkg/errors"
    "github.com/sirupsen/logrus"
)

const commandsQueue = "queuedCommands:"

var pool = redis.Pool{
    MaxIdle:   50,
    MaxActive: 1000,
    Dial: func() (redis.Conn, error) {
        conn, err := redis.Dial("tcp", ":6379")
        if err != nil {
            logrus.WithError(err).Fatal("initialize Redis pool")
        }
        return conn, err
    },
}

// CommandExecutor executes a command
type CommandExecutor interface {
    Execute(string) error
}

func processQueue(ctx context.Context, done chan<- struct{}, executor CommandExecutor) error {
    rc := pool.Get()
    defer rc.Close()

    for {
        select {
        case <-ctx.Done():
            done <- struct{}{}
            return nil
        default:
            // If the commands queue does not exist, BLPOP blocks until another client
            // performs an LPUSH or RPUSH against it. The timeout argument of zero is
            // used to block indefinitely.
            reply, err := redis.Strings(rc.Do("BLPOP", commandsQueue, 0))
            if err != nil {
                logrus.WithError(err).Errorf("BLPOP %s %d", commandsQueue, 0)
                return errors.Wrapf(err, "BLPOP %s %d", commandsQueue, 0)
            }

            if len(reply) < 2 {
                logrus.Errorf("Expected a reply of length 2, got one of length %d", len(reply))
                return errors.Errorf("Expected a reply of length 2, got one of length %d", len(reply))
            }

            // BLPOP returns a two-element multi-bulk with the first element being the
            // name of the key where an element was popped and the second element
            // being the value of the popped element (cf. https://redis.io/commands/blpop#return-value)
            if err := executor.Execute(reply[1]); err != nil {
                return errors.Wrapf(err, "execute scheduled command: %s", reply[0])
            }
            done <- struct{}{}
        }
    }
}

Я создал небольшой репозиторий, https://github.com/kurtpeek/process-queue , с этим кодом, а также попытка юнит-тестов. Для модульного теста у меня есть два одинаковых теста (с разными именами):

package service

import (
    "context"
    "testing"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func TestProcessQueue(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    executor := &CommandExecutorMock{
        ExecuteFunc: func(string) error {
            return nil
        },
    }

    done := make(chan struct{})
    go processQueue(ctx, done, executor)

    rc := pool.Get()
    defer rc.Close()

    _, err := rc.Do("RPUSH", commandsQueue, "foobar")
    require.NoError(t, err)

    <-done

    assert.Exactly(t, 1, len(executor.ExecuteCalls()))
    assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
}

func TestProcessQueue2(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    executor := &CommandExecutorMock{
        ExecuteFunc: func(string) error {
            return nil
        },
    }

    done := make(chan struct{})
    go processQueue(ctx, done, executor)

    rc := pool.Get()
    defer rc.Close()

    _, err := rc.Do("RPUSH", commandsQueue, "foobar")
    require.NoError(t, err)

    <-done

    assert.Exactly(t, 1, len(executor.ExecuteCalls()))
    assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
}

, где CommandExecutorMock генерируется с использованием moq. Если я запускаю каждый тест по отдельности, они проходят:

~/g/s/g/k/process-queue> go test ./... -v -run TestProcessQueue2
=== RUN   TestProcessQueue2
--- PASS: TestProcessQueue2 (0.00s)
PASS
ok      github.com/kurtpeek/process-queue/service   0.243s

Однако, если я запускаю все тесты, второй раз истекает:

~/g/s/g/k/process-queue> 
go test ./... -v -timeout 10s
=== RUN   TestProcessQueue
--- PASS: TestProcessQueue (0.00s)
=== RUN   TestProcessQueue2
panic: test timed out after 10s

Кажется, что при втором тесте выполняется, программа, запущенная в первом тесте, все еще выполняется и BLPOP вводит команду из очереди, так что строка <-done во втором тесте блокируется неопределенно долго. Это несмотря на вызов cancel() в родительском контексте первого теста.

Как я могу «изолировать» эти тесты, чтобы они оба проходили при выполнении вместе? (Я пытался передать флаг -p 1 на go test, но безрезультатно).

1 Ответ

1 голос
/ 31 марта 2020

Это несмотря на вызов метода cancel () в родительском контексте первого теста.

Между записью в done и вызовом cancel() существует некоторое время, что означает, что первый тест может (и делает) ввести вторую итерацию for/select вместо выхода на <-ctx.Done(). Более конкретно, тестовый код включает в себя 2 подтверждения перед отменой:

    assert.Exactly(t, 1, len(executor.ExecuteCalls()))
    assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)

Только тогда начинает действовать defer cancel(), что представляется слишком поздно для отмены контекста в первой подпрограмме go.

Если вы переместите вызов cancel() непосредственно перед чтением с done, тесты пройдут:

func TestProcessQueue(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())

    executor := &CommandExecutorMock{
        ExecuteFunc: func(string) error {
            return nil
        },
    }

    done := make(chan struct{})
    go processQueue(ctx, done, executor)

    rc := pool.Get()
    defer rc.Close()

    _, err := rc.Do("RPUSH", commandsQueue, "foobar")
    require.NoError(t, err)

    cancel() // note this change right here
    <-done

    assert.Exactly(t, 1, len(executor.ExecuteCalls()))
    assert.Exactly(t, "foobar", executor.ExecuteCalls()[0].In1)
}
...