Как мне обновить только определенные разделы в спарк? - PullRequest
0 голосов
/ 15 января 2019

У меня есть разделенный фрейм данных, сохраненный в hdfs. Я должен периодически загружать новые данные из темы кафки и обновлять данные hdfs. Данные просты: это просто количество твитов, полученных за определенный период времени.

Итак, раздел Jan 18, 10 AM может иметь значение 2, и я могу получить поздние данные от kafka, состоящие из 3 твитов, отправленных на Jan 18, 10 AM. Итак, мне нужно обновить Jan 18, 10 AM до значения 2+3=5.

Мое текущее решение плохо, потому что я

  • загрузить все из hdfs в RAM
  • удалить все из hdfs
  • читать новый фрейм данных из kafka
  • объединить 2 кадра данных
  • записать новый объединенный фрейм данных в hdfs.

(я предоставил комментарии в своем коде для каждого шага.)

Проблема в том, что размер файла данных, хранящегося в hdf, может составлять 1 ТБ, и это невозможно.

import com.jayway.jsonpath.JsonPath
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}

//scalastyle:off
object TopicIngester {
  val mySchema = new StructType(Array(
    StructField("date", StringType, nullable = true),
    StructField("key", StringType, nullable = true),
    StructField("cnt", IntegerType, nullable = true)
  ))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]") // remove this later
      .appName("Ingester")
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions.count

    // read the old one
    val old = spark.read
      .schema(mySchema)
      .format("csv")
      .load("/user/maria_dev/test")

    // remove it
    val fs = FileSystem.get(new Configuration)
    val outPutPath = "/user/maria_dev/test"

    if (fs.exists(new Path(outPutPath))) {
      fs.delete(new Path(outPutPath), true)
    }

    // read the new one
    val _new = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "sandbox-hdp.hortonworks.com:6667")
      .option("subscribe", "test1")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS String)")
      .as[String]
      .map(value => transformIntoObject(value))
      .map(tweet => (tweet.tweet, tweet.key, tweet.date))
      .toDF("tweet", "key", "date")
      .groupBy("date", "key")
      .agg(count("*").alias("cnt"))


     // combine the old one with the new one and write to hdfs
    _new.union(old)
        .groupBy("date", "key")
        .agg(sum("sum").alias("cnt"))
        .write.partitionBy("date", "key")
        .csv("/user/maria_dev/test")

    spark.stop()
  }

  def transformIntoObject(tweet: String): TweetWithKeys = {
    val date = extractDate(tweet)
    val hashTags = extractHashtags(tweet)

    val tagString = String.join(",", hashTags)

    TweetWithKeys(tweet, date, tagString)
  }

  def extractHashtags(str: String): java.util.List[String] = {
    JsonPath.parse(str).read("$.entities.hashtags[*].text")
  }

  def extractDate(str: String): String = {
    JsonPath.parse(str).read("$.created_at")
  }

  final case class TweetWithKeys(tweet: String, date: String, key: String)

}

Как мне загрузить только необходимые разделы и обновить их более эффективно?

...