как определить отношения людей на основе имени, адреса и затем назначить один и тот же идентификатор через linux comman или Pyspark - PullRequest
0 голосов
/ 09 октября 2019

У меня есть один CSV-файл.

D,FNAME,MNAME,LNAME,GENDER,DOB,snapshot,Address
2,66M,J,Rock,F,1995,201211.0,J
3,David,HM,Lee,M,1991,201211.0,J
6,66M,,Rock,F,1990,201211.0,J
0,David,H M,Lee,M,1990,201211.0,B
3,Marc,H,Robert,M,2000,201211.0,C
6,Marc,M,Robert,M,1988,201211.0,C
6,Marc,MS,Robert,M,2000,201211.0,D

Я хочу назначить лицам с одинаковой фамилией, проживающим по тому же адресу, один и тот же идентификатор или индекс. Лучше, чтобы идентификатор состоял только из цифр.
Если у людей разные фамилии в одном и том же месте, то идентификатор должен быть другим. Такой идентификатор должен быть уникальным. А именно, люди, которые отличаются либо адресом, либо фамилией, ID должны быть разными. Мой ожидаемый вывод -

D,FNAME,MNAME,LNAME,GENDER,DOB,snapshot,Address,ID
2,66M,J,Rock,F,1995,201211.0,J,11
3,David,HM,Lee,M,1991,201211.0,J,12
6,66M,,Rock,F,1990,201211.0,J,11
0,David,H M,Lee,M,1990,201211.0,B,13
3,Marc,H,Robert,M,2000,201211.0,C,14
6,Marc,M,Robert,M,1988,201211.0,C,14
6,Marc,MS,Robert,M,2000,201211.0,D,15

Мой размер файла данных составляет около 30 ГБ. Я имею в виду использование функции groupBy в искре на основе ключа, состоящего из LNAME и address, для группировки этих наблюдений вместе. Затем присвойте ему идентификатор по ключу. Но я не знаю, как это сделать. После этого, возможно, я смогу использовать flatMap, чтобы разделить строку и вернуть эти наблюдения с идентификатором. Но я не уверен в этом. Кроме того, могу ли я сделать это в среде Linux? Спасибо.

Ответы [ 3 ]

2 голосов
/ 27 октября 2019

Как обсуждалось в комментариях, основная идея состоит в том, чтобы правильно разделить данные так, чтобы записи с одинаковым LNAME + адресом оставались в том же разделе, запустите код Python для генерации отдельного idx на каждом разделе, а затем объединить их в окончательный id .

Примечание: Я добавил несколько новых строк в ваши примеры записей, см. Результат df_new.show(), показанный ниже.

from pyspark.sql import Window, Row
from pyspark.sql.functions import coalesce, sum as fsum, col, max as fmax, lit, broadcast

# ...skip code to initialize the dataframe 

# tweak the number of repartitioning N based on actual data size
N = 5

# Python function to iterate through the sorted list of elements in the same 
# partition and assign an in-partition idx based on Address and LNAME.
def func(partition_id, it):
  idx, lname, address = (1, None, None)
  for row in sorted(it, key=lambda x: (x.LNAME, x.Address)):
    if lname and (row.LNAME != lname or row.Address != address): idx += 1
    yield Row(partition_id=partition_id, idx=idx, **row.asDict())
    lname = row.LNAME
    address = row.Address

# Repartition based on 'LNAME' and 'Address' and then run mapPartitionsWithIndex()
# function to create in-partition idx. Adjust N so that records in each partition
# should be small enough to be loaded into the executor memory:
df1 = df.repartition(N, 'LNAME', 'Address') \
        .rdd.mapPartitionsWithIndex(func) \
        .toDF()

Получить количество уникальных строк cnt (на основе Address + LNAME), равное max_idx, а затем получить текущую сумму этого rcnt.

# idx: calculated in-partition id
# cnt: number of unique ids in the same partition: fmax('idx')
# rcnt: starting_id for a partition(something like a running count): coalesce(fsum('cnt').over(w1),lit(0))
# w1: WindowSpec to calculate the above rcnt
w1 = Window.partitionBy().orderBy('partition_id').rowsBetween(Window.unboundedPreceding,-1)

df2 = df1.groupby('partition_id') \
         .agg(fmax('idx').alias('cnt')) \
         .withColumn('rcnt', coalesce(fsum('cnt').over(w1),lit(0)))

df2.show()
+------------+---+----+
|partition_id|cnt|rcnt|
+------------+---+----+
|           0|  3|   0|
|           1|  1|   3|
|           2|  1|   4|
|           4|  1|   5|
+------------+---+----+

Присоединиться к df1с помощью df2 и создайте окончательный идентификатор, который будет idx + rcnt

df_new = df1.join(broadcast(df2), on=['partition_id']).withColumn('id', col('idx')+col('rcnt'))

df_new.show()
#+------------+-------+---+----+-----+------+------+-----+---+--------+---+----+---+
#|partition_id|Address|  D| DOB|FNAME|GENDER| LNAME|MNAME|idx|snapshot|cnt|rcnt| id|
#+------------+-------+---+----+-----+------+------+-----+---+--------+---+----+---+
#|           0|      B|  0|1990|David|     M|   Lee|  H M|  1|201211.0|  3|   0|  1|
#|           0|      J|  3|1991|David|     M|   Lee|   HM|  2|201211.0|  3|   0|  2|
#|           0|      D|  6|2000| Marc|     M|Robert|   MS|  3|201211.0|  3|   0|  3|
#|           1|      C|  3|2000| Marc|     M|Robert|    H|  1|201211.0|  1|   3|  4|
#|           1|      C|  6|1988| Marc|     M|Robert|    M|  1|201211.0|  1|   3|  4|
#|           2|      J|  6|1991|  66M|     F|   Rek| null|  1|201211.0|  1|   4|  5|
#|           2|      J|  6|1992|  66M|     F|   Rek| null|  1|201211.0|  1|   4|  5|
#|           4|      J|  2|1995|  66M|     F|  Rock|    J|  1|201211.0|  1|   5|  6|
#|           4|      J|  6|1990|  66M|     F|  Rock| null|  1|201211.0|  1|   5|  6|
#|           4|      J|  6|1990|  66M|     F|  Rock| null|  1|201211.0|  1|   5|  6|
#+------------+-------+---+----+-----+------+------+-----+---+--------+---+----+---+

df_new = df_new.drop('partition_id', 'idx', 'rcnt', 'cnt')

Некоторые примечания:

  • Практически вам потребуется очистить / нормализовать столбец LNAME и Address перед их использованием в качестве проверки уникальности. Например, используйте отдельный столбец uniq_key, который объединяет LNAME и Address в качестве уникального ключа кадра данных. см. ниже пример с некоторыми основными процедурами очистки данных:

    from pyspark.sql.functions import coalesce, lit, concat_ws, upper, regexp_replace, trim
    
    #(1) convert NULL to '': coalesce(col, '')
    #(2) concatenate LNAME and Address using NULL char '\x00' or '\0'
    #(3) convert to uppercase: upper(text)
    #(4) remove all non-[word/whitespace/NULL_char]: regexp_replace(text, r'[^\x00\w\s]', '')
    #(5) convert consecutive whitespaces to a SPACE: regexp_replace(text, r'\s+', ' ')
    #(6) trim leading/trailing spaces: trim(text)
    df = (df.withColumn('uniq_key',
        trim(
          regexp_replace(
            regexp_replace(
              upper(
                concat_ws('\0', coalesce('LNAME', lit('')), coalesce('Address', lit('')))
              ),
              r'[^\x00\s\w]+',
              ''
            ),
            r'\s+',
            ' '
          )
        )
    ))
    

    Затем в коде замените 'LNAME' и 'Address' на uniq_key, чтобы найти idx

  • Как указано cronoik в комментарии, вы также можете попробовать одну из функций ранга окна, чтобы вычислить idx внутри раздела. например:

    from pyspark.sql.functions import spark_partition_id, dense_rank
    
    # use dense_rank to calculate the in-partition idx
    w2 = Window.partitionBy('partition_id').orderBy('LNAME', 'Address')
    df1 = df.repartition(N, 'LNAME', 'Address') \
            .withColumn('partition_id', spark_partition_id()) \
            .withColumn('idx', dense_rank().over(w2))
    

    После того, как у вас есть df1, используйте те же методы, что и выше, чтобы вычислить df2 и df_new. Это должно быть быстрее, чем при использовании mapPartitionsWithIndex(), который в основном основан на методе RDD.

  • Для ваших реальных данных настройте N в соответствии с вашим реальным размером данных. это N влияет только на начальные разделы, после соединения с фреймом данных будет восстановлено значение по умолчанию (200). Вы можете отрегулировать это с помощью spark.sql.shuffle.partitions, например, при инициализации сеанса зажигания:

    spark = SparkSession.builder \
                        ....
                        .config("spark.sql.shuffle.partitions", 500) \
                        .getOrCreate()
    
2 голосов
/ 09 октября 2019

Поскольку у вас есть 30 ГБ входных данных, вам, вероятно, не нужно что-то, что будет пытаться хранить все это в структурах данных в памяти. Вместо этого давайте используем дисковое пространство.

Вот один из подходов, который загружает все ваши данные в базу данных sqlite и генерирует идентификатор для каждой уникальной пары фамилии и адреса, а затем объединяет все вместе:

#!/bin/sh

csv="$1"
# Use an on-disk database instead of in-memory because source data is 30gb.
# This will take a while to run.
db=$(mktemp -p .)

sqlite3 -batch -csv -header "${db}" <<EOF
.import "${csv}" people
CREATE TABLE ids(id INTEGER PRIMARY KEY, lname, address, UNIQUE(lname, address));
INSERT OR IGNORE INTO ids(lname, address) SELECT lname, address FROM people;
SELECT p.*, i.id AS ID
FROM people AS p
JOIN ids AS i ON (p.lname, p.address) = (i.lname, i.address)
ORDER BY p.rowid;
EOF

rm -f "${db}"

Пример:

$./makeids.sh data.csv
D,FNAME,MNAME,LNAME,GENDER,DOB,snapshot,Address,ID
2,66M,J,Rock,F,1995,201211.0,J,1
3,David,HM,Lee,M,1991,201211.0,J,2
6,66M,"",Rock,F,1990,201211.0,J,1
0,David,"H M",Lee,M,1990,201211.0,B,3
3,Marc,H,Robert,M,2000,201211.0,C,4
6,Marc,M,Robert,M,1988,201211.0,C,4
6,Marc,MS,Robert,M,2000,201211.0,D,5

Лучше, если идентификатор состоит только из чисел.

Если это ограничение можно ослабить, выможет сделать это за один проход, используя криптографический хэш фамилии и адреса в качестве идентификатора:

$ perl -MDigest::SHA=sha1_hex -F, -lane '
   BEGIN { $" = $, = "," } 
   if ($. == 1) { print @F, "ID" }
   else { print @F, sha1_hex("@F[3,7]") }' data.csv
D,FNAME,MNAME,LNAME,GENDER,DOB,snapshot,Address,ID
2,66M,J,Rock,F,1995,201211.0,J,5c99211a841bd2b4c9cdcf72d7e95e46b2ae08b5
3,David,HM,Lee,M,1991,201211.0,J,c263f9d1feb4dc789de17a8aab8f2808aea2876a
6,66M,,Rock,F,1990,201211.0,J,5c99211a841bd2b4c9cdcf72d7e95e46b2ae08b5
0,David,H M,Lee,M,1990,201211.0,B,e86e81ab2715a8202e41b92ad979ca3a67743421
3,Marc,H,Robert,M,2000,201211.0,C,363ed8175fdf441ed59ac19cea3c37b6ce9df152
6,Marc,M,Robert,M,1988,201211.0,C,363ed8175fdf441ed59ac19cea3c37b6ce9df152
6,Marc,MS,Robert,M,2000,201211.0,D,cf5135dc402efe16cd170191b03b690d58ea5189

Или, если число уникальных пар lname, адресов достаточно мало, чтобы они моглиразумно храниться в хэш-таблице вашей системы:

#!/usr/bin/gawk -f
BEGIN {
    FS = OFS = ","
}
NR == 1 {
    print $0, "ID"
    next
}
! ($4, $8) in ids {
    ids[$4, $8] = ++counter
}
{
    print $0, ids[$4, $8]
}
0 голосов
/ 09 октября 2019
$ sort -t, -k8,8 -k4,4 <<EOD | awk -F, '  $8","$4 != last { ++id; last = $8","$4 }
                                          { NR!=1 && $9=id; print }' id=9 OFS=,
D,FNAME,MNAME,LNAME,GENDER,DOB,snapshot,Address
2,66M,J,Rock,F,1995,201211.0,J
3,David,HM,Lee,M,1991,201211.0,J
6,66M,,Rock,F,1990,201211.0,J
0,David,H M,Lee,M,1990,201211.0,B
3,Marc,H,Robert,M,2000,201211.0,C
6,Marc,M,Robert,M,1988,201211.0,C
6,Marc,MS,Robert,M,2000,201211.0,D
> EOD
D,FNAME,MNAME,LNAME,GENDER,DOB,snapshot,Address
0,David,H M,Lee,M,1990,201211.0,B,11
3,Marc,H,Robert,M,2000,201211.0,C,12
6,Marc,M,Robert,M,1988,201211.0,C,12
6,Marc,MS,Robert,M,2000,201211.0,D,13
3,David,HM,Lee,M,1991,201211.0,J,14
2,66M,J,Rock,F,1995,201211.0,J,15
6,66M,,Rock,F,1990,201211.0,J,15
$
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...