Вставка объемных данных из Spark Streaming в базу данных с помощью Slick - PullRequest
0 голосов
/ 28 апреля 2019

У меня есть следующий код:

val rawWeatherStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
val parsedWeatherStream: DStream[RawWeatherData] = ingestStream(rawWeatherStream)

, где я принимаю все, что приходит от Производителя Кафки. Затем он отправляется в функцию ingestStream, которая возвращает мне все объекты, проанализированные как объекты RawWeatherData(). Код для этого:

def ingestStream(rawWeatherStream: InputDStream[(String, String)]): DStream[RawWeatherData] = {
    val parsedWeatherStream = rawWeatherStream.map(_._2.split(","))
      .map(RawWeatherData(_))
    parsedWeatherStream
}

Теперь я хочу сохранить все, что поступает из parsedWeatherStream, в базу данных DB2, используя slick. Для которого моя схема:

case class RawWeatherData(tag: Tag) extends Table[(String, Int, Int, Int, Int, Double, Double, Double, Int, Double, Int, String, Double, Double)](tag , "RawWeatherData") with WeatherModel{
    var wsid = column[String]("wsid")
    var year = column[Int]("year")
    var month = column[Int]("month")
    var day = column[Int]("day")
    var hour = column[Int]("hour")
    var temperature = column[Double]("temperature")
    var dewpoint = column[Double]("depoint")
    var pressure = column[Double]("pressure")
    var windDirection = column[Int]("windDirection")
    var windSpeed = column[Double]("windSpeed")
    var skyCondition = column[Int]("skyCondition")
    var skyConditionText = column[String]("skyConditionText")
    var oneHourPrecip = column[Double]("oneHourPrecip")
    var sixHourPrecip = column[Double]("sixHourPrecip")

    def * = (wsid , year , month , day , hour , temperature , dewpoint , pressure , windDirection , windSpeed , skyCondition , skyConditionText , oneHourPrecip , sixHourPrecip)
}

Чтобы сохранить данные в базе данных, я пишу следующий код:

val rawWeatherData = TableQuery[com.killrweather.data.RWD]
val db2 = Database.forConfig("db2")
val setup = DBIO.seq(
    // Create the tables, including primary and foreign keys
    rawWeatherData.schema.create,

    // Insert some suppliers
    parsedWeatherStream.foreachRDD { rawWeatherData += _._}
)
//parsedWeatherStream.foreachRDD { rawWeatherData += _._}
val setupFuture = db2.run(setup)

parsedWeatherStream.foreachRDD { rawWeatherData += _._} часть кода, где я пытаюсь добавить в базу данных, анализируя все объекты, но, к сожалению, в этой строке кода, я получаю следующую ошибку Type mismatch, found: Unit, required: DBIOAction[_, NoStream, NotInferredE]

Так как мне теперь вставить данные из Spark Streaming в базу данных с помощью Slick?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...