Отправка внешних событий в рабочий процесс - PullRequest
2 голосов
/ 12 июня 2019

В нашем рабочем процессе каденции нам часто нужно некоторое время ждать внешних событий, прежде чем продолжить (т. Е. Чтение электронной почты, нажатие ссылки и т. Д.).

Мне было интересно, как лучше всего уведомить наши рабочие процессы об этих событиях. Являются ли сигналы правильным способом, или мы должны создать действие, которое будет ждать события?

Из того, что я видел, нам нужно создать сигнальный канал ch := workflow.GetSignalChannel(ctx, SignalName), однако контекст недоступен в действиях.

1 Ответ

2 голосов
/ 13 июня 2019

Сигнализация является рекомендуемым способом отправки событий в рабочие процессы.

Часто используемый шаблон для рабочих процессов Go заключается в использовании селектора для ожидания нескольких сигнальных каналов, а также для таймера будущего.

Образец Go:

sig1Ch := workflow.GetSignalChannel(ctx, "signal1")
sig2Ch := workflow.GetSignalChannel(ctx, "signal2")
timeout := workflow.NewTimer(ctx, time.Minute * 30)

s := workflow.NewSelector(ctx)

var signal1 *Signal1Struct
var signal2 *Signal2Struct
s.AddFuture(timeout, func(f Future) {
})
s.AddReceive(sig1Ch, func(c Channel, more bool) {
    c.Receive(ctx, signal1)
})
s.AddReceive(sig2Ch, func(c Channel, more bool) {
    c.Receive(ctx, signal2)
})

s.Select(ctx)

if signal1 == nil && signal2 == nil {
   // handle timeout
} else {
  // process signals
}

Образец Java:

public interface MyWorkflow {

    @WorkflowMethod
    void main();

    @SignalMethod
    void signal1(Signal1Struct signal);

    @SignalMethod
    void signal2(Signal2Struct signal);

}

public class MyWorkflowImpl implements MyWorkflow {

    private Signal1Struct signal1;
    private Signal2Struct signal2;

    @Override
    public void main() {
        Workflow.await(Duration.ofMinutes(30), 
            () -> signal1 != null || signal2 != null);

        if (signal1 == null && signal2 == null) {
            // handle timeout
        }
        // process signals
    }

    @Override
    public void signal1(Signal1Struct signal) {
        signal1 = signal;
    }

    @Override
    public void signal2(Signal2Struct signal) {
        signal2 = signal;
    }
}

Обратите внимание, что это хорошая идея для учета перерывов в работе рабочего процесса.Например, давайте представим, что вышеуказанный рабочий процесс запущен и сигнал получен через 40 минут после запуска, когда все рабочие процессы не работают.В этом случае, когда рабочие вернутся, и timeout в будущем, и signCh будут не пустыми.Поскольку Selector не гарантирует порядок, возможно, что сигнал доставляется до таймера, даже если он был получен после.Так что ваша логика кода должна учитывать это.Например, существует жесткое требование, чтобы сигнал, полученный через 30 минут после начала рабочего процесса, игнорировался.Затем приведенный выше пример должен быть изменен на:

Пример Go:

...
start := workflow.Now(ctx); // must use workflow clock
s.Select(ctx)
duration := workflow.Now(ctx).Sub(start)
if duration.Minutes() >= 30 || (signal1 == nil && signal2 == nil) {
   // handle timeout
} else {
  // process signals
}

Пример Java:

public void main() {
    long start = Workflow.currentTimeMillis(); // must use workflow clock
    Duration timeout = Duration.ofMinutes(30);
    Workflow.await(timeout, () -> signal1 != null || signal2 != null);
    long duration = Workflow.currentTimeMillis() - start;
    if (timeout.toMillis() <= duration || (signal1 == null && signal2 == null)) {
        // handle timeout
    }
    // process signals
}

Обновленный код работает правильно, даже если выполнение рабочего процессабыло отложено на час.

...