Как мне объединить два потока в Apache Flink? - PullRequest
0 голосов
/ 20 января 2019

Я начинаю с flink и смотрю на одно из официальных руководств .

Насколько я понимаю, цель этого упражнения - объединить два потока в атрибуте времени.

Задача:

Результатом этого упражнения является поток данных записей Tuple2, по одной для каждого отдельного идентификатора поездки. Вы должны игнорировать КОНЕЦ событий, и присоединяйтесь только к событию для СТАРТА каждой поездки с соответствующие ему тарифы.

Полученный поток должен быть распечатан на стандартный вывод.

Вопрос: Как функция Enrichment способна объединить два потока, иначе. откуда он знает, к какой ярмарке присоединиться, с какой ездить? Я ожидал, что он буферизует несколько ярмарок / поездок, пока для входящей ярмарки / поездки не будет найден соответствующий партнер.

В моем понимании это просто сохраняет каждую поездку / ярмарку, которую он видит, и объединяет ее со следующей лучшей поездкой / ярмаркой. Почему это правильное соединение?

Предоставленное решение:

/*
 * Copyright 2017 data Artisans GmbH
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dataartisans.flinktraining.solutions.datastream_java.state;

import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiFare;
import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiFareSource;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.ExerciseBase;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

/**
 * Java reference implementation for the "Stateful Enrichment" exercise of the Flink training
 * (http://training.data-artisans.com).
 *
 * The goal for this exercise is to enrich TaxiRides with fare information.
 *
 * Parameters:
 * -rides path-to-input-file
 * -fares path-to-input-file
 *
 */
public class RidesAndFaresSolution extends ExerciseBase {
    public static void main(String[] args) throws Exception {

        ParameterTool params = ParameterTool.fromArgs(args);
        final String ridesFile = params.get("rides", pathToRideData);
        final String faresFile = params.get("fares", pathToFareData);

        final int delay = 60;                   // at most 60 seconds of delay
        final int servingSpeedFactor = 1800;    // 30 minutes worth of events are served every second

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(ExerciseBase.parallelism);

        DataStream<TaxiRide> rides = env
                .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
                .filter((TaxiRide ride) -> ride.isStart)
                .keyBy("rideId");

        DataStream<TaxiFare> fares = env
                .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
                .keyBy("rideId");

        DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
                .connect(fares)
                .flatMap(new EnrichmentFunction());

        printOrTest(enrichedRides);

        env.execute("Join Rides with Fares (java RichCoFlatMap)");
    }

    public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
        // keyed, managed state
        private ValueState<TaxiRide> rideState;
        private ValueState<TaxiFare> fareState;

        @Override
        public void open(Configuration config) {
            rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
            fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
        }

        @Override
        public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiFare fare = fareState.value();
            if (fare != null) {
                fareState.clear();
                out.collect(new Tuple2(ride, fare));
            } else {
                rideState.update(ride);
            }
        }

        @Override
        public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiRide ride = rideState.value();
            if (ride != null) {
                rideState.clear();
                out.collect(new Tuple2(ride, fare));
            } else {
                fareState.update(fare);
            }
        }
    }
}

1 Ответ

0 голосов
/ 21 января 2019

В контексте данного конкретного учебного упражнения для каждого значения rideId есть три события: начальное событие TaxiRide, конечное событие TaxiRide и TaxiFare. Цель этого упражнения - связать каждое стартовое событие TaxiRide с одним событием TaxiFare, имеющим один и тот же поездку, или другими словами, присоединиться к потоку поездки и потоку тарифа на поездке, зная, что будет только одно из них.

Это упражнение демонстрирует, как работает ключевое состояние в Flink. Keyed state - это хранилище значений ключей. Когда у нас есть элемент ValueState, такой как ValueState<TaxiRide> rideState, Flink будет хранить отдельную запись в своем бэкэнде состояния для каждого отдельного значения ключа (rideId).

Каждый раз, когда flatMap1 и flatMap2 вызываются, есть неявно ключ (a rideId) в контексте, и когда мы вызываем rideState.update(ride) или rideState.value(), мы обращаемся не к одной переменной, а к установке и получение записи в хранилище значений ключей, используя rideId в качестве ключа.

В этом упражнении оба потока обозначаются rideId, поэтому потенциально существует один элемент rideState и один элемент fareState для каждого отдельного rideId. Следовательно, решение, которое было предоставлено, заключается в буферизации множества поездок и тарифов, но только по одной для каждого rideId (этого достаточно, учитывая, что поездки и тарифы идеально согласованы в этом наборе данных).

Итак, вы спросили:

Как функция EnrichmentFunction способна объединить два потока, иначе. как он узнает, к какому тарифу присоединиться, к какой поездке?

И ответ

Он присоединяется к тарифу с тем же rideId.

В этом конкретном упражнении, о котором вы спрашивали, показано, как реализовать простое объединение с обогащением для представления идей состояния ключа и связанных потоков. Но более сложные объединения, безусловно, возможны с Flink. См. документы по объединению , объединения с таблицей Flink API , объединения с Flink SQL и упражнение по объединениям на основе времени для получения дополнительной информации.

...