входные данные kafka byte [] преобразованы в java Строка очень медленная при переопределенном мигании AbstractKafkaDeserializationSchema - PullRequest
0 голосов
/ 22 апреля 2020

У меня есть приложение Flink, которое читает mysql CD C json сообщений от Кафки. 5 таблиц json CD C строки читаются и обрабатываются, и я использовал переопределенную AbstractKafkaDeserializationSchema, чтобы превратить байт Kafka [] в мой настроенный объект BEAN. но я обнаружил, что среди 5 таблиц есть 2 таблицы, их входной байт kafka [] занял много времени для преобразования в строку, чем другие 3 таблицы, в худшем случае он просто застрял там минуты и даже как навсегда, и в исходном интерфейсе flink в Source есть обратное давление подзадачи. преобразование просто String strValue = new String(valueByte). Также я попробовал new String(valueByte, "UTF-8"), new String(valueByte, StandardCharsets.US_ASCII), без разницы. переопределенный метод просто:

deserialize(byte[] keyBytes, byte[] valueBytes, String topic, int partition, long offset) throws IOException

эта проблема помешала мне выпустить приложение на 1 неделю, так как преобразование очень простое, я не могу найти альтернативных способов сделать это, поиск по stackoverflow и найти некоторые похожи жалуются, но у меня нет рабочего решения.

...