Альтернатива lowerByKey / aggregateByKey для потоковой передачи искры DStream [Class] - PullRequest
1 голос
/ 28 апреля 2019

Уже есть похожий вопрос здесь , но он использует Maven, а я использую sbt. Более того, ни одно из решений там не работало для меня

Я использую Spark 2.4.0, Scala 2.11.12 и IntelliJ IDEA 2019.1

Мой build.sbt выглядит так:

libraryDependencies ++= Seq(
    "com.groupon.sparklint" %% "sparklint-spark212" % "1.0.12" excludeAll ExclusionRule(organization = "org.apache.spark"),
    "org.apache.spark" %% "spark-core" % "2.4.0",
    "org.apache.spark" %% "spark-sql" % "2.4.0",
    "org.apache.spark" %% "spark-streaming" % "2.4.0",
    "org.apache.spark" %% "spark-streaming-kafka" % "1.6.2",
    "com.datastax.spark" %% "spark-cassandra-connector" % "2.4.0",
    "com.typesafe.slick" %% "slick" % "3.3.0",
    "org.slf4j" % "slf4j-nop" % "1.6.4",
    "com.typesafe.slick" %% "slick-hikaricp" % "3.3.0",
    "com.typesafe.slick" %% "slick-extensions" % "3.0.0"
)

Изменить все:

Я получу поток данных от Kafka, который будет отправлен в контекст Spark Streaming с помощью:

val rawWeatherStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

Исходя из этого, я хочу создать поток объектов RawWeatherData. Пример вывода из потока будет выглядеть так:

* +1019 * (нуль, 725030: 14732,2008,12,31 * * 1 021, 11,0.6, -6.7,1001.7,80,6.2,8, * * 0,0 тысяча двадцать две 0,0)

Все выглядит хорошо, за исключением того, что мне нужно удалить первое значение null, чтобы создать поток объектов RawWeatherData, так как конструктор не может принять первое нулевое значение, но может принять все другие значения из потока.

Просто для ясности, вот как выглядит RawWeatherData (я не могу это отредактировать):

case class RawWeatherData(
                           wsid: String,
                           year: Int,
                           month: Int,
                           day: Int,
                           hour: Int,
                           temperature: Double,
                           dewpoint: Double,
                           pressure: Double,
                           windDirection: Int,
                           windSpeed: Double,
                           skyCondition: Int,
                           skyConditionText: String,
                           oneHourPrecip: Double,
                           sixHourPrecip: Double) extends WeatherModel

Для достижения этой цели я отправляю свой поток в функцию, которая возвращает мне желаемый поток RawWeatherData объектов:

def ingestStream(rawWeatherStream: InputDStream[(String, String)]): DStream[RawWeatherData] = {
    rawWeatherStream.map(_._2.split(",")).map(RawWeatherData(_))
}

Теперь я хочу вставить этот поток в базу данных MySQL / DB2. Из этого RawWeatherData объекта ( 725030: 14732,2008,12,31 , 11,0,6, -6,7,1001,7,80,6,2,8, 0,0 , 0,0), Выделенная полужирная часть слева - это первичный ключ, а полужирная часть справа - это значение, которое необходимо уменьшить / агрегировать.

Итак, я хочу, чтобы мой DStream имел пары ключ-значение ([725030:14732,2008,12,31] , <summed up values for the key>)

Итак, после ingestStream я пытаюсь выполнить это:

parsedWeatherStream.map { weather =>
        (weather.wsid, weather.year, weather.month, weather.day, weather.oneHourPrecip)
    }.saveToCassandra(CassandraKeyspace, CassandraTableDailyPrecip)

После окончания карты я пытаюсь написать .reduceByKey(), но когда я пытаюсь это сделать, ошибка говорит Cannot resolve symbol reduByKey`. Я не уверен, почему это происходит, поскольку эта функция доступна в документации по искрам.

PS. Прямо сейчас weather.oneHourPrecip установлен на counter в Кассандре, поэтому Кассандра будет автоматически собирать значение для меня. Но это не будет возможно в других базах данных, таких как DB2, поэтому я хотел бы заменить apt, например, reduceByKey в spark. Есть ли способ, чтобы продолжить в таком случае?

1 Ответ

0 голосов
/ 28 апреля 2019

Тип вашего потока DStream[RawWeatherData], а reduceByKey доступен только для потоков типа DStream[(K,V)], представляющих собой поток кортежей, состоящий из ключа и значения.

То, что вы хотели сделать, этовероятно, использовать mapValues вместо map:

 val parsedWeatherStream: DStream[(String, RawWeatherData)] = rawWeatherStream
     .mapValues(_.split(","))
     .mapValues(RawWeatherData) 

Как вы можете видеть по типу parsedWeatherStream из приведенного выше фрагмента, если вы используете mapValues, вы не откажетесьваши ключи, и вы могли бы использовать reduceByKey.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...