Дважды сопоставьте PySpark RDD для группировки по разным ключам - PullRequest
0 голосов
/ 16 февраля 2020

У меня есть следующее RDD:

timeRange = (access_logs
              .map(lambda log: (log.date_time, 1))
              .reduceByKey(lambda a, b : a + b)
              .map(lambda s: s)
              .take(2000))

print("IpAddresses by time range: {}".format(timeRange))

, и моя схема выглядит следующим образом:

def parse_apache_log_line(logline):
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        raise Exception("Invalid logline: %s" % logline)
    return Row(
        ip_address    = match.group(1),
        client_identd = match.group(2),
        user_id       = match.group(3),
        date_time     = match.group(4),
        method        = match.group(5),
        endpoint      = match.group(6),
        protocol      = match.group(7),
        response_code = int(match.group(8)),
        content_size  = int(match.group(9))
    )

пример файла журнала:

129.192.176.24 - - [25/May/2015:23:11:16 +0000] "GET / HTTP/1.0" 200 3557 "-" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; snprtz|S26320700000083|2600#Service Pack 1#2#5#154321|isdn)"

I хотите сгруппировать и отобразить по метке времени, затем по IP-адресу и его количеству в пределах метки времени. Сейчас я могу сопоставить IP-адрес и получить что-то вроде ('25/May/2015:23:11:15 +0000', 1995), но я искал что-то вроде: ('25/May/2015:23:11:15 +0000', ('1.2.3.4', 20)).

1 Ответ

1 голос
/ 16 февраля 2020

Вы можете просто уменьшить ключ (date_time, ip_address) на первом шаге, а затем сгруппировать на date_time.

Попробуйте:

timeRange = (access_logs
             .map(lambda log: ((log.date_time, log.ip_address), 1))
             .reduceByKey(lambda a, b: a + b)
             .map(lambda x: (x[0][0], (x[0][1], x[1]))) # <=> (date_time, (ip_address, count))
             .groupByKey()
             .map(lambda x: (x[0], list(x[1]))) # this final step to get list as groupBy gives ResultIterable object
            )
...