У меня есть разделенный фрейм данных, сохраненный в hdfs. Я должен периодически загружать новые данные из темы кафки и обновлять данные hdfs. Данные просты: это просто количество твитов, полученных за определенный период времени.
Итак, раздел Jan 18, 10 AM
может иметь значение 2
, и я могу получить поздние данные от kafka, состоящие из 3 твитов, отправленных на Jan 18, 10 AM
. Итак, мне нужно обновить Jan 18, 10 AM
до значения 2+3=5
.
Мое текущее решение плохо, потому что я
- загрузить все из hdfs в RAM
- удалить все из hdfs
- читать новый фрейм данных из kafka
- объединить 2 кадра данных
- записать новый объединенный фрейм данных в hdfs.
(я предоставил комментарии в своем коде для каждого шага.)
Проблема в том, что размер файла данных, хранящегося в hdf, может составлять 1 ТБ, и это невозможно.
import com.jayway.jsonpath.JsonPath
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType}
//scalastyle:off
object TopicIngester {
val mySchema = new StructType(Array(
StructField("date", StringType, nullable = true),
StructField("key", StringType, nullable = true),
StructField("cnt", IntegerType, nullable = true)
))
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]") // remove this later
.appName("Ingester")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions.count
// read the old one
val old = spark.read
.schema(mySchema)
.format("csv")
.load("/user/maria_dev/test")
// remove it
val fs = FileSystem.get(new Configuration)
val outPutPath = "/user/maria_dev/test"
if (fs.exists(new Path(outPutPath))) {
fs.delete(new Path(outPutPath), true)
}
// read the new one
val _new = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "sandbox-hdp.hortonworks.com:6667")
.option("subscribe", "test1")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
.selectExpr("CAST(value AS String)")
.as[String]
.map(value => transformIntoObject(value))
.map(tweet => (tweet.tweet, tweet.key, tweet.date))
.toDF("tweet", "key", "date")
.groupBy("date", "key")
.agg(count("*").alias("cnt"))
// combine the old one with the new one and write to hdfs
_new.union(old)
.groupBy("date", "key")
.agg(sum("sum").alias("cnt"))
.write.partitionBy("date", "key")
.csv("/user/maria_dev/test")
spark.stop()
}
def transformIntoObject(tweet: String): TweetWithKeys = {
val date = extractDate(tweet)
val hashTags = extractHashtags(tweet)
val tagString = String.join(",", hashTags)
TweetWithKeys(tweet, date, tagString)
}
def extractHashtags(str: String): java.util.List[String] = {
JsonPath.parse(str).read("$.entities.hashtags[*].text")
}
def extractDate(str: String): String = {
JsonPath.parse(str).read("$.created_at")
}
final case class TweetWithKeys(tweet: String, date: String, key: String)
}
Как мне загрузить только необходимые разделы и обновить их более эффективно?