У меня есть следующий код:
val rawWeatherStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
val parsedWeatherStream: DStream[RawWeatherData] = ingestStream(rawWeatherStream)
, где я принимаю все, что приходит от Производителя Кафки. Затем он отправляется в функцию ingestStream
, которая возвращает мне все объекты, проанализированные как объекты RawWeatherData()
. Код для этого:
def ingestStream(rawWeatherStream: InputDStream[(String, String)]): DStream[RawWeatherData] = {
val parsedWeatherStream = rawWeatherStream.map(_._2.split(","))
.map(RawWeatherData(_))
parsedWeatherStream
}
Теперь я хочу сохранить все, что поступает из parsedWeatherStream
, в базу данных DB2, используя slick. Для которого моя схема:
case class RawWeatherData(tag: Tag) extends Table[(String, Int, Int, Int, Int, Double, Double, Double, Int, Double, Int, String, Double, Double)](tag , "RawWeatherData") with WeatherModel{
var wsid = column[String]("wsid")
var year = column[Int]("year")
var month = column[Int]("month")
var day = column[Int]("day")
var hour = column[Int]("hour")
var temperature = column[Double]("temperature")
var dewpoint = column[Double]("depoint")
var pressure = column[Double]("pressure")
var windDirection = column[Int]("windDirection")
var windSpeed = column[Double]("windSpeed")
var skyCondition = column[Int]("skyCondition")
var skyConditionText = column[String]("skyConditionText")
var oneHourPrecip = column[Double]("oneHourPrecip")
var sixHourPrecip = column[Double]("sixHourPrecip")
def * = (wsid , year , month , day , hour , temperature , dewpoint , pressure , windDirection , windSpeed , skyCondition , skyConditionText , oneHourPrecip , sixHourPrecip)
}
Чтобы сохранить данные в базе данных, я пишу следующий код:
val rawWeatherData = TableQuery[com.killrweather.data.RWD]
val db2 = Database.forConfig("db2")
val setup = DBIO.seq(
// Create the tables, including primary and foreign keys
rawWeatherData.schema.create,
// Insert some suppliers
parsedWeatherStream.foreachRDD { rawWeatherData += _._}
)
//parsedWeatherStream.foreachRDD { rawWeatherData += _._}
val setupFuture = db2.run(setup)
parsedWeatherStream.foreachRDD { rawWeatherData += _._}
часть кода, где я пытаюсь добавить в базу данных, анализируя все объекты, но, к сожалению, в этой строке кода, я получаю следующую ошибку Type mismatch, found: Unit, required: DBIOAction[_, NoStream, NotInferredE]
Так как мне теперь вставить данные из Spark Streaming в базу данных с помощью Slick?