Предположим, у меня есть два менеджера задач, и у каждого есть только один слот задач. Теперь у меня есть следующая работа:
KeyedStream<...> streamA = env.addSource(...).keyBy(...);
KeyedStream<...> streamB = env.addSource(...).keyBy(...);
streamA.connect(streamB).flatMap(new StatefulJoinFunction()).setParallelism(2);
Один диспетчер задач будет использовать данные из топки Кафки c, а другой - данные из другой топки Кафки c.
Я отправляю задание менеджеру работ для его выполнения. Flink выделяет оба менеджерам задач для обработки flatMap (поскольку у диспетчера задач есть только один слот задач).
flatMap выполняет простое соединение между событиями (используя два ключевых состояния):
public class StatefulJoinFunction extends RichCoFlatMapFunction<A, B, String> {
private ValueState<A> AState;
private ValueState<B> BState;
@Override
public void open(Configuration config) {
AState = getRuntimeContext().getState(new ValueStateDescriptor<>("A event state", A.class));
BState = getRuntimeContext().getState(new ValueStateDescriptor<>("B event state", B.class));
}
@Override
public void flatMap1(A event, Collector<String> out) throws Exception {
B secondEvent = BState.value();
if (secondEvent == null)
AState.update(event);
else {
out.collect(...);
BState.clear();
}
}
@Override
public void flatMap2(A event, Collector<String> out) throws Exception {
A firstEvent = AState.value();
if (firstEvent == null)
BState.update(event);
else {
out.collect(...);
AState.clear();
}
}
}
Если я правильно понял, после метода соединения поток становится только одним. Теперь реализованный flatMap должен совместно использовать состояние, поскольку оператор должен контролировать, поступило ли соответствующее событие, чтобы применить соединение, но оно выполняется с паралеллизмом, равным двум, поэтому используются оба диспетчера задач. Это означает, что диспетчер задач должен сохранять внутри состояния другого диспетчера задач (который используется совместно после метода подключения) каждый раз, когда необходимо обновить состояние, или может потребоваться просто прочитать состояние. Как тогда общаются руководители задач? Влияет ли это на производительность, поскольку диспетчеры задач могут работать на разных узлах кластера?
EDIT : я нашел следующую статью в блоге Флинка, и кажется, что два диспетчера задач могут общаться через TCP-соединение, что имеет смысл для меня, поскольку в некоторых случаях нам нужно делить состояния между событиями. Если это не так, объясните мне , как Flink управляет следующим сценарием?
Предположим, что всегда есть два менеджера задач, физически расположенных на двух узлах кластера. Каждый диспетчер задач всегда имеет только один слот. Я запускаю указанное выше задание и устанавливаю параллелизм 2 (используя, например, параметр -p при отправке задания в Диспетчер заданий). Теперь Flink создаст две подзадачи из моей работы, которые структурно одинаковы, и отправит их менеджерам задач. Оба диспетчера задач будут выполнять «одинаковую» работу, но потреблять разные события. Задание использует события из двух тем Kafka: A и B. Это означает, что первый и второй диспетчеры задач будут использовать оба из topi c A и B, но разные события, в противном случае будут дубликаты. Задание такое же, т. Е. Выполняется вышеупомянутая функция RichCoFlatMapFunction, после чего каждый диспетчер задач будет локально работать со своим набором потребляемых событий и личных локальных состояний. Теперь возникает проблема: предположим, что первый диспетчер задач использовал событие, имеющее ключ «1». Это событие поступает внутри RichCoFlatMapFunction и сохраняется внутри состояния, поскольку оператор все еще ожидает другое событие с тем же ключом для создания соединения. Если другое событие, имеющее ключ «1», получено из второго диспетчера задач, и они не делятся своим состоянием или не общаются, соединение будет невозможно. Что не так в моих рассуждениях?