Как мне создать Flink RichParallelSourceFunction с противодавлением? - PullRequest
0 голосов
/ 27 апреля 2020

В Flink у меня есть производитель данных, который публикует сообщения в RabbitMQ. Конвейер выглядит следующим образом:

env.addSource(SomeMessageGenerator(0L, 1L), "MessageGenerator")
   .flatMap<ByteArray>(bufferToByteArray<SomeMessage>(jobCtx.rabbitConfig.bufferSize))
   .returns(ByteArray::class.java)
   .rabbitSink(jobCtx.rabbitConfig)

Я генерирую сообщения (имитирую мое приложение), а затем помещаю в буфер для преобразования в массив (это делается с помощью функции richTmap flapmap bufferToByteArray, показанной ниже). Я делаю это, чтобы отправлять их в RabbitMQ в виде группового списка вместо одного сообщения за раз.

Проблема, с которой я столкнулся, заключается в том, что если я установлю Thread.sleep (0L) в генераторе данных, я получу исключение из памяти. Когда я генерирую сообщения, flink не создает обратного давления, поэтому источник генерирует сообщения быстрее, чем он может обработать. Есть ли способ создать источник данных, который генерирует сообщения так быстро, как только может быть обработан?

Следующее переопределение запуска для абстрактного класса RichParallelSourceFunction, называемого SomeMessageGenerator. Просто генерирует сообщения в al oop.

    override fun run(ctx: SourceFunction.SourceContext<SomeMessage>) {
        while(stopCondition()) {
            Thread.sleep(generationIntervalMillis)
            ctx.collect(Datagen.random(true, 30, 30))
        }
    }

Это буфер для преобразования из messagse в список сообщений, поэтому пакетирование выполняется, потому что отправка одного сообщения за раз в кролике неэффективна.

fun <T> bufferToByteArray(
        bufferSize: Int = 100,
        maxInterval: Long = 30_000L
) : RichFlatMapFunction<T, ByteArray> {
    return object: RichFlatMapFunction<T, ByteArray>() {
        private lateinit var bufferedElements: List<MutableList<T>>
        private lateinit var lastTimeSentList: MutableList<Long>
        private val mapper = ObjectMapper()

        override fun flatMap(value: T, out: Collector<ByteArray>) {
            val currentTime = System.currentTimeMillis()
            val buffer = bufferedElements[runtimeContext.indexOfThisSubtask]
            val timeInterval = currentTime - lastTimeSentList[runtimeContext.indexOfThisSubtask]
            buffer.add(value)
            if(buffer.size == bufferSize || timeInterval > maxInterval ) {
                val bytesSerialize = mapper.writeValueAsBytes(buffer)
                out.collect(bytesSerialize)
                buffer.clear()
                lastTimeSentList[runtimeContext.indexOfThisSubtask] = currentTime
            }
        }

        override fun open(parameters: Configuration) {
            bufferedElements = (1..runtimeContext.maxNumberOfParallelSubtasks)
                    .map { mutableListOf<T>() }
            lastTimeSentList = (1..runtimeContext.maxNumberOfParallelSubtasks)
                    .map { System.currentTimeMillis() }.toMutableList()
        }

        override fun close() { }
    }
}
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(Arrays.java:3664)
        at java.lang.String.<init>(String.java:207)
        at sun.misc.FloatingDecimal$BinaryToASCIIBuffer.toJavaFormatString(FloatingDecimal.java:302)
        at sun.misc.FloatingDecimal.toJavaFormatString(FloatingDecimal.java:70)
        at java.lang.Double.toString(Double.java:204)
        at java.lang.String.valueOf(String.java:3141)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8JsonGenerator.writeNumber(UTF8JsonGenerator.java:884)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.NumberSerializers$DoubleSerializer.serialize(NumberSerializers.java:225)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.MapSerializer.serializeFieldsUsing(MapSerializer.java:719)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.MapSerializer.serialize(MapSerializer.java:517)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.MapSerializer.serialize(MapSerializer.java:31)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:672)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:678)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3631)
        at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:3022)
        at eco.analytics.bridge.flink.BufferOperatorsKt$bufferToByteArray$1.flatMap(BufferOperators.kt:27)
        at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
        at eco.analytics.bridge.flink.FlinkMonitorMessageGenerator.run(SourceGenerators.kt:62)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)

Редактировать: Было бы хорошо, чтобы отбрасывать сообщения, что не хорошо, исключение из памяти. Но я не вижу способа обнаружить, что у меня заканчивается буферное пространство.

1 Ответ

0 голосов
/ 05 мая 2020

Насколько мне удалось выяснить, у Флинка нет обратного давления на источник. Вы должны Thread.sleep или что-то подобное.

В официальном блоге Flink есть пример приложения, которое вы можете видеть, как выполняется Thread.sleep:

https://github.com/afedulov/fraud-detection-demo/blob/master/flink-job/src/main/java/com/ververica/field/sources/BaseGenerator.java

...