Уже есть похожий вопрос здесь , но он использует Maven, а я использую sbt
. Более того, ни одно из решений там не работало для меня
Я использую Spark 2.4.0, Scala 2.11.12 и IntelliJ IDEA 2019.1
Мой build.sbt
выглядит так:
libraryDependencies ++= Seq(
"com.groupon.sparklint" %% "sparklint-spark212" % "1.0.12" excludeAll ExclusionRule(organization = "org.apache.spark"),
"org.apache.spark" %% "spark-core" % "2.4.0",
"org.apache.spark" %% "spark-sql" % "2.4.0",
"org.apache.spark" %% "spark-streaming" % "2.4.0",
"org.apache.spark" %% "spark-streaming-kafka" % "1.6.2",
"com.datastax.spark" %% "spark-cassandra-connector" % "2.4.0",
"com.typesafe.slick" %% "slick" % "3.3.0",
"org.slf4j" % "slf4j-nop" % "1.6.4",
"com.typesafe.slick" %% "slick-hikaricp" % "3.3.0",
"com.typesafe.slick" %% "slick-extensions" % "3.0.0"
)
Изменить все:
Я получу поток данных от Kafka, который будет отправлен в контекст Spark Streaming с помощью:
val rawWeatherStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
Исходя из этого, я хочу создать поток объектов RawWeatherData
. Пример вывода из потока будет выглядеть так:
* +1019 * (нуль,
725030: 14732,2008,12,31 * * 1 021, 11,0.6, -6.7,1001.7,80,6.2,8, * * 0,0 тысяча двадцать две 0,0)
Все выглядит хорошо, за исключением того, что мне нужно удалить первое значение null
, чтобы создать поток объектов RawWeatherData
, так как конструктор не может принять первое нулевое значение, но может принять все другие значения из потока.
Просто для ясности, вот как выглядит RawWeatherData
(я не могу это отредактировать):
case class RawWeatherData(
wsid: String,
year: Int,
month: Int,
day: Int,
hour: Int,
temperature: Double,
dewpoint: Double,
pressure: Double,
windDirection: Int,
windSpeed: Double,
skyCondition: Int,
skyConditionText: String,
oneHourPrecip: Double,
sixHourPrecip: Double) extends WeatherModel
Для достижения этой цели я отправляю свой поток в функцию, которая возвращает мне желаемый поток RawWeatherData
объектов:
def ingestStream(rawWeatherStream: InputDStream[(String, String)]): DStream[RawWeatherData] = {
rawWeatherStream.map(_._2.split(",")).map(RawWeatherData(_))
}
Теперь я хочу вставить этот поток в базу данных MySQL / DB2. Из этого RawWeatherData
объекта ( 725030: 14732,2008,12,31 , 11,0,6, -6,7,1001,7,80,6,2,8, 0,0 , 0,0), Выделенная полужирная часть слева - это первичный ключ, а полужирная часть справа - это значение, которое необходимо уменьшить / агрегировать.
Итак, я хочу, чтобы мой DStream имел пары ключ-значение ([725030:14732,2008,12,31] , <summed up values for the key>)
Итак, после ingestStream
я пытаюсь выполнить это:
parsedWeatherStream.map { weather =>
(weather.wsid, weather.year, weather.month, weather.day, weather.oneHourPrecip)
}.saveToCassandra(CassandraKeyspace, CassandraTableDailyPrecip)
После окончания карты я пытаюсь написать .reduceByKey()
, но когда я пытаюсь это сделать, ошибка говорит Cannot resolve symbol
reduByKey`. Я не уверен, почему это происходит, поскольку эта функция доступна в документации по искрам.
PS. Прямо сейчас weather.oneHourPrecip
установлен на counter
в Кассандре, поэтому Кассандра будет автоматически собирать значение для меня. Но это не будет возможно в других базах данных, таких как DB2, поэтому я хотел бы заменить apt, например, reduceByKey
в spark. Есть ли способ, чтобы продолжить в таком случае?