Гарантия заказа с использованием потоков и сокращения цепочки потребителей - PullRequest
6 голосов
/ 23 января 2020

Таким образом, как и в текущем сценарии, у нас есть набор API, перечисленных ниже:

Consumer<T> start();
Consumer<T> performDailyAggregates();
Consumer<T> performLastNDaysAggregates();
Consumer<T> repopulateScores();
Consumer<T> updateDataStore();

Над ними один из наших планировщиков выполняет задачи, например

private void performAllTasks(T data) {
    start().andThen(performDailyAggregates())
            .andThen(performLastNDaysAggregates())
            .andThen(repopulateScores())
            .andThen(updateDataStore())
            .accept(data);
}

Рассматривая это, я подумал о переходе к более гибкой реализации 1 выполнения задач, которые будут выглядеть следующим образом:

// NOOP in the context further stands for  'anything -> {}'
private void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    consumerList.reduce(NOOP, Consumer::andThen).accept(data);
}

Точка, которая Теперь меня поразило то, что Javado c четко заявляет, что

accumulator - ассоциативная, не мешающая работе, не сохраняющая состояния функция для объединения двух значений

Next вверх я думал Как обеспечить порядок обработки в потоках java8? чтобы быть упорядоченным (порядок обработки должен быть таким же, как порядок встречи)!

Хорошо, поток сгенерированный из List будет упорядочен , и если поток не будет сделан parallel до reduce, следующая реализация должна работать. 2

private void performAllTasks(List<Consumer<T>> consumerList, T data) {
    consumerList.stream().reduce(NOOP, Consumer::andThen).accept(data);
}

Q. Верно ли это предположение 2 ? Будет ли гарантировано всегда выполнять потребителей в том порядке, в котором они были в исходном коде?

Q. Есть ли возможность как-то выставлять 1 также и вызываемым абонентам для выполнения задач?

Ответы [ 2 ]

8 голосов
/ 23 января 2020

Как указал Андреас , Consumer::andThen - это ассоциативная функция, и хотя конечный потребитель может иметь другую внутреннюю структуру, он все равно эквивалентен.

Но давайте отладим ее

public static void main(String[] args) {
    performAllTasks(IntStream.range(0, 10)
        .mapToObj(i -> new DebuggableConsumer(""+i)), new Object());
}
private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    Consumer<T> reduced = consumerList.reduce(Consumer::andThen).orElse(x -> {});
    reduced.accept(data);
    System.out.println(reduced);
}
static class DebuggableConsumer implements Consumer<Object> {
    private final Consumer<Object> first, second;
    private final boolean leaf;
    DebuggableConsumer(String name) {
        this(x -> System.out.println(name), x -> {}, true);
    }
    DebuggableConsumer(Consumer<Object> a, Consumer<Object> b, boolean l) {
        first = a; second = b;
        leaf = l;
    }
    public void accept(Object t) {
        first.accept(t);
        second.accept(t);
    }
    @Override public Consumer<Object> andThen(Consumer<? super Object> after) {
        return new DebuggableConsumer(this, after, false);
    }
    public @Override String toString() {
        if(leaf) return first.toString();
        return toString(new StringBuilder(200), 0, 0).toString();
    }
    private StringBuilder toString(StringBuilder sb, int preS, int preEnd) {
        int myHandle = sb.length()-2;
        sb.append(leaf? first: "combined").append('\n');
        if(!leaf) {
            int nPreS=sb.length();
            ((DebuggableConsumer)first).toString(
                sb.append(sb, preS, preEnd).append("\u2502 "), nPreS, sb.length());
            nPreS=sb.length();
            sb.append(sb, preS, preEnd);
            int lastItemHandle=sb.length();
            ((DebuggableConsumer)second).toString(sb.append("  "), nPreS, sb.length());
            sb.setCharAt(lastItemHandle, '\u2514');
        }
        if(myHandle>0) {
            sb.setCharAt(myHandle, '\u251c');
            sb.setCharAt(myHandle+1, '\u2500');
        }
        return sb;
    }
}

напечатает

0
1
2
3
4
5
6
7
8
9
combined
├─combined
│ ├─combined
│ │ ├─combined
│ │ │ ├─combined
│ │ │ │ ├─combined
│ │ │ │ │ ├─combined
│ │ │ │ │ │ ├─combined
│ │ │ │ │ │ │ ├─combined
│ │ │ │ │ │ │ │ ├─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@378fd1ac
│ │ │ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@49097b5d
│ │ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6e2c634b
│ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@37a71e93
│ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7e6cbb7a
│ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7c3df479
│ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7106e68e
│ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7eda2dbb
│ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6576fe71
└─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@76fb509a

, а изменение кода сокращения на

private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
    Consumer<T> reduced = consumerList.parallel().reduce(Consumer::andThen).orElse(x -> {});
    reduced.accept(data);
    System.out.println(reduced);
}

будет печатать на моем аппарате

0
1
2
3
4
5
6
7
8
9
combined
├─combined
│ ├─combined
│ │ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@49097b5d
│ │ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@6e2c634b
│ └─combined
│   ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@37a71e93
│   └─combined
│     ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7e6cbb7a
│     └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7c3df479
└─combined
  ├─combined
  │ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7106e68e
  │ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7eda2dbb
  └─combined
    ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@6576fe71
    └─combined
      ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@76fb509a
      └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@300ffa5d

, иллюстрируя точку Андреас ответ, но также подчеркивая совершенно другую проблему. Вы можете максимизировать его, используя, например, IntStream.range(0, 100) в коде примера.

Результат параллельной оценки фактически лучше, чем последовательная оценка, поскольку последовательная оценка создает несбалансированное дерево. Принимая произвольный поток потребителей, это может быть фактической проблемой производительности или даже привести к StackOverflowError при попытке оценить конечного потребителя.

Для любого нетривиального числа потребителей вам действительно нужен сбалансированный потребитель дерево, но использование параллельного потока для этого не является правильным решением, так как а) Consumer::andThen является дешевой операцией без реальной выгоды от параллельной оценки и б) балансировка будет зависеть от несвязанных свойств, таких как природа источника потока и количество ядер ЦП, которые определяют, когда сокращение возвращается к последовательному алгоритму.

Конечно, самое простое решение будет

private static <T> void performAllTasks(Stream<Consumer<T>> consumers, T data) {
    consumers.forEachOrdered(c -> c.accept(data));
}

Но когда вы хотите построить составное Consumer для повторного использования, вы можете использовать

private static final int ITERATION_THRESHOLD = 16; // tune yourself

public static <T> Consumer<T> combineAllTasks(Stream<Consumer<T>> consumers) {
    List<Consumer<T>> consumerList = consumers.collect(Collectors.toList());
    if(consumerList.isEmpty()) return t -> {};
    if(consumerList.size() == 1) return consumerList.get(0);
    if(consumerList.size() < ITERATION_THRESHOLD)
        return balancedReduce(consumerList, Consumer::andThen, 0, consumerList.size());
    return t -> consumerList.forEach(c -> c.accept(t));
}
private static <T> T balancedReduce(List<T> l, BinaryOperator<T> f, int start, int end) {
    if(end-start>2) {
        int mid=(start+end)>>>1;
        return f.apply(balancedReduce(l, f, start, mid), balancedReduce(l, f, mid, end));
    }
    T t = l.get(start++);
    if(start<end) t = f.apply(t, l.get(start));
    assert start==end || start+1==end;
    return t;
}

Код предоставит один Consumer, просто используя al oop, когда число потребителей превышает пороговое значение. Это простейшее и наиболее эффективное решение для большего числа потребителей, и на самом деле вы можете отбросить все другие подходы для меньших чисел и при этом получить приемлемую производительность…

Обратите внимание, что это по-прежнему не мешает параллельному обработка потока потребителей, если от этого действительно выиграет их строительство.

5 голосов
/ 23 января 2020

Даже если Stream<Consumer<T>> сделан параллельным, результирующее соединение Consumer будет выполнять отдельных потребителей по порядку, предполагая:

Допустим, у вас есть список из 4 потребителей [A, B, C, D]. Обычно без параллели происходит следующее:

x = A.andThen(B);
x = x.andThen(C);
compound = x.andThen(D);

, поэтому при вызове compound.apply() будут вызываться A, B, C, затем D в этом порядке.

Если вы включите параллельный интерфейс, потоковая структура может вместо этого разделить его для обработки двумя потоками, [A, B] для потока 1 и [C, D] для потока 2.

Это означает, что произойдет следующее:

x = A.andThen(B);
y = C.andThen(D);
compound = x.andThen(y);

В результате сначала применяется x, что означает A, затем B, затем y, что означает C, затем D.

Таким образом, хотя составной потребитель построен как [[A, B], [C, D]] вместо левой ассоциативной [[[A, B], C], D], 4 потребителя выполняются по порядку, все потому, что Consumer::andThen является ассоциативным .

...