Фильтрация csv RDD с полем JSON с помощью Spark / Scala - PullRequest
0 голосов
/ 29 мая 2020

Я изучаю Spark / scala, и мне нужно отфильтровать RDD по указанному c полю в столбце, в данном случае user.

Я хочу вернуть RDD с пользователями ["Joe","Plank","Willy"], но не может понять, как

Это мой RDD:

2020-03-01T00:00:05Z    my.local5.url   {"request_method":"GET","request_length":281,"user":"Joe"}
2020-03-01T00:00:05Z    my.local2.url   {"request_method":"GET","request_length":281,"user":"Plank"}
2020-03-01T00:00:05Z    my.local2.url   {"request_method":"GET","request_length":281,"user":"Willy"}
2020-03-01T00:00:05Z    my.local6.url   {"request_method":"GET","request_length":281,"user":"Plank"}
2020-03-01T00:00:05Z    my.local2.url   {"request_method":"GET","request_length":281,"user":"Plank"}
2020-03-01T00:00:05Z    my.local2.url   {"request_method":"GET","request_length":281,"user":"Tracy"}
2020-03-01T00:00:05Z    my.local6.url   {"request_method":"GET","request_length":281,"user":"Roger"}

Ожидаемый результат:

2020-03-01T00:00:05Z    my.local5.url   {"request_method":"GET","request_length":281,"user":"Joe"}
2020-03-01T00:00:05Z    my.local2.url   {"request_method":"GET","request_length":281,"user":"Plank"}
2020-03-01T00:00:05Z    my.local2.url   {"request_method":"GET","request_length":281,"user":"Willy"}
2020-03-01T00:00:05Z    my.local6.url   {"request_method":"GET","request_length":281,"user":"Plank"}
2020-03-01T00:00:05Z    my.local2.url   {"request_method":"GET","request_length":281,"user":"Plank"}

I ' мы извлекаем rdd, используя искру, примерно так (псевдокод):

val sparkConf = new SparkConf().setAppName("MyApp")
master.foreach(sparkConf.setMaster)
val sc = new SparkContext(sparkConf)

val rdd = sc.textFile(inputDir)
rdd.filter(_.contains("\"user\":\"THE_ARRAY_OF_NAMES_"))

1 Ответ

0 голосов
/ 29 мая 2020

Вам проще использовать фреймы данных.

Используя функцию from_ json, вы можете преобразовать этот json столбец в несколько столбцов

val jsonSchema = StructType(Array(
    StructField("request_method",StringType,true),
    StructField("request_length",IntegerType,true),
    StructField("user",StringType,true)
  ))

val myDf = spark.read.option("header", "true").csv(path)
val formatedDf = myDf.withColumn("formated_json", from_json($"column_name", jsonSchema)
.select($"formated_json.*")
.where($"user".isin("Joe","Plank","Willy")

formatedDf.show

Но если вам нужен подход RDD , пожалуйста, дайте мне знать.

Редактировать с помощью версии RDD: Помните, что это один из подходов Мэнни

//Define a regex pattern
val Pattern = """(?i)"user":"([a-zA-Z]+)"""".r
//Define a Set with your filtered values
val userSet = Set("Joe","Plank","Willy")
//Filter only the values you want
val filteredRdd = rdd.filter( x => {
    //Extract the user using the pattern we just declared
    val user = for(m <- Pattern.findFirstMatchIn(x)) yield m.group(1)
    //If the user variable is equal with one of your set values then this statement will return true and based on that the row will be kept
    userSet(user.getOrElse(""))
})

Чтобы увидеть, правильный ли результат, вы можете использовать:

filteredRdd.collect().foreach(println)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...