Что бы я сделал, учитывая, что это пакетный конвейер, сначала нужно пройти через все возможные Session
, построить список и сохранить его как PCollectionView
. Затем при разборе каждого Event
мы можем проверить, к какому из них Session
относится.
В моем тесте я определил классы и конструкторы следующим образом:
@DefaultCoder(AvroCoder.class)
public static class Session {
String id;
long start;
long stop;
public Session(String id, long start, long stop) {
this.id = id;
this.start = start;
this.stop = stop;
}
public Session() {
// for serialization only
}
}
@DefaultCoder(AvroCoder.class)
public static class Event {
String id;
long timestamp;
public Event(String id, long timestamp) {
this.id = id;
this.timestamp = timestamp;
}
public Event() {
// for serialization only
}
}
мы будемиспользуйте некоторые тестовые данные, такие как:
// Example sessions data
final List<Session> sessionList = Arrays.asList(
new Session("s1", 0L, 100L),
new Session("s2", 100L, 200L),
new Session("s3", 200L, 300L)
);
// Example event data
final List<Event> eventList = Arrays.asList(
new Event("e1", 20L),
new Event("e2", 60L),
new Event("e3", 120L),
new Event("e4", 160L),
new Event("e5", 210L),
new Event("e6", 290L)
);
Сначала мы создадим наш PCollectionView
со всеми возможными сеансами:
// create PCollectionView from sessions
final PCollectionView<List<Session>> sessionPC = p
.apply("Create Sessions", Create.of(sessionList))
.apply("Save as List", View.asList());
и для каждого Event
мы будемотметьте AssignFn
ParDo, в котором Session
в случае падения Event
:
public static class AssignFn extends DoFn<Event, KV<Session, Event>> {
final PCollectionView<List<Session>> sessionPC;
public AssignFn(PCollectionView<List<Session>> TagsideInput) {
this.sessionPC = TagsideInput;
}
@ProcessElement
public void processElement(ProcessContext c) {
Event event = c.element();
// get side input with all possible Sessions
List<Session> sessions = c.sideInput(sessionPC);
// where does the Event fall in?
for (Session session:sessions) {
if (event.timestamp >= session.start && event.timestamp <= session.stop) {
c.output(KV.of(session, event));
break;
}
}
}
}
Основная структура конвейера:
p
.apply("Create Events", Create.of(eventList))
.apply("Assign Sessions", ParDo.of(new AssignFn(sessionPC))
.withSideInputs(sessionPC))
.apply("Group By Key", GroupByKey.<Session,Event>create())
.apply("Log Grouped Results", ParDo.of(new LogFn()));
Обратите внимание, что после *При назначении 1027 * мы применяем операцию GroupByKey
, чтобы получить желаемый вывод в виде KV<Session, Iterable<Event>>
.
LogFn
будет использоваться только для проверки содержимого:
public static class LogFn extends DoFn<KV<Session, Iterable<Event>>, KV<Session, Iterable<Event>>> {
@ProcessElement
public void processElement(ProcessContext c) {
Session session = c.element().getKey();
Iterable<Event> events = c.element().getValue();
StringBuilder str = new StringBuilder();
// print session info
str.append(String.format("\nSession id=%s, start=%d, stop=%d", session.id, session.start, session.stop));
// print each event info
for (Event event:events) {
str.append(String.format("\n---Event id=%s, timestamp=%d", event.id, event.timestamp));
}
LOG.info(str.toString());
c.output(c.element());
}
}
Я получаю ожидаемый результат:
Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO:
Session id=s2, start=100, stop=200
---Event id=e3, timestamp=120
---Event id=e4, timestamp=160
Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO:
Session id=s1, start=0, stop=100
---Event id=e1, timestamp=20
---Event id=e2, timestamp=60
Nov 07, 2019 12:36:22 AM com.dataflow.samples.AssignSessions$LogFn processElement
INFO:
Session id=s3, start=200, stop=300
---Event id=e6, timestamp=290
---Event id=e5, timestamp=210
Полный код здесь .
Протестировано с Beam SDK 2.16.0 и DirectRunner
.