Сигнализация является рекомендуемым способом отправки событий в рабочие процессы.
Часто используемый шаблон для рабочих процессов Go заключается в использовании селектора для ожидания нескольких сигнальных каналов, а также для таймера будущего.
Образец Go:
sig1Ch := workflow.GetSignalChannel(ctx, "signal1")
sig2Ch := workflow.GetSignalChannel(ctx, "signal2")
timeout := workflow.NewTimer(ctx, time.Minute * 30)
s := workflow.NewSelector(ctx)
var signal1 *Signal1Struct
var signal2 *Signal2Struct
s.AddFuture(timeout, func(f Future) {
})
s.AddReceive(sig1Ch, func(c Channel, more bool) {
c.Receive(ctx, signal1)
})
s.AddReceive(sig2Ch, func(c Channel, more bool) {
c.Receive(ctx, signal2)
})
s.Select(ctx)
if signal1 == nil && signal2 == nil {
// handle timeout
} else {
// process signals
}
Образец Java:
public interface MyWorkflow {
@WorkflowMethod
void main();
@SignalMethod
void signal1(Signal1Struct signal);
@SignalMethod
void signal2(Signal2Struct signal);
}
public class MyWorkflowImpl implements MyWorkflow {
private Signal1Struct signal1;
private Signal2Struct signal2;
@Override
public void main() {
Workflow.await(Duration.ofMinutes(30),
() -> signal1 != null || signal2 != null);
if (signal1 == null && signal2 == null) {
// handle timeout
}
// process signals
}
@Override
public void signal1(Signal1Struct signal) {
signal1 = signal;
}
@Override
public void signal2(Signal2Struct signal) {
signal2 = signal;
}
}
Обратите внимание, что это хорошая идея для учета перерывов в работе рабочего процесса.Например, давайте представим, что вышеуказанный рабочий процесс запущен и сигнал получен через 40 минут после запуска, когда все рабочие процессы не работают.В этом случае, когда рабочие вернутся, и timeout
в будущем, и signCh
будут не пустыми.Поскольку Selector не гарантирует порядок, возможно, что сигнал доставляется до таймера, даже если он был получен после.Так что ваша логика кода должна учитывать это.Например, существует жесткое требование, чтобы сигнал, полученный через 30 минут после начала рабочего процесса, игнорировался.Затем приведенный выше пример должен быть изменен на:
Пример Go:
...
start := workflow.Now(ctx); // must use workflow clock
s.Select(ctx)
duration := workflow.Now(ctx).Sub(start)
if duration.Minutes() >= 30 || (signal1 == nil && signal2 == nil) {
// handle timeout
} else {
// process signals
}
Пример Java:
public void main() {
long start = Workflow.currentTimeMillis(); // must use workflow clock
Duration timeout = Duration.ofMinutes(30);
Workflow.await(timeout, () -> signal1 != null || signal2 != null);
long duration = Workflow.currentTimeMillis() - start;
if (timeout.toMillis() <= duration || (signal1 == null && signal2 == null)) {
// handle timeout
}
// process signals
}
Обновленный код работает правильно, даже если выполнение рабочего процессабыло отложено на час.