Количество PySpark отличается от группы в RDD - PullRequest
0 голосов
/ 24 декабря 2018

У меня есть СДР с датой и временем как tuple, и я хочу подсчитать уникальные имена хостов по дате.

СДР:

X = [(datetime.datetime(1995, 8, 1, 0, 0, 1), u'in24.inetnebr.com'),
     (datetime.datetime(1995, 8, 1, 0, 0, 7), u'uplherc.upl.com'),
     (datetime.datetime(1995, 8, 1, 0, 0, 8), u'uplherc.upl.com'),
     (datetime.datetime(1995, 8, 2, 0, 0, 8), u'uplherc.upl.com'),
     (datetime.datetime(1995, 8, 2, 0, 0, 8), u'uplherc.upl.com'),
     (datetime.datetime(1995, 8, 2, 0, 0, 9), u'ix-esc-ca2-07.ix.netcom.com'),
     (datetime.datetime(1995, 8, 3, 0, 0, 10), u'uplherc.upl.com'),
     (datetime.datetime(1995, 8, 3, 0, 0, 10), u'slppp6.intermind.net'),
     (datetime.datetime(1995, 8, 4, 0, 0, 10), u'piweba4y.prodigy.com'),
     (datetime.datetime(1995, 8, 5, 0, 0, 11), u'slppp6.intermind.net')]

желаемый результат:

[(datetime.datetime(1995, 8, 1, 0, 0, 1), 2),
 (datetime.datetime(1995, 8, 2, 0, 0, 8), 2),
 (datetime.datetime(1995, 8, 3, 0, 0, 10), 2),
 (datetime.datetime(1995, 8, 4, 0, 0, 10), 1),
 (datetime.datetime(1995, 8, 5, 0, 0, 11), 1)]

МОЯ ПОПЫТКА:

dayGroupedHosts = X.groupBy(lambda x: x[0]).distinct()
dayHostCount = dayGroupedHosts.count()

Я получаю сообщение об ошибке при выполнении операции count.Я новичок в Spark, и я хотел бы знать правильный и эффективный transformation для решения таких задач.

Большое спасибо заранее.

Ответы [ 2 ]

0 голосов
/ 24 декабря 2018

Или преобразовать в DataFrame и использовать countDistinct метод:

import pyspark.sql.functions as f

df = spark.createDataFrame(X, ["dt", "hostname"])
df.show()
+-------------------+--------------------+
|                 dt|            hostname|
+-------------------+--------------------+
|1995-08-01 00:00:01|   in24.inetnebr.com|
|1995-08-01 00:00:07|     uplherc.upl.com|
|1995-08-01 00:00:08|     uplherc.upl.com|
|1995-08-02 00:00:08|     uplherc.upl.com|
|1995-08-02 00:00:08|     uplherc.upl.com|
|1995-08-02 00:00:09|ix-esc-ca2-07.ix....|
|1995-08-03 00:00:10|     uplherc.upl.com|
|1995-08-03 00:00:10|slppp6.intermind.net|
|1995-08-04 00:00:10|piweba4y.prodigy.com|
|1995-08-05 00:00:11|slppp6.intermind.net|
+-------------------+--------------------+

df.groupBy(f.to_date('dt').alias('date')).agg(
  f.countDistinct('hostname').alias('hostname')
).show()
+----------+--------+
|      date|hostname|
+----------+--------+
|1995-08-02|       2|
|1995-08-03|       2|
|1995-08-01|       2|
|1995-08-04|       1|
|1995-08-05|       1|
+----------+--------+
0 голосов
/ 24 декабря 2018

Вам необходимо сначала преобразовать ключи в даты.Затем сгруппируйте по ключу и посчитайте различные значения:

X.map(lambda x: (x[0].date(), x[1]))\
    .groupByKey()\
    .mapValues(lambda vals: len(set(vals)))\
    .sortByKey()\
    .collect()
#[(datetime.date(1995, 8, 1), 2),
# (datetime.date(1995, 8, 2), 2),
# (datetime.date(1995, 8, 3), 2),
# (datetime.date(1995, 8, 4), 1),
# (datetime.date(1995, 8, 5), 1)]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...