Как использовать пользовательский разрыв строки в функции spark textFile? - PullRequest
1 голос
/ 16 октября 2019

У меня есть следующие данные в файле:

Пользователь: Test

Комментарий: Test

Ссылки: Test1 Test2 Test3

#####

Пользователь: Test1

Комментарий: Test1

Ссылки: Test2 Test3 Test4

...

Теперь я хочу проверить, какой пользователь ссылался на что-то больше всего. Это означает, что корреляция между «пользователем» и «ссылками» должна существовать. При использовании следующего кода каждая строка становится одним элементом в СДР:

elements = sc.textFile(path_to_file)

На этом этапе больше невозможно сделать корреляцию между значениями «Пользователь» и «Ссылки» с такими функциями, какmap (), поскольку элементы СДР обрабатываются независимо.

Можно ли указать функции textFile () использовать собственный разделитель, отличный от перевода строки? (В приведенном выше случае это будет 5 #)

Есть ли другие способы решения этой проблемы?

1 Ответ

1 голос
/ 16 октября 2019

Вы можете попробовать этот подход. Прочитайте как rdd, отфильтруйте пустые строки и разделитель 5 #, затем добавьте индекс для группировки, преобразуйте в DF, groupBy, и у вас будет пользователь и ссылки в одной строке.

  import spark.implicits._
  import org.apache.spark.sql.functions._

  val r1 = spark.sparkContext.textFile("data/splithash.txt")
  val rdd = r1.filter(!_.trim().equals(""))
    .filter(!_.equals("#####"))
    .zipWithIndex()
    .map(s => (s._1, Math.ceil(s._2/3).toInt))

  val df = rdd.toDF()
  df.show()

  df.groupBy('_2).agg(collect_list('_1)).show(false)
    +--------------------+---+
    |                  _1| _2|
    +--------------------+---+
    |          User: Test|  0|
    |       Comment: Test|  0|
    |References: Test1...|  0|
    |         User: Test1|  1|
    |      Comment: Test1|  1|
    |References: Test2...|  1|
    +--------------------+---+

    +---+------------------------------------------------------------+
    |_2 |collect_list(_1)                                            |
    +---+------------------------------------------------------------+
    |1  |[User: Test1, Comment: Test1, References: Test2 Test3 Test4]|
    |0  |[User: Test, Comment: Test, References: Test1 Test2 Test3]  |
    +---+------------------------------------------------------------+
import pyspark.sql.functions as f
import math

r1 = spark.sparkContext.textFile("ok.txt")
rdd = r1.filter(lambda x: x.strip() != '') \
    .filter(lambda x: x != '#####') \
    .zipWithIndex() \
    .map(lambda x: (x[0], math.floor(x[1] / 3)))

rdd.foreach(lambda x: print(x))

df = rdd.toDF()
df.show()


df.groupBy(f.col('_2')).agg(f.collect_list(f.col('_1'))).show(truncate=False)
+--------------------+---+
|                  _1| _2|
+--------------------+---+
|          User: Test|  0|
|       Comment: Test|  0|
|References: Test1...|  0|
|         User: Test1|  1|
|      Comment: Test1|  1|
|References: Test2...|  1|
+--------------------+---+

+---+------------------------------------------------------------+
|_2 |collect_list(_1)                                            |
+---+------------------------------------------------------------+
|0  |[User: Test, Comment: Test, References: Test1 Test2 Test3]  |
|1  |[User: Test1, Comment: Test1, References: Test2 Test3 Test4]|
+---+------------------------------------------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...