Я пытаюсь использовать класс ZKStringSerializer, который я получаю с
import kafka.utils.ZKStringSerializer
Согласно всему интернету и даже моему собственному коду перед перезапуском с компьютера, это должно позволить моему коду работать. Однако теперь я получаю невероятно запутанную ошибку компиляции
object ZKStringSerializer in package utils cannot be accessed in package kafka.utils
Это сбивает с толку, потому что этот файл не должен быть в каком-либо пакете, и я нигде не указываю пакет. Это мой код:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.types._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.ZkConnection
import java.util.Properties
import org.apache.kafka.clients.admin
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.utils.ZKStringSerializer
import kafka.utils.ZkUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
object SpeedTester {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.master("local[4]").appName("SpeedTester").config("spark.driver.memory", "8g").getOrCreate()
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
import spark.implicits._
val zookeeperConnect = "localhost:2181"
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
val zkClient = new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer)
val topicName = "testTopic"
val numPartitions = 8
val replicationFactor = 1
val topicConfig = new Properties
val isSecureKafkaCluster = false
val zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster)
AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig)
// Create producer for topic testTopic and actually push values to the topic
val props = new Properties()
props.put("bootstrap.servers", "localhost:9592")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val TOPIC = "testTopic"
for (i <- 1 to 50) {
val record = new ProducerRecord(TOPIC, "key", s"hello $i")
producer.send(record)
}
val record = new ProducerRecord(TOPIC, "key", "the end" + new java.util.Date)
producer.send(record)
producer.flush()
producer.close()
}
}