Как объединить две PCollections, когда первая содержит таймфрейм, а вторая - временную метку какого-либо события? - PullRequest
0 голосов
/ 06 ноября 2019

У меня есть пакетный конвейер, написанный в Dataflow. Я хотел бы добиться следующего объединения данных.

У меня есть две PCollections. Первый представляет сеанс:

class Session{
    String id
    long start; 
    long stop;
}

Второй представляет некоторое событие:

class Event{
    long timestamp;
    String id;
}

Я хотел бы объединить эти две PCollections и в конце получить что-то вроде KV<Session,Iterable<Event>> - так чтоструктура, которая содержит сеансы со связанным списком событий. Если временная метка события находится в пределах временного интервала сеанса (или сеансов), она должна агрегироваться вместе с ним (или с ними). ​​

Каков наилучший способ достижения этого?

1 Ответ

0 голосов
/ 07 ноября 2019

Что бы я сделал, учитывая, что это пакетный конвейер, сначала нужно пройти через все возможные Session, построить список и сохранить его как PCollectionView. Затем при разборе каждого Event мы можем проверить, к какому из них Session относится.

В моем тесте я определил классы и конструкторы следующим образом:

@DefaultCoder(AvroCoder.class)
public static class Session {
    String id;
    long start; 
    long stop;

    public Session(String id, long start, long stop) {
        this.id = id;
        this.start = start;
        this.stop = stop;
    }

    public Session() {
        // for serialization only
    }
}

@DefaultCoder(AvroCoder.class)
public static class Event {
    String id;
    long timestamp;

    public Event(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }

    public Event() {
        // for serialization only
    }
}

мы будемиспользуйте некоторые тестовые данные, такие как:

// Example sessions data
final List<Session> sessionList = Arrays.asList(
    new Session("s1", 0L, 100L),
    new Session("s2", 100L, 200L),
    new Session("s3", 200L, 300L)
);

// Example event data
final List<Event> eventList = Arrays.asList(
    new Event("e1", 20L),
    new Event("e2", 60L),
    new Event("e3", 120L),
    new Event("e4", 160L),
    new Event("e5", 210L),
    new Event("e6", 290L)            
);

Сначала мы создадим наш PCollectionView со всеми возможными сеансами:

// create PCollectionView from sessions
final PCollectionView<List<Session>> sessionPC = p
    .apply("Create Sessions", Create.of(sessionList))
    .apply("Save as List", View.asList());

и для каждого Event мы будемотметьте AssignFn ParDo, в котором Session в случае падения Event:

public static class AssignFn extends DoFn<Event, KV<Session, Event>> {  

    final PCollectionView<List<Session>> sessionPC;

    public AssignFn(PCollectionView<List<Session>> TagsideInput) {
        this.sessionPC = TagsideInput;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        Event event = c.element();

        // get side input with all possible Sessions
        List<Session> sessions = c.sideInput(sessionPC);

        // where does the Event fall in?
        for (Session session:sessions) { 
            if (event.timestamp >= session.start && event.timestamp <= session.stop) {
                c.output(KV.of(session, event));
                break;
            }
        }
    }
}

Основная структура конвейера:

p
    .apply("Create Events", Create.of(eventList))
    .apply("Assign Sessions", ParDo.of(new AssignFn(sessionPC))
        .withSideInputs(sessionPC))
    .apply("Group By Key", GroupByKey.<Session,Event>create())
    .apply("Log Grouped Results", ParDo.of(new LogFn()));

Обратите внимание, что после *При назначении 1027 * мы применяем операцию GroupByKey, чтобы получить желаемый вывод в виде KV<Session, Iterable<Event>>.

LogFn будет использоваться только для проверки содержимого:

public static class LogFn extends DoFn<KV<Session, Iterable<Event>>, KV<Session, Iterable<Event>>> {  

    @ProcessElement
    public void processElement(ProcessContext c) {
        Session session = c.element().getKey();
        Iterable<Event> events = c.element().getValue();
        StringBuilder str = new StringBuilder(); 

        // print session info
        str.append(String.format("\nSession id=%s, start=%d, stop=%d", session.id, session.start, session.stop));

        // print each event info
        for (Event event:events) { 
            str.append(String.format("\n---Event id=%s, timestamp=%d", event.id, event.timestamp));
        }

        LOG.info(str.toString());

        c.output(c.element());
    }
}

Я получаю ожидаемый результат:

Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO: 
Session id=s2, start=100, stop=200
---Event id=e3, timestamp=120
---Event id=e4, timestamp=160
Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO: 
Session id=s1, start=0, stop=100
---Event id=e1, timestamp=20
---Event id=e2, timestamp=60
Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO: 
Session id=s3, start=200, stop=300
---Event id=e6, timestamp=290
---Event id=e5, timestamp=210

Полный код здесь .

Протестировано с Beam SDK 2.16.0 и DirectRunner.

...