Давайте подготовим данные и запустим spark-shell
cat <<-EOF >in
192.167.56.1-45195 “ GET \docsodb.sp.ip \..”
192.167.56.1-45195 “ GET \https://<url> \..”
238.142.23.5-24788 “ GET \docsodb.sp.ip \..”
238.142.23.5-24788 “ GET \ https://<url> \..”
30.169.77.213-16745 “ GET \docsodb.sp.ip \..”
30.169.77.213-16745 “ GET \ https://<url> \..”
EOF
spark-shell
Теперь внутри spark-shell мы загрузим данные из текстового файла в DataFrame, затем проанализируем их на основе группы захвата regexp, наконец, сгруппируем по Portnumber и UserId, чтобы получить в одной строке и Division_string, и URL, все с использованием DataFrame. API.
import spark.implicits._
// Load data
val df = spark.read.format("text").load("in")
// Regexp to parse input line
val re = """([\d\.]+)-(\d+) “ GET \\ ?([^\s]+)"""
// Transform
df.select(regexp_extract('value, re, 1).as("Portnumber"),
regexp_extract('value, re, 2).as("UserId"),
regexp_extract('value, re, 3).as("URL_or_div"))
.groupBy('Portnumber, 'UserId)
.agg(max(when('URL_or_div.like("https%"), 'URL_or_div)).as("URL"),
max(when('URL_or_div.like("docsodb%"), 'URL_or_div)).as("division_stringL"))
.show
+-------------+------+-------------+---------------+
| Portnumber|UserId| URL|division_string|
+-------------+------+-------------+---------------+
|30.169.77.213| 16745|https://<url>| docsodb.sp.ip|
| 192.167.56.1| 45195|https://<url>| docsodb.sp.ip|
| 238.142.23.5| 24788|https://<url>| docsodb.sp.ip|
+-------------+------+-------------+---------------+
Отвечая на ваш последний вопрос, DataFrame API или Spark SQL предпочтительнее операций RDD, если вам не требуется низкоуровневый контроль над обработкой. См. здесь для получения дополнительной информации.