Эффективное определение суффикса строки - PullRequest
0 голосов
/ 01 февраля 2019

Я работаю с PySpark над огромным набором данных, где я хочу отфильтровать фрейм данных на основе строк в другом фрейме данных.Например,

dd = spark.createDataFrame(["something.google.com","something.google.com.somethingelse.ac.uk","something.good.com.cy", "something.good.com.cy.mal.org"], StringType()).toDF('domains')
+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com                    |
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy                   |
|something.good.com.cy.mal.org           |
+----------------------------------------+  

dd1 =  spark.createDataFrame(["google.com", "good.com.cy"], StringType()).toDF('gooddomains')
+-----------+
|gooddomains|
+-----------+
|google.com |
|good.com.cy|
+-----------+

Я предполагаю, что domains и gooddomains являются допустимыми доменными именами.

Я хочу отфильтровать совпадающие строки в dd, которые делаютне заканчивается dd1.Так что в приведенном выше примере я хочу отфильтровать строку 1 и строку 3, чтобы в итоге получить

+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
+----------------------------------------+  

Мое текущее решение (как показано ниже) может учитывать только домены до 3 «слов».Если я добавлю, скажем, verygood.co.ac.uk в dd1 (то есть в белый список), то произойдет сбой.

def split_filter(x, whitelist):
    splitted1 = x.select(F.split(x['domains'], '\.').alias('splitted_domains'))
    last_two = splitted1.select(F.concat(splitted1.splitted_domains[F.size(splitted1.splitted_domains)-2], \
       F.lit('.'), \
       splitted1.splitted_domains[F.size(splitted1.splitted_domains)-1]).alias('last_two'))
    last_three = splitted1.select(F.concat(splitted1.splitted_domains[F.size(splitted1.splitted_domains)-3], \
       F.lit('.'), \
       splitted1.splitted_domains[F.size(splitted1.splitted_domains)-2], \
       F.lit('.'), \
       splitted1.splitted_domains[F.size(splitted1.splitted_domains)-1]).alias('last_three'))
    x = x.withColumn('id', F.monotonically_increasing_id())
    last_two = last_two.withColumn('id', F.monotonically_increasing_id())
    last_three = last_three.withColumn('id', F.monotonically_increasing_id())
    final_d = x.join(last_two, ['id']).join(last_three, ['id'])
    df1 = final_d.join(whitelist, final_d['last_two'] == whitelist['domains'], how = 'left_anti')
    df2 = df1.join(whitelist, df1['last_three'] == whitelist['domains'], how = 'left_anti')
    return df2.drop('id')

Я использую Spark 2.3.0 с Python 2.7.5.

Ответы [ 2 ]

0 голосов
/ 05 февраля 2019

Давайте расширим domains для чуть лучшего охвата:

domains = spark.createDataFrame([
    "something.google.com",  # OK
    "something.google.com.somethingelse.ac.uk", # NOT OK 
    "something.good.com.cy", # OK 
    "something.good.com.cy.mal.org",  # NOT OK
    "something.bad.com.cy",  # NOT OK
    "omgalsogood.com.cy", # NOT OK
    "good.com.cy",   # OK 
    "sogood.example.com",  # OK Match for shorter redundant, mismatch on longer
    "notsoreal.googleecom" # NOT OK
], "string").toDF('domains')

good_domains =  spark.createDataFrame([
    "google.com", "good.com.cy", "alsogood.com.cy",
    "good.example.com", "example.com"  # Redundant case
], "string").toDF('gooddomains')

Сейчас ... Наивное решение, использующее только примитивы Spark SQL , состоит в том, чтобы упростить ваш текущий подход.немного.Поскольку вы заявили, что можно предположить, что это допустимые публичные домены, мы можем определить функцию, подобную этой:

from pyspark.sql.functions import col, regexp_extract

def suffix(c): 
    return regexp_extract(c, "([^.]+\\.[^.]+$)", 1) 

, которая извлекает домен верхнего уровня и поддомен первого уровня:

domains_with_suffix = (domains
    .withColumn("suffix", suffix("domains"))
    .alias("domains"))
good_domains_with_suffix = (good_domains
    .withColumn("suffix", suffix("gooddomains"))
    .alias("good_domains"))

domains_with_suffix.show()
+--------------------+--------------------+
|             domains|              suffix|
+--------------------+--------------------+
|something.google.com|          google.com|
|something.google....|               ac.uk|
|something.good.co...|              com.cy|
|something.good.co...|             mal.org|
|something.bad.com.cy|              com.cy|
|  omgalsogood.com.cy|              com.cy|
|         good.com.cy|              com.cy|
|  sogood.example.com|         example.com|
|notsoreal.googleecom|notsoreal.googleecom|
+--------------------+--------------------+

Теперь мы можем выполнить внешнее соединение:

from pyspark.sql.functions import (
    col, concat, lit, monotonically_increasing_id, sum as sum_
)

candidates = (domains_with_suffix
    .join(
        good_domains_with_suffix,
        col("domains.suffix") == col("good_domains.suffix"), 
        "left"))

и отфильтровать результат:

is_good_expr = (
    col("good_domains.suffix").isNotNull() &      # Match on suffix
    (

        # Exact match
        (col("domains") == col("gooddomains")) |
        # Subdomain match
        col("domains").endswith(concat(lit("."), col("gooddomains")))
    )
)

not_good_domains = (candidates
    .groupBy("domains")  # .groupBy("suffix", "domains") - see the discussion
    .agg((sum_(is_good_expr.cast("integer")) > 0).alias("any_good"))
    .filter(~col("any_good"))
    .drop("any_good"))

not_good_domains.show(truncate=False)     
+----------------------------------------+
|domains                                 |
+----------------------------------------+
|omgalsogood.com.cy                      |
|notsoreal.googleecom                    |
|something.good.com.cy.mal.org           |
|something.google.com.somethingelse.ac.uk|
|something.bad.com.cy                    |
+----------------------------------------+

Это лучше, чем Требуется декартово произведениедля прямого соединения с LIKE, но неудовлетворительно для грубой силы и в худшем случае требуется два шаффла - одно для join (это можно пропустить, если good_domains достаточно мало, чтобы broadcasted)и еще один для group_by + agg.

К сожалению, Spark SQL не позволяет настраиваемому разделителю использовать только один случайный порядок для обоих (однако это возможно с составным ключом в RDD API), и оптимизатор еще недостаточно умен, чтобы оптимизировать join(_, "key1") и .groupBy("key1", _).

Если вы можете принять некоторые ложные отрицания, вы можете перейти на вероятностный .Сначала давайте построим вероятностный счетчик (здесь используя bounter с небольшой подсказкой от toolz)

from pyspark.sql.functions import concat_ws, reverse, split
from bounter import bounter
from toolz.curried import identity, partition_all

# This is only for testing on toy examples, in practice use more realistic value
size_mb = 20      
chunk_size = 100

def reverse_domain(c):
    return concat_ws(".", reverse(split(c, "\\.")))

def merge(acc, xs):
    acc.update(xs)
    return acc

counter = sc.broadcast((good_domains
    .select(reverse_domain("gooddomains"))
    .rdd.flatMap(identity)
    # Chunk data into groups so we reduce the number of update calls
    .mapPartitions(partition_all(chunk_size))
    # Use tree aggregate to reduce pressure on the driver, 
    # when number of partitions is large*
    # You can use depth parameter for further tuning
    .treeAggregate(bounter(need_iteration=False, size_mb=size_mb), merge, merge)))

, затем определим пользовательскую функцию, подобную этой

from pyspark.sql.functions import pandas_udf, PandasUDFType
from toolz import accumulate

def is_good_counter(counter):
    def is_good_(x):
        return any(
            x in counter.value 
            for x in accumulate(lambda x, y: "{}.{}".format(x, y), x.split("."))
        )

    @pandas_udf("boolean", PandasUDFType.SCALAR)
    def _(xs):
        return xs.apply(is_good_)
    return _

и фильтрация domains:

domains.filter(
    ~is_good_counter(counter)(reverse_domain("domains"))
).show(truncate=False)
+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
|something.bad.com.cy                    |
|omgalsogood.com.cy                      |
|notsoreal.googleecom                    |
+----------------------------------------+

В Scala это можно сделать с помощью bloomFilter

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.util.sketch.BloomFilter

def reverseDomain(c: Column) = concat_ws(".", reverse(split(c, "\\.")))

val checker = good_domains.stat.bloomFilter(
  // Adjust values depending on the data
  reverseDomain($"gooddomains"), 1000, 0.001 
)

def isGood(checker: BloomFilter) = udf((s: String) => 
  s.split('.').toStream.scanLeft("") {
    case ("", x) => x
    case (acc, x) => s"${acc}.${x}"
}.tail.exists(checker mightContain _))


domains.filter(!isGood(checker)(reverseDomain($"domains"))).show(false)
+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
|something.bad.com.cy                    |
|omgalsogood.com.cy                      |
|notsoreal.googleecom                    |
+----------------------------------------+

и, если необходимо, не составит труда вызвать такой код из Python .

Это может быть еще не вполне удовлетворительным из-за приблизительной природы.Если вам требуется точный результат, вы можете попробовать использовать избыточный характер данных , например, с trie (здесь используется реализация datrie).

Если good_domains относительно малы, вы можете создать одну модель, аналогично вероятностному варианту:

import string
import datrie


def seq_op(acc, x):
    acc[x] = True
    return acc

def comb_op(acc1, acc2):
    acc1.update(acc2)
    return acc1

trie = sc.broadcast((good_domains
    .select(reverse_domain("gooddomains"))
    .rdd.flatMap(identity)
    # string.printable is a bit excessive if you need standard domain
    # and not enough if you allow internationalized domain names.
    # In the latter case you'll have to adjust the `alphabet`
    # or use different implementation of trie.
    .treeAggregate(datrie.Trie(string.printable), seq_op, comb_op)))

определить пользовательскую функцию:

def is_good_trie(trie):
    def is_good_(x):
        if not x:
            return False
        else:
            return any(
                x == match or x[len(match)] == "."
                for match in trie.value.iter_prefixes(x)
            )

    @pandas_udf("boolean", PandasUDFType.SCALAR)
    def _(xs):
        return xs.apply(is_good_)

    return _

и примените его к данным:

domains.filter(
    ~is_good_trie(trie)(reverse_domain("domains"))
).show(truncate=False)
+----------------------------------------+
|domains                                 |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org           |
|something.bad.com.cy                    |
|omgalsogood.com.cy                      |
|notsoreal.googleecom                    |
+----------------------------------------+

Этот конкретный подход работает при условии, что все good_domains могут быть сжаты в один файл, но могут быть легко расширены для обработки случаев, когда это предположениене устраивает.Например, вы можете создать один trie для домена верхнего уровня или суффикса (как определено в простом решении)

(good_domains
    .select(suffix("gooddomains"), reverse_domain("gooddomains"))
    .rdd
    .aggregateByKey(datrie.Trie(string.printable), seq_op, comb_op))

, а затем либо загружать модели по запросу из сериализованной версии, либо использовать операции RDD.

Два нестандартных метода могут быть дополнительно откорректированы в зависимости от данных, бизнес-требований (например, ложноотрицательный допуск в случае приближенного решения) и доступных ресурсов (память драйвера, память исполнителя, количество элементов suffixes,доступ к распределенной POSIX-совместимой распределенной файловой системе и т. д.).Есть также некоторые компромиссы, которые следует учитывать при выборе между их применением на DataFrames и RDDs (использование памяти, издержки на связь и сериализацию).


* См. Понимание treeReduce () вСпарк

0 голосов
/ 04 февраля 2019

Если я правильно понимаю, вам просто нужно левое антисоединение с использованием простого шаблона сопоставления строк SQL.

from pyspark.sql.functions import expr

dd.alias("l")\
    .join(
        dd1.alias("r"), 
        on=expr("l.domains LIKE concat('%', r.gooddomains)"), 
        how="leftanti"
    )\
    .select("l.*")\
    .show(truncate=False)
#+----------------------------------------+
#|domains                                 |
#+----------------------------------------+
#|something.google.com.somethingelse.ac.uk|
#|something.good.com.cy.mal.org           |
#+----------------------------------------+

Выражение concat('%', r.gooddomains) добавляет подстановочный знак к r.gooddomains.

Далее мы используем l.domains LIKE concat('%', r.gooddomains), чтобы найти строки, которые соответствуют этому шаблону.

Наконец, укажите how="leftanti", чтобы сохранить только те строки, которые не совпадают.


Обновление : Как указано в ,комментарии от @ user10938362 у этого подхода есть два недостатка:

1) Так как при этом рассматриваются только соответствующие суффиксы, существуют крайние случаи, когда это приводит к неверным результатам.Например:

example.com должно соответствовать example.com и subdomain.example.com, но не fakeexample.com

Есть два способа приблизиться к этому.Во-первых, нужно изменить выражение LIKE, чтобы справиться с этим.Поскольку мы знаем, что это все действительные домены, мы можем проверить точное совпадение или точку, за которой следует домен:

like_expr = " OR ".join(
    [
        "(l.domains = r.gooddomains)",
        "(l.domains LIKE concat('%.', r.gooddomains))"
    ]
)

dd.alias("l")\
    .join(
        dd1.alias("r"), 
        on=expr(like_expr), 
        how="leftanti"
    )\
    .select("l.*")\
    .show(truncate=False)

Аналогично, можно использовать RLIKE с шаблоном регулярного выражения с внешним видом.позади.

2) Большая проблема в том, что, как объяснено здесь , соединение с выражением LIKE вызовет декартово произведение.Если dd1 достаточно мало для вещания, то это не проблема.

В противном случае вы можете столкнуться с проблемами производительности, и вам придется попробовать другой подход.


Подробнее об операторе PySparkSQL LIKE из Apache HIVE docs :

A LIKE B:

TRUE, если строка A соответствует простому регулярному выражению SQL B, в противном случае - FALSE.Сравнение делается посимвольно.Символ _ в B соответствует любому символу в A (аналогично . в регулярных выражениях posix), а символ % в B соответствует произвольному количеству символов в A (аналогично .* в регулярных выражениях posix)).Например, 'foobar' LIKE 'foo' оценивается как ЛОЖЬ, где 'foobar' LIKE 'foo___' оценивается как ИСТИНА, как и 'foobar' LIKE 'foo%'.Для экранирования % используйте \ (% соответствует одному % символу).Если данные содержат точку с запятой, и вы хотите найти ее, ее необходимо экранировать, columnValue LIKE 'a\;b'


Примечание : здесь используется «хитрость»использования от pyspark.sql.functions.expr до передачи значения столбца в качестве параметра функции .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...