Как обнаружить дубликаты в большом файле JSON, используя PySpark HashPartitioner - PullRequest
3 голосов
/ 25 апреля 2019

У меня большой файл json с более чем 20 ГБ метаданных со структурой json. Он содержит простые пользовательские метаданные в каком-либо приложении, и я хотел бы просмотреть его, чтобы обнаружить дубликаты. Вот пример того, как выглядят данные:

{"created": "2015-08-04", "created_at": "2010-03-15", "username": "koleslawrulez333"}
{"created": "2016-01-19", "created_at": "2012-05-25", "name": "arthurking231"}
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "starklord1943"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}

Файл json содержит построчно объекты json, которые выглядят очень похоже на это. Дубликат возникает, когда поле "name" двух объектов json совпадает. Итак, это дубликат:

{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "Assasinator5827"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}

Так же, как два одинаковых объекта json.

Теперь я хочу просмотреть весь файл json, который слишком велик, чтобы поместиться в память, и, используя лучший стандарт, выяснить все дубликаты и их дубликаты , а затем немного логики - логическая часть тривиальна, но я не совсем уверен, как найти дубликаты.

Что я думал о :

  1. Первым, что я подумал об использовании, был фильтр Блума. Они не так запутаны и работают довольно хорошо и быстро, и я думаю, что они сводятся к O (n). Тем не менее, фильтры Bloom не дадут мне знать, что дублирующая строка является дубликатом, что для меня не годится.

  2. Я думал об использовании внешней сортировки слиянием. Я бы в основном разделил файл на несколько файлов меньшего размера, которые поместились бы в памяти, отсортировал каждый кусок и поиска дубликатов (которые теперь сгруппированы вместе). Но я не совсем уверен, что эта реализация - то, чего я хочу.

  3. Следующее, с чем я столкнулся, было хеширование по разделам, что, как я подозреваю, - это то, чего я хочу. По сути, хеширование - это лучший способ найти дубликаты при работе с данными, которые помещаются в память, так почему бы не использовать их для чего-то, что не подходит? Я немного смущен тем, как хэшировать по разделам. Я не уверен, что это то, что я ищу.

Итак, я думаю, что должен использовать вариант 3, хэширование по разделам, и я знаю, что в Spark это есть. Я надеялся, что кто-нибудь может сообщить мне, если я на правильном пути, и, возможно, дать мне несколько инструкций о том, прав ли я. У меня есть пара конкретных вопросов, концептуально:

  1. Допустим, я создал 100 разделов, которые идеально вписываются в память (поэтому в моем случае каждый раздел будет иметь размер 100 МБ). Допустим, я хэшировал первые x элементов в моем файле json на один раздел и обнаружил дубликаты no . Допустим, у меня есть другой раздел со вторыми 100 МБ данных, который также не содержит дубликатов. Если я могу загружать только 100 МБ данных за один раз, как я могу проверить, что раздел 1 и раздел 2 не имеют дубликатов друг от друга? Чтобы уточнить, если у раздела 1 есть элемент, а у раздела 2 одинаковый элемент, как мне это выяснить? Я полагаю, мне нужно загрузить оба в память, верно? И если я не могу ... тогда что мне делать? Может быть, я неправильно понимаю ...

  2. Это подводит меня ко второму вопросу - похоже, что это не то, как работает разбиение, и когда вы хэшируете по разделам, элементы с аналогичным хеш-диапазоном или диапазоном хэширования попадают в определенный файл. Поэтому, если два элемента являются дубликатами, я бы знал, потому что алгоритм попытается поместить его в файл, где хеш уже существует. Это тот случай?

Я знаю, у меня есть еще вопросы, я просто не могу думать о них. У кого-нибудь есть советы? Особенно в отношении Pyspark и как использовать это лучше всего? Или pyspark - это не то, что я ищу?

1 Ответ

2 голосов
/ 01 мая 2019

Проблема проще, чем вы думаете.Вам действительно нужно агрегировать данные только по name, как предполагает @Hitobat.Я бы решил проблему с pyspark.sql.Window, чтобы упростить вывод агрегации.

Учитывая следующие данные, это файл с именем data.json (это также может быть каталог файлов в отличие от одного файла)

Содержимое data.json

{"created": "2015-08-04", "created_at": "2010-03-15", "username": "koleslawrulez333"}
{"created": "2016-01-19", "created_at": "2012-05-25", "name": "arthurking231"}
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "starklord1943"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "Assasinator5827"}

Тогда код pyspark будет выглядеть так:

from pyspark.sql import Window
from pyspark.sql import functions as F

df = spark.read.json("data.json") # can be a directory of files as well 
df.show()

Вывод

+----------+----------+---------------+--------+----------------+
|   created|created_at|           name|    type|        username|
+----------+----------+---------------+--------+----------------+
|2015-08-04|2010-03-15|           null|    null|koleslawrulez333|
|2016-01-19|2012-05-25|  arthurking231|    null|            null|
|2016-07-23|2011-08-27|  starklord1943|Username|            null|
|2015-11-08|2010-01-19|Assasinator5827|    null|            null|
|2016-07-23|2011-08-27|Assasinator5827|Username|            null|
+----------+----------+---------------+--------+----------------+ 

Затем раздели подсчитайте с помощью pyspark.sql.Window

name_partition_window = Window.partitionBy("name")
df_with_repeat_counts = df.select("*", F.count("*").over(name_partition_window).alias("name_counts"))
df_with_repeat_counts.show()

Вывод

+----------+----------+---------------+--------+----------------+-----------+
|   created|created_at|           name|    type|        username|name_counts|
+----------+----------+---------------+--------+----------------+-----------+
|2016-01-19|2012-05-25|  arthurking231|    null|            null|          1|
|2015-08-04|2010-03-15|           null|    null|koleslawrulez333|          1|
|2015-11-08|2010-01-19|Assasinator5827|    null|            null|          2|
|2016-07-23|2011-08-27|Assasinator5827|Username|            null|          2|
|2016-07-23|2011-08-27|  starklord1943|Username|            null|          1|
+----------+----------+---------------+--------+----------------+-----------+

Затем отфильтруйте кадр данных в столбце name_count и упорядочите по имени для проверки

duplicates = df_with_repeat_counts.where(F.col("name_counts") > 1).orderBy("name")
duplicates.show()

Выходные данные

+----------+----------+---------------+--------+--------+-----------+
|   created|created_at|           name|    type|username|name_counts|
+----------+----------+---------------+--------+--------+-----------+
|2015-11-08|2010-01-19|Assasinator5827|    null|    null|          2|
|2016-07-23|2011-08-27|Assasinator5827|Username|    null|          2|
+----------+----------+---------------+--------+--------+-----------+

На этом этапе вы можете анализировать duplicates фрейм данных, необходимый для вашего варианта использования.

...