Я пытаюсь прочитать сообщение Кафки от производителя, внедрить в него некоторую базовую фильтрацию и, наконец, распечатать вывод в конце потребителя.Я передаю 1-е имя файла в качестве аргумента, который будет иметь набор значений, и 2-й файл в качестве 2-го аргумента, который будет иметь критерии фильтра.Когда я запускаю то же самое из IntelliJ, он работает нормально.Когда я пытаюсь использовать «scalac» из командной строки, я получаю «ожидаемое определение класса или объекта».
package kafka_db
object kafkap extends App {
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.util.Properties
import scala.io.Source
import org.apache.kafka.clients.producer._
import kafkaProducer.kafkaProducerScala.producer
val conf = new SparkConf().
setMaster(args(0)).
setAppName("kafkap")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val topic = "kafkatopic"
for (line2 <- Source.fromFile(args(2)).getLines){
val c = line2.toInt
for (line <- Source.fromFile(args(1)).getLines) {
val a = line.toInt
val b = if (a > c) {
var d = a
val record = new ProducerRecord[String, String](topic, d.toString)
producer.send(record)
}
}
}
producer.close()
}