Интеграционный тест Flink с сервисом времени - PullRequest
0 голосов
/ 26 июня 2019

Я пытаюсь реализовать ИТ для конвейера, который использует службу времени.

Конвейер должен рассчитать метку на основе событий, произведенных пользователем за последние 90 дней.Я должен учитывать исторические события, производимые не только новыми.

Код написан на котлине.Я сожалею об этом.

Упрощенная функция, которую я реализовал:

class Process90DaysLabelFunction : KeyedProcessFunction<String, Event, EventReport>(), ResultTypeQueryable<EventReport> {


    private lateinit var eventsState: ListState<Event>

    override fun getProducedType(): TypeInformation<Event> = TypeExtractor.getForClass(Event::class.java)

    override fun open(parameters: Configuration?) {

        this.eventsState = runtimeContext.getListState(ListStateDescriptor<Event>("events-state", Event::class.java))
    }

    override fun processElement(value: Event, ctx: Context, out: Collector<Event>) {
        val event = value.getValue()


        var events = eventsState.get().toList()

        //is the first event for the user?
        if (events.isEmpty()) {
            ctx.timerService().registerEventTimeTimer(Instant.parse(event.getTimestamp()).toEpochMilli() + EXPIRATION_THRESHOLD)
        }

        eventsState.add(event)

        events = events + event

        val userId = ctx.currentKey

        out.collect(createEventReport(userId, events))
    }

    override fun onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector<Event>) {

        val events = eventsState.get().toList()

        val oldEvents = events.filter {
            val orderItemTimestamp = it.getTimestamp()
            orderItemTimestamp <= timestamp - EXPIRATION_THRESHOLD
        }

        val remainEvents = events - oldEvents

        val oldestUserOrder = remainEvents.minBy { it.getTimestamp() }

        oldestUserOrder?.let {
            ctx.timerService().registerEventTimeTimer(it.getTimestamp() + EXPIRATION_THRESHOLD)
        }

        val userId = ctx.currentKey

        out.collect(createEventReport(userId, remainEvents))

        if  (remainEvents.isNotEmpty()) {
            eventsState.update(remainEvents)
        } else {

            eventsState.clear()
        }


    }

    private fun createOrderReport(userId: String, events: List<Event>): EventReport {
        val groupedUserByType = events.groupBy { it.getType() }

        val label1 = groupedUserByType[1]?.size ?: 0
        val label2 = groupedUserByType[2]?.size ?: 0

        val segment = if(label1 > label2) "LOW" else "HIGH"

        return EventReport(userId, segment, label1, label2)
    }

    companion object {
        const val EXPIRATION_THRESHOLD = 7776000000L
    }

}

На конвейере я установил фиксированное извлечение водяного знака.

Контрольный пример Iпытаюсь проверить:

  @Test
    fun testCase() {
        val event1 = Event(id = "eventId1", type= 1,timestamp = "2019-01-01T18:46:32Z")
        val event2 = Event(id = "eventId2", type=1, timestamp = "2019-02-19T18:46:32Z")
        val event3 = Event(id = "eventId3", type= 1,timestamp = "2019-06-02T18:46:32Z")

        val pipeline = EventPipeline(
                eventSource = this.streamExecutionEnvironment.addSource(event1,event2,event3),
                collectSink = MockCollectSink()
        )
        pipeline.exec(this.streamExecutionEnvironment, this.tableEnvironment)

        this.streamExecutionEnvironment.execute()

val expectedReports = listOf(
         EventReport("userId", "LOW", 1, 0),
         EventReport("userId", "LOW", 2, 0),
         EventReport("userId", "LOW", 1, 0), // emitted by onTime
         EventReport("userId", "LOW", 1, 0),
         EventReport("userId", "LOW", 0, 0) //emitted by onTime
) 
        Assert.assertEquals("Wrong event report message .", expectedReports, MockCollectSink.values)
    }

Ожидаемый результат должен быть как expectedReports, но на самом деле:

         EventReport("userId", "LOW", 1, 0),
         EventReport("userId", "LOW", 2, 0),
         EventReport("userId", "LOW", 3, 0),
         EventReport("userId", "LOW", 2, 0), // onTime fired
         EventReport("userId", "LOW", 1, 0),
         EventReport("userId", "LOW", 0, 0) // onTime fired

Кажется, что первый onTimer выполняется после третьих событий вместораньше.

Такое поведение не происходит, если я запускаю круговую диаграмму в кластере.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...