У меня есть DStream [String, String]. Я использую foreachRDD для получения каждого RDD и публикую sh в Кафке. Проблема, с которой я столкнулся, заключается в том, что мне нужно гарантировать, что String сериализуется, а значение моего RDD не сериализуемо по неизвестной причине. Кафка ожидает получить StringSerializer в качестве значения, но, как вы можете видеть на изображении ниже, мой DStream не сериализовал String. Как я могу преобразовать String, не сериализуемый в serializabel до публикации sh в Kafka? Я мог бы изменить kafConf, но я бы предпочел изменить значение вместо конфигурации Kafka.
def kafkaConf(brokers : String) = {
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props
}
Ошибка публикации в kafka