Проблема пакета Scala с ZKStringSerializer - PullRequest
0 голосов
/ 03 июля 2018

Я пытаюсь использовать класс ZKStringSerializer, который я получаю с

import kafka.utils.ZKStringSerializer

Согласно всему интернету и даже моему собственному коду перед перезапуском с компьютера, это должно позволить моему коду работать. Однако теперь я получаю невероятно запутанную ошибку компиляции

object ZKStringSerializer in package utils cannot be accessed in package kafka.utils

Это сбивает с толку, потому что этот файл не должен быть в каком-либо пакете, и я нигде не указываю пакет. Это мой код:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.types._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.ZkConnection
import java.util.Properties

import org.apache.kafka.clients.admin
import kafka.admin.{AdminUtils, RackAwareMode}
import kafka.utils.ZKStringSerializer
import kafka.utils.ZkUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}


object SpeedTester {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder.master("local[4]").appName("SpeedTester").config("spark.driver.memory", "8g").getOrCreate()
    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)
    import spark.implicits._
    val zookeeperConnect = "localhost:2181"
    val sessionTimeoutMs = 10000
    val connectionTimeoutMs = 10000
    val zkClient = new ZkClient(zookeeperConnect, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer)
    val topicName = "testTopic"
    val numPartitions = 8
    val replicationFactor = 1
    val topicConfig = new Properties
    val isSecureKafkaCluster = false
    val zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster)
    AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig)

    // Create producer for topic testTopic and actually push values to the topic
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9592")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val TOPIC = "testTopic"
    for (i <- 1 to 50) {
      val record = new ProducerRecord(TOPIC, "key", s"hello $i")
      producer.send(record)
    }

    val record = new ProducerRecord(TOPIC, "key", "the end" + new java.util.Date)
    producer.send(record)
    producer.flush()
    producer.close()
  }
}
...