Самый быстрый способ погрузить JSON в Кафку с помощью FLINK - PullRequest
0 голосов
/ 30 января 2019

Оптимизация кода

У меня есть приложение flink, которое считывает данные с URL / порта, выполняет их обработку и возвращает JSON.Затем я преобразовываю JSON в строку и опускаю его в Kafka.

Текущая производительность и заметная проблема

Если я просто выполню обработку -> я смогу выполнить около 30 000 строк через функцию, однако, когда я добавлю функцию, чтобы преобразовать ее в STring, а затем опуститься доkafka Моя пропускная способность падает до 17 000 строк в секунду.

Нужно ли конвертировать JSON в String, прежде чем я опущусь в Kafka?Если нет, то как мне вставить json ObjectNode в kafka?

Иначе, какие есть другие решения.Я думаю, что узким местом является функция в строку

Я пытался преобразовать JSON в строку, используя несколько методов (функция .toString, StringBuilder в строку).

 // Read from Source
 val in_stream = env.socketTextStream(url, port,      socket_stream_deliminator, socket_connection_retries).setParallelism(1)

 // Perform Process
 .map(x=>{Process(x)}).setParallelism(1)

 // Convert to STring
 .map(x => ObjectNodeToString({
     val json_string_builder = StringBuilder.newBuilder
     json_string_builder.append(x)
     return json_string_builder.toString()
 })).setParallelism(1)

 // sink data
 .addSink(new FlinkKafkaProducer[String](broker_hosts, global_topic, new SimpleStringSchema()))

Я хотел бы сохранитьобработка 30 000 строк в секунду.который я получаю без функции преобразования в строку.Могу ли я погрузить ObjectNode непосредственно в kafka?

1 Ответ

0 голосов
/ 30 января 2019

Можно.Sink выполняет сериализацию данных объектов в байтовый массив перед отправкой его в kafka.Убедитесь, что ваша функция приемника снабжена сериализатором, который способен конвертировать ObjectNode в байтовый массив.

Также убедитесь, что потребитель готов получать объекты ObjectNode, а не строки.

...