SCALA: Как использовать функцию сбора для получения последней измененной записи из кадра данных? - PullRequest
3 голосов
/ 14 января 2020

У меня есть scala фрейм данных с двумя столбцами:

  • id : строка
  • обновлено : Отметка времени

С этого кадра данных я просто хочу получить самую последнюю дату, для которой я сейчас использую следующий код:

df.agg(max("updated")).head()
// returns a row

I ' мы только что прочитали о функции collect () , которую, как мне сказали, безопаснее использовать для такой проблемы - когда она запускается как задание, кажется, что она не агрегирует max для всего набора данных Это выглядит прекрасно, когда он работает в ноутбуке - но я не понимаю, как его следует использовать.

Я нашел реализацию, как показано ниже, но я не мог понять, как это следует использовать ...

df1.agg({"x": "max"}).collect()[0]

Я пробовал это следующим образом:

df.agg(max("updated")).collect()(0)

Без (0) он возвращает массив, который на самом деле выглядит хорошо. Таким образом, идея заключается в том, что мы должны применять агрегацию ко всему набору данных, загруженному в накопитель, а не только к разделенной версии, в противном случае он, похоже, не получает все временные метки. Теперь у меня вопрос, как на самом деле должен работать collect () в такой ситуации?

Заранее большое спасибо!

Ответы [ 2 ]

1 голос
/ 14 января 2020

Я предполагаю, что вы говорите о кадре данных искры (не scala). Если вы просто хотите получить самую последнюю дату (только этот столбец), вы можете сделать:

df.select(max("updated"))

Вы можете увидеть, что находится внутри фрейма данных с помощью df.show(). Поскольку df являются неизменяемыми, вам нужно присвоить результат выбора другой переменной или добавить шоу после select(). Это вернет фрейм данных с одной строкой с максимальным значением в «обновленном» столбце. Чтобы ответить на ваш вопрос:

Итак, идея заключается в том, что мы должны применять агрегацию ко всему набору данных, загруженному в накопитель, а не только к разделенной версии, в противном случае он, похоже, не получает всю временную метку

Когда вы выбираете на фрейме данных, spark выберет данные из всего набора данных, а не разделенная версия и версия драйвера. Spark осветит ваши данные в вашем кластере, и все операции, которые вы определили, будут выполнены со всем набором данных.

Мой вопрос теперь заключается в том, как на самом деле должен работать collect () в такой ситуации?

Операция сбора преобразует из искрового фрейма данных в массив ( который не распределяется) и массив будет находиться в узле драйвера, имейте в виду, что если размер вашего кадра данных превышает объем памяти, доступной в драйвере, у вас будет outOfMemoryError.

В этом случае, если вы выполните:

df.select(max("Timestamp")).collect().head

Вы DF (который содержит только одну строку с одним столбцом, который является вашей датой), будет преобразован в массив scala. В этом случае это безопасно, потому что select(max()) вернет только одну строку.

Потратьте некоторое время, чтобы узнать больше о искре dataframe / rdd и разнице между преобразованием и действием.

0 голосов
/ 14 января 2020

Звучит странно. Прежде всего вам не нужно собирать фрейм данных, чтобы получить последний элемент отсортированного фрейма данных. Есть много ответов на эти темы:

Как получить последний ряд из DataFrame?

...