Apache Flink: общее состояние между двумя (или более) менеджерами задач - PullRequest
0 голосов
/ 05 февраля 2020

Предположим, у меня есть два менеджера задач, и у каждого есть только один слот задач. Теперь у меня есть следующая работа:

    KeyedStream<...> streamA = env.addSource(...).keyBy(...);
    KeyedStream<...> streamB = env.addSource(...).keyBy(...);

    streamA.connect(streamB).flatMap(new StatefulJoinFunction()).setParallelism(2);

Один диспетчер задач будет использовать данные из топки Кафки c, а другой - данные из другой топки Кафки c.

Я отправляю задание менеджеру работ для его выполнения. Flink выделяет оба менеджерам задач для обработки flatMap (поскольку у диспетчера задач есть только один слот задач).

flatMap выполняет простое соединение между событиями (используя два ключевых состояния):

    public class StatefulJoinFunction extends RichCoFlatMapFunction<A, B, String> {
        private ValueState<A> AState;
        private ValueState<B> BState;

        @Override
        public void open(Configuration config) {
            AState = getRuntimeContext().getState(new ValueStateDescriptor<>("A event state", A.class));
            BState = getRuntimeContext().getState(new ValueStateDescriptor<>("B event state", B.class));
        }

        @Override
        public void flatMap1(A event, Collector<String> out) throws Exception {
            B secondEvent = BState.value();

            if (secondEvent == null)
                AState.update(event);
            else {
                out.collect(...);
                BState.clear();
            }
        }

        @Override
        public void flatMap2(A event, Collector<String> out) throws Exception {
            A firstEvent = AState.value();

            if (firstEvent == null)
                BState.update(event);
            else {
                out.collect(...);
                AState.clear();
            }
        }
    }

Если я правильно понял, после метода соединения поток становится только одним. Теперь реализованный flatMap должен совместно использовать состояние, поскольку оператор должен контролировать, поступило ли соответствующее событие, чтобы применить соединение, но оно выполняется с паралеллизмом, равным двум, поэтому используются оба диспетчера задач. Это означает, что диспетчер задач должен сохранять внутри состояния другого диспетчера задач (который используется совместно после метода подключения) каждый раз, когда необходимо обновить состояние, или может потребоваться просто прочитать состояние. Как тогда общаются руководители задач? Влияет ли это на производительность, поскольку диспетчеры задач могут работать на разных узлах кластера?

EDIT : я нашел следующую статью в блоге Флинка, и кажется, что два диспетчера задач могут общаться через TCP-соединение, что имеет смысл для меня, поскольку в некоторых случаях нам нужно делить состояния между событиями. Если это не так, объясните мне , как Flink управляет следующим сценарием?

Предположим, что всегда есть два менеджера задач, физически расположенных на двух узлах кластера. Каждый диспетчер задач всегда имеет только один слот. Я запускаю указанное выше задание и устанавливаю параллелизм 2 (используя, например, параметр -p при отправке задания в Диспетчер заданий). Теперь Flink создаст две подзадачи из моей работы, которые структурно одинаковы, и отправит их менеджерам задач. Оба диспетчера задач будут выполнять «одинаковую» работу, но потреблять разные события. Задание использует события из двух тем Kafka: A и B. Это означает, что первый и второй диспетчеры задач будут использовать оба из topi c A и B, но разные события, в противном случае будут дубликаты. Задание такое же, т. Е. Выполняется вышеупомянутая функция RichCoFlatMapFunction, после чего каждый диспетчер задач будет локально работать со своим набором потребляемых событий и личных локальных состояний. Теперь возникает проблема: предположим, что первый диспетчер задач использовал событие, имеющее ключ «1». Это событие поступает внутри RichCoFlatMapFunction и сохраняется внутри состояния, поскольку оператор все еще ожидает другое событие с тем же ключом для создания соединения. Если другое событие, имеющее ключ «1», получено из второго диспетчера задач, и они не делятся своим состоянием или не общаются, соединение будет невозможно. Что не так в моих рассуждениях?

1 Ответ

1 голос
/ 05 февраля 2020

Нет необходимости в том, чтобы два диспетчера задач взаимодействовали с целью совместного использования состояний - в Flink отсутствует совместное использование состояний.

Возможен любой из этих трех графиков выполнения, показанных ниже, в зависимости от подробности о том, как вы расположите источники. Слева от каждого рисунка мы видим исходные операторы для A и B, а справа два параллельных экземпляра оператора с двумя входами, реализующего соединение через RichCoFlatMap.

enter image description here

KeyBy не является оператором, а вместо этого указывает, как соединяются источники и два экземпляра RichCoFlatMap. Он организует, чтобы это было хешированным соединением, которое выполняет перераспределение исходных потоков.

Не имеет большого значения, какой из этих трех сценариев ios используется, потому что во всех трех случаях keyBy будет иметь одинаковый эффект управления всеми событиями для некоторых ключей в Join1 и всеми событиями для другие ключи для Join2.

Другими словами, для любого данного ключа все события для этого ключа будут обрабатываться в одном слоте задачи. Вы можете думать о ValueState<A> как о распределенном (заштрихованном) хранилище ключей / значений, где значения имеют тип А. Каждый диспетчер задач имеет состояние для среза этого хранилища ключей / значений (для непересекающегося подмножества ключей) и обрабатывает все события для этих клавиш (и только для этих клавиш).

Например: в flatMap1, когда BState.value() вызывается с элементом из streamA, среда выполнения Flink получит доступ значение BState для ключа, который в данный момент находится в контексте , что означает значение, связанное с ключом для события из streamA, обрабатываемого в данный момент. Это состояние всегда будет локальным в текущей задаче. Точно так же, flatMap2 всегда будет вызываться с элементами из streamB.

. Такая конструкция исключает любую связь между менеджерами задач, что хорошо для масштабируемости и производительности.

...