Какие правила используются при создании новой отметки времени событий - PullRequest
0 голосов
/ 04 августа 2020

Я объединяю два потока, чтобы создать новый поток после операции. код, как показано ниже:

DataStream<NewTableA> join1 =
    oldTableADataStream
        .keyBy(t -> t.getFa3())
        .join(tableBDataStream)
        .where(new oldTableAKeySelector())
        .equalTo(new TableBKeySelector())
        .window(EventTimeSessionWindows.withGap(Time.milliseconds(WIN_GAP_TIME)))
        .allowedLateness(Time.milliseconds(allowedLateness))
        .apply(new oldTableAJoinTableBFunc());
        //.assignTimestampsAndWatermarks(new assignTSAndWMLastMax<>(maxOutOfOrderness));


   join1.process(
    new ProcessFunction<NewTableA, NewTableA>() {
      @Override
      public void processElement(NewTableA value, Context ctx, Collector<NewTableA> out)
          throws Exception {
        System.out.println(" NewTableA wmts:" + ctx.timerService().currentWatermark());
        System.out.println(" NewTableA ts:" + ctx.timestamp() + " " + value);
      }
    });

код oldTableAJoinTableBFunc , как показано ниже

public class oldTableAJoinTableBFunc implements JoinFunction<OldTableA, TableB, NewTableA> {



  @Override
  public NewTableA join(OldTableA oldTableA, TableB tableB) throws Exception {

    //System.out.println("join1 on");

    NewTableA newTableA = new NewTableA();

    newTableA.setPA1(oldTableA.getPa1());
    newTableA.setA2(oldTableA.getA2());
    newTableA.setFA3(oldTableA.getFa3());
    newTableA.setFA4(oldTableA.getFa4());
    newTableA.setB2(tableB.getB2());
    newTableA.setB3(tableB.getB3());
    // importance!!!
    newTableA.setTs(oldTableA.getTs());

    return newTableA;
  }
}

пример выше, oldTableADataStream присоединение tableBDataStream к join1 потоку во время события.

Я обнаружил интересное явление. метка времени события в join1 автоматически создается с помощью flink.

когда я создаю тестовые данные для oldTableADataStream и tableBDataStream, я намеренно устанавливаю все 1000000010 и 1000000044. но после соединения и применения функции , отметка времени события в новом потоке join1 изменена с помощью flink, распечатайте следующее:

NewTableA wmts:**1000000042**
 NewTableA ts:**1000000143** NewTableA{PA1=10, A2='a20', FA3=21, B2='b21', B3='b31', FA4=39, C2='null', FC3=null, D2='null', D3='null', ts=**1000000011**}
 NewTableA wmts:1000000042
 NewTableA wmts:1000000042
 NewTableA ts:**1000000143** NewTableA{PA1=1, A2='a20', FA3=20, B2='b20', B3='b30', FA4=30, C2='null', FC3=null, D2='null', D3='null', ts=**1000000010**}
 NewTableA wmts:1000000042
 NewTableA ts:1000000143 NewTableA{PA1=11, A2='a20', FA3=21, B2='b21', B3='b31', FA4=40, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA ts:1000000135 NewTableA{PA1=21, A2='a20', FA3=22, B2='b22', B3='b32', FA4=30, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA ts:1000000138 NewTableA{PA1=38, A2='a20', FA3=23, B2='b23', B3='b33', FA4=34, C2='null', FC3=null, D2='null', D3='null', ts=1000000015}
 NewTableA wmts:1000000042
 NewTableA wmts:1000000042
 NewTableA ts:1000000138 NewTableA{PA1=57, A2='a20', FA3=24, B2='b24', B3='b34', FA4=36, C2='null', FC3=null, D2='null', D3='null', ts=1000000018}
 NewTableA ts:1000000143 NewTableA{PA1=2, A2='a20', FA3=20, B2='b20', B3='b30', FA4=31, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA wmts:1000000042
 NewTableA ts:1000000143 NewTableA{PA1=12, A2='a20', FA3=21, B2='b21', B3='b31', FA4=41, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA wmts:1000000042
 NewTableA wmts:1000000042
 NewTableA ts:1000000135 NewTableA{PA1=22, A2='a20', FA3=22, B2='b22', B3='b32', FA4=31, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA wmts:1000000042
 NewTableA ts:1000000138 NewTableA{PA1=58, A2='a20', FA3=24, B2='b24', B3='b34', FA4=36, C2='null', FC3=null, D2='null', D3='null', ts=1000000018}
 NewTableA wmts:1000000042
 NewTableA wmts:1000000042
 NewTableA wmts:1000000042
 NewTableA ts:1000000143 NewTableA{PA1=13, A2='a20', FA3=21, B2='b21', B3='b31', FA4=42, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA ts:1000000138 NewTableA{PA1=39, A2='a20', FA3=23, B2='b23', B3='b33', FA4=34, C2='null', FC3=null, D2='null', D3='null', ts=1000000015}
 NewTableA wmts:1000000042
 NewTableA ts:1000000138 NewTableA{PA1=59, A2='a20', FA3=24, B2='b24', B3='b34', FA4=36, C2='null', FC3=null, D2='null', D3='null', ts=1000000018}
 NewTableA ts:1000000135 NewTableA{PA1=23, A2='a20', FA3=22, B2='b22', B3='b32', FA4=32, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA ts:1000000143 NewTableA{PA1=3, A2='a20', FA3=20, B2='b20', B3='b30', FA4=32, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA wmts:1000000042
 NewTableA wmts:1000000042
 NewTableA ts:1000000143 NewTableA{PA1=14, A2='a20', FA3=21, B2='b21', B3='b31', FA4=43, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA ts:1000000138 NewTableA{PA1=40, A2='a20', FA3=23, B2='b23', B3='b33', FA4=34, C2='null', FC3=null, D2='null', D3='null', ts=1000000016}
 NewTableA ts:1000000135 NewTableA{PA1=24, A2='a20', FA3=22, B2='b22', B3='b32', FA4=33, C2='null', FC3=null, D2='null', D3='null', ts=1000000011}
 NewTableA wmts:1000000042
 NewTableA wmts:1000000042
 ......

Кажется, что нет правил, создающих отметку времени нового события, Как было 1000000143 и 1000000138 и 1000000135 и так далее? Кажется, не имеет ничего общего с водяным знаком, потому что водяной знак отметки времени 1000000042 , он отличается от отметки времени события в то же время.

На каких правилах полагается операция для генерации новой метки времени событий я не нашел официальных инструкций, кто может дать на них ссылку?

1 Ответ

1 голос
/ 04 августа 2020

Выходные события, созданные временем windows, имеют метки времени, установленные на максимальную метку времени, которая попадает в окно. Таким образом, это в основном граница сеанса, содержащего событие.

Поскольку у вас есть сеанс с ключом windows на основе FA3, вы получаете значения, которые зависят от FA3: 1000000143 для 20 и 21, 1000000135 для 22 и 1000000138 для 23 и 24.

...