Оптимизация кода
У меня есть приложение flink, которое считывает данные с URL / порта, выполняет их обработку и возвращает JSON.Затем я преобразовываю JSON в строку и опускаю его в Kafka.
Текущая производительность и заметная проблема
Если я просто выполню обработку -> я смогу выполнить около 30 000 строк через функцию, однако, когда я добавлю функцию, чтобы преобразовать ее в STring, а затем опуститься доkafka Моя пропускная способность падает до 17 000 строк в секунду.
Нужно ли конвертировать JSON в String, прежде чем я опущусь в Kafka?Если нет, то как мне вставить json ObjectNode в kafka?
Иначе, какие есть другие решения.Я думаю, что узким местом является функция в строку
Я пытался преобразовать JSON в строку, используя несколько методов (функция .toString, StringBuilder в строку).
// Read from Source
val in_stream = env.socketTextStream(url, port, socket_stream_deliminator, socket_connection_retries).setParallelism(1)
// Perform Process
.map(x=>{Process(x)}).setParallelism(1)
// Convert to STring
.map(x => ObjectNodeToString({
val json_string_builder = StringBuilder.newBuilder
json_string_builder.append(x)
return json_string_builder.toString()
})).setParallelism(1)
// sink data
.addSink(new FlinkKafkaProducer[String](broker_hosts, global_topic, new SimpleStringSchema()))
Я хотел бы сохранитьобработка 30 000 строк в секунду.который я получаю без функции преобразования в строку.Могу ли я погрузить ObjectNode непосредственно в kafka?