Выберите k случайных элементов из списка, чьи элементы имеют веса - PullRequest
67 голосов
/ 26 января 2010

Выбор без каких-либо весов (равных вероятностей) прекрасно описан здесь .

Мне было интересно, есть ли способ преобразовать этот подход во взвешенный.

Меня также интересуют и другие подходы.

Обновление: выборка без замена

Ответы [ 14 ]

66 голосов
/ 27 января 2010

Если выборка с заменой, , вы можете использовать этот алгоритм (реализованный здесь в Python):

import random

items = [(10, "low"),
         (100, "mid"),
         (890, "large")]

def weighted_sample(items, n):
    total = float(sum(w for w, v in items))
    i = 0
    w, v = items[0]
    while n:
        x = total * (1 - random.random() ** (1.0 / n))
        total -= x
        while x > w:
            x -= w
            i += 1
            w, v = items[i]
        w -= x
        yield v
        n -= 1

Это O ( n + m ), где m - количество элементов.

Почему это работает? Он основан на следующем алгоритме:

def n_random_numbers_decreasing(v, n):
    """Like reversed(sorted(v * random() for i in range(n))),
    but faster because we avoid sorting."""
    while n:
        v *= random.random() ** (1.0 / n)
        yield v
        n -= 1

Функция weighted_sample является именно этим алгоритмом, объединенным с обходом списка items, чтобы выбрать элементы, выбранные этими случайными числами.

Это, в свою очередь, работает, потому что вероятность того, что n случайных чисел 0 .. v окажется меньше z , равна P = ( z / v ) n . Решите для z , и вы получите z = vP 1 / n . Подстановка случайного числа для P выбирает наибольшее число с правильным распределением; и мы можем просто повторить процесс, чтобы выбрать все остальные числа.

Если выборка без замены, вы можете поместить все элементы в двоичную кучу, где каждый узел кэширует сумму весов всех элементов в этой подпапке. Сборка кучи - это O ( m ). Выбор случайного элемента из кучи с учетом веса - O (log m ). Удаление этого элемента и обновление кэшированных итогов также является O (log m ). Таким образом, вы можете выбрать n элементов в O ( m + n log m ) времени.

(Примечание: здесь «вес» означает, что каждый раз, когда выбирается элемент, остальные возможности выбираются с вероятностью, пропорциональной их весам. Это не означает, что элементы появляются в выходных данных с вероятностью, пропорциональной их весам.)

Вот реализация этого, обильно прокомментированная:

import random

class Node:
    # Each node in the heap has a weight, value, and total weight.
    # The total weight, self.tw, is self.w plus the weight of any children.
    __slots__ = ['w', 'v', 'tw']
    def __init__(self, w, v, tw):
        self.w, self.v, self.tw = w, v, tw

def rws_heap(items):
    # h is the heap. It's like a binary tree that lives in an array.
    # It has a Node for each pair in `items`. h[1] is the root. Each
    # other Node h[i] has a parent at h[i>>1]. Each node has up to 2
    # children, h[i<<1] and h[(i<<1)+1].  To get this nice simple
    # arithmetic, we have to leave h[0] vacant.
    h = [None]                          # leave h[0] vacant
    for w, v in items:
        h.append(Node(w, v, w))
    for i in range(len(h) - 1, 1, -1):  # total up the tws
        h[i>>1].tw += h[i].tw           # add h[i]'s total to its parent
    return h

def rws_heap_pop(h):
    gas = h[1].tw * random.random()     # start with a random amount of gas

    i = 1                     # start driving at the root
    while gas >= h[i].w:      # while we have enough gas to get past node i:
        gas -= h[i].w         #   drive past node i
        i <<= 1               #   move to first child
        if gas >= h[i].tw:    #   if we have enough gas:
            gas -= h[i].tw    #     drive past first child and descendants
            i += 1            #     move to second child
    w = h[i].w                # out of gas! h[i] is the selected node.
    v = h[i].v

    h[i].w = 0                # make sure this node isn't chosen again
    while i:                  # fix up total weights
        h[i].tw -= w
        i >>= 1
    return v

def random_weighted_sample_no_replacement(items, n):
    heap = rws_heap(items)              # just make a heap...
    for i in range(n):
        yield rws_heap_pop(heap)        # and pop n items off it.
41 голосов
/ 28 января 2010

Если выборка с заменой, используйте метод выбор колеса рулетки (часто используется в генетических алгоритмах):

  1. сортировка весов
  2. вычислить совокупные веса
  3. выберите случайное число в [0,1]*totalWeight
  4. найдите интервал, в который это число попадает в
  5. выбрать элементы с соответствующим интервалом
  6. повтор k раз

alt text

Если выборка без замены, вы можете адаптировать вышеупомянутую технику, удаляя выбранный элемент из списка после каждой итерации, а затем повторно нормализуя веса так, чтобы их сумма была 1 (действительная функция распределения вероятности)

24 голосов
/ 14 мая 2015

Я знаю, что это очень старый вопрос, но я думаю, что есть хороший прием, чтобы сделать это за O (n) время, если вы примените немного математики!

Экспоненциальное распределение имеет два очень полезных свойства.

  1. Учитывая n выборок из различных экспоненциальных распределений с различными параметрами скорости, вероятность того, что данная выборка является минимальной, равна ее параметру скорости, деленному на сумму всех параметров скорости.

  2. Это "без памяти". Таким образом, если вы уже знаете минимум, то вероятность того, что любой из оставшихся элементов будет равен 2-й минуте, равна вероятности того, что если бы истинные минимальные значения были удалены (и никогда не генерировались), этот элемент был бы новым минимум Это кажется очевидным, но я думаю, что из-за некоторых проблем с условной вероятностью это может быть не так для других распределений.

Используя факт 1, мы знаем, что выбор одного элемента может быть сделан путем генерирования этих выборок экспоненциального распределения с параметром скорости, равным весу, а затем выбора одного с минимальным значением.

Используя факт 2, мы знаем, что нам не нужно заново генерировать экспоненциальные выборки. Вместо этого просто сгенерируйте по одному для каждого элемента и возьмите k элементов с наименьшими выборками.

Найти наименьшее k можно в O (n). Используйте алгоритм Quickselect , чтобы найти k-й элемент, затем просто сделайте еще один проход через все элементы и выведите все значения ниже k-го.

Полезное примечание: если у вас нет немедленного доступа к библиотеке для генерации образцов экспоненциального распределения, это можно легко сделать: -ln(rand())/weight

3 голосов
/ 01 июня 2012

Я сделал это в Ruby

https://github.com/fl00r/pickup

require 'pickup'
pond = {
  "selmon"  => 1,
  "carp" => 4,
  "crucian"  => 3,
  "herring" => 6,
  "sturgeon" => 8,
  "gudgeon" => 10,
  "minnow" => 20
}
pickup = Pickup.new(pond, uniq: true)
pickup.pick(3)
#=> [ "gudgeon", "herring", "minnow" ]
pickup.pick
#=> "herring"
pickup.pick
#=> "gudgeon"
pickup.pick
#=> "sturgeon"
1 голос
/ 21 ноября 2014

Если вы хотите выбрать x элементов из взвешенного набора без замены, чтобы элементы выбирались с вероятностью, пропорциональной их весам:

import random

def weighted_choose_subset(weighted_set, count):
    """Return a random sample of count elements from a weighted set.

    weighted_set should be a sequence of tuples of the form 
    (item, weight), for example:  [('a', 1), ('b', 2), ('c', 3)]

    Each element from weighted_set shows up at most once in the
    result, and the relative likelihood of two particular elements
    showing up is equal to the ratio of their weights.

    This works as follows:

    1.) Line up the items along the number line from [0, the sum
    of all weights) such that each item occupies a segment of
    length equal to its weight.

    2.) Randomly pick a number "start" in the range [0, total
    weight / count).

    3.) Find all the points "start + n/count" (for all integers n
    such that the point is within our segments) and yield the set
    containing the items marked by those points.

    Note that this implementation may not return each possible
    subset.  For example, with the input ([('a': 1), ('b': 1),
    ('c': 1), ('d': 1)], 2), it may only produce the sets ['a',
    'c'] and ['b', 'd'], but it will do so such that the weights
    are respected.

    This implementation only works for nonnegative integral
    weights.  The highest weight in the input set must be less
    than the total weight divided by the count; otherwise it would
    be impossible to respect the weights while never returning
    that element more than once per invocation.
    """
    if count == 0:
        return []

    total_weight = 0
    max_weight = 0
    borders = []
    for item, weight in weighted_set:
        if weight < 0:
            raise RuntimeError("All weights must be positive integers")
        # Scale up weights so dividing total_weight / count doesn't truncate:
        weight *= count
        total_weight += weight
        borders.append(total_weight)
        max_weight = max(max_weight, weight)

    step = int(total_weight / count)

    if max_weight > step:
        raise RuntimeError(
            "Each weight must be less than total weight / count")

    next_stop = random.randint(0, step - 1)

    results = []
    current = 0
    for i in range(count):
        while borders[current] <= next_stop:
            current += 1
        results.append(weighted_set[current][0])
        next_stop += step

    return results
1 голос
/ 26 августа 2012

Вот реализация Go из geodns :

package foo

import (
    "log"
    "math/rand"
)

type server struct {
    Weight int
    data   interface{}
}

func foo(servers []server) {
    // servers list is already sorted by the Weight attribute

    // number of items to pick
    max := 4

    result := make([]server, max)

    sum := 0
    for _, r := range servers {
        sum += r.Weight
    }

    for si := 0; si < max; si++ {
        n := rand.Intn(sum + 1)
        s := 0

        for i := range servers {
            s += int(servers[i].Weight)
            if s >= n {
                log.Println("Picked record", i, servers[i])
                sum -= servers[i].Weight
                result[si] = servers[i]

                // remove the server from the list
                servers = append(servers[:i], servers[i+1:]...)
                break
            }
        }
    }

    return result
}
1 голос
/ 10 мая 2011

Если вы хотите создать большие массивы случайных целых чисел с заменой , вы можете использовать кусочно-линейную интерполяцию. Например, используя NumPy / SciPy:

import numpy
import scipy.interpolate

def weighted_randint(weights, size=None):
    """Given an n-element vector of weights, randomly sample
    integers up to n with probabilities proportional to weights"""
    n = weights.size
    # normalize so that the weights sum to unity
    weights = weights / numpy.linalg.norm(weights, 1)
    # cumulative sum of weights
    cumulative_weights = weights.cumsum()
    # piecewise-linear interpolating function whose domain is
    # the unit interval and whose range is the integers up to n
    f = scipy.interpolate.interp1d(
            numpy.hstack((0.0, weights)),
            numpy.arange(n + 1), kind='linear')
    return f(numpy.random.random(size=size)).astype(int)

Это не эффективно, если вы хотите пробовать без замены.

0 голосов
/ 11 января 2019

Я просто потратил несколько часов, пытаясь отстать от алгоритмов, лежащих в основе выборки без замены , и эта тема более сложная, чем я первоначально думал. Это захватывающе! Для будущих читателей (хорошего дня!) Я документирую свои идеи здесь , включая готовую к использованию функцию, которая учитывает заданные вероятности включения далее ниже. Хороший и быстрый математический обзор различных методов можно найти здесь: Tillé: Алгоритмы выборки с равными или неравными вероятностями . Например, метод Джейсона можно найти на странице 46. Предостережение с его методом состоит в том, что веса не пропорциональны вероятностям включения, как также отмечено в документе. Фактически, i -ые вероятности включения могут быть рекурсивно вычислены следующим образом:

def inclusion_probability(i, weights, k):
    """
        Computes the inclusion probability of the i-th element
        in a randomly sampled k-tuple using Jason's algorithm
        (see https://stackoverflow.com/a/2149533/7729124)
    """
    if k <= 0: return 0
    cum_p = 0
    for j, weight in enumerate(weights):
        # compute the probability of j being selected considering the weights
        p = weight / sum(weights)

        if i == j:
            # if this is the target element, we don't have to go deeper,
            # since we know that i is included
            cum_p += p
        else:
            # if this is not the target element, than we compute the conditional
            # inclusion probability of i under the constraint that j is included
            cond_i = i if i < j else i-1
            cond_weights = weights[:j] + weights[j+1:]
            cond_p = inclusion_probability(cond_i, cond_weights, k-1)
            cum_p += p * cond_p
    return cum_p

И мы можем проверить правильность указанной выше функции, сравнив

In : for i in range(3): print(i, inclusion_probability(i, [1,2,3], 2))
0 0.41666666666666663
1 0.7333333333333333
2 0.85

до

In : import collections, itertools
In : sample_tester = lambda f: collections.Counter(itertools.chain(*(f() for _ in range(10000))))
In : sample_tester(lambda: random_weighted_sample_no_replacement([(1,'a'),(2,'b'),(3,'c')],2))
Out: Counter({'a': 4198, 'b': 7268, 'c': 8534})

Один способ - также предложенный в документе выше - указать вероятности включения - это рассчитать весовые коэффициенты по ним. Вся сложность рассматриваемого вопроса проистекает из того факта, что никто не может сделать это напрямую, поскольку в основном нужно инвертировать формулу рекурсии, символически я утверждаю, что это невозможно. Численно это можно сделать, используя все виды методов, например, Метод Ньютона. Однако сложность обращения якобиана с использованием простого Python быстро становится невыносимой, я действительно рекомендую изучить numpy.random.choice в этом случае.

К счастью, есть метод, использующий простой Python, который может или не может быть достаточно производительным для ваших целей, он прекрасно работает, если не так много разных весов. Вы можете найти алгоритм на стр. 75 и 76. Он работает, разделяя процесс выборки на части с одинаковыми вероятностями включения, то есть мы можем снова использовать random.sample! Я не собираюсь объяснять принцип здесь, так как основы хорошо представлены на странице 69. Вот код с, надеюсь, достаточным количеством комментариев:

def sample_no_replacement_exact(items, k, best_effort=False, random_=None, ε=1e-9):
    """
        Returns a random sample of k elements from items, where items is a list of
        tuples (weight, element). The inclusion probability of an element in the
        final sample is given by
           k * weight / sum(weights).

        Note that the function raises if a inclusion probability cannot be
        satisfied, e.g the following call is obviously illegal:
           sample_no_replacement_exact([(1,'a'),(2,'b')],2)
        Since selecting two elements means selecting both all the time,
        'b' cannot be selected twice as often as 'a'. In general it can be hard to
        spot if the weights are illegal and the function does *not* always raise
        an exception in that case. To remedy the situation you can pass
        best_effort=True which redistributes the inclusion probability mass
        if necessary. Note that the inclusion probabilities will change
        if deemed necessary.

        The algorithm is based on the splitting procedure on page 75/76 in:
        http://www.eustat.eus/productosServicios/52.1_Unequal_prob_sampling.pdf
        Additional information can be found here:
        /1773959/vyberite-k-sluchainyh-elementov-iz-spiska-chi-elementy-imeyt-vesa

        :param items: list of tuples of type weight,element
        :param k: length of resulting sample
        :param best_effort: fix inclusion probabilities if necessary,
                            (optional, defaults to False)
        :param random_: random module to use (optional, defaults to the
                        standard random module)
        :param ε: fuzziness parameter when testing for zero in the context
                  of floating point arithmetic (optional, defaults to 1e-9)
        :return: random sample set of size k
        :exception: throws ValueError in case of bad parameters,
                    throws AssertionError in case of algorithmic impossibilities
    """
    # random_ defaults to the random submodule
    if not random_:
        random_ = random

    # special case empty return set
    if k <= 0:
        return set()

    if k > len(items):
        raise ValueError("resulting tuple length exceeds number of elements (k > n)")

    # sort items by weight
    items = sorted(items, key=lambda item: item[0])

    # extract the weights and elements
    weights, elements = list(zip(*items))

    # compute the inclusion probabilities (short: π) of the elements
    scaling_factor = k / sum(weights)
    π = [scaling_factor * weight for weight in weights]

    # in case of best_effort: if a inclusion probability exceeds 1,
    # try to rebalance the probabilities such that:
    # a) no probability exceeds 1,
    # b) the probabilities still sum to k, and
    # c) the probability masses flow from top to bottom:
    #    [0.2, 0.3, 1.5] -> [0.2, 0.8, 1]
    # (remember that π is sorted)
    if best_effort and π[-1] > 1 + ε:
        # probability mass we still we have to distribute
        debt = 0.
        for i in reversed(range(len(π))):
            if π[i] > 1.:
                # an 'offender', take away excess
                debt += π[i] - 1.
                π[i] = 1.
            else:
                # case π[i] < 1, i.e. 'save' element
                # maximum we can transfer from debt to π[i] and still not
                # exceed 1 is computed by the minimum of:
                # a) 1 - π[i], and
                # b) debt
                max_transfer = min(debt, 1. - π[i])
                debt -= max_transfer
                π[i] += max_transfer
        assert debt < ε, "best effort rebalancing failed (impossible)"

    # make sure we are talking about probabilities
    if any(not (0 - ε <= π_i <= 1 + ε) for π_i in π):
        raise ValueError("inclusion probabilities not satisfiable: {}" \
                         .format(list(zip(π, elements))))

    # special case equal probabilities
    # (up to fuzziness parameter, remember that π is sorted)
    if π[-1] < π[0] + ε:
        return set(random_.sample(elements, k))

    # compute the two possible lambda values, see formula 7 on page 75
    # (remember that π is sorted)
    λ1 = π[0] * len(π) / k
    λ2 = (1 - π[-1]) * len(π) / (len(π) - k)
    λ = min(λ1, λ2)

    # there are two cases now, see also page 69
    # CASE 1
    # with probability λ we are in the equal probability case
    # where all elements have the same inclusion probability
    if random_.random() < λ:
        return set(random_.sample(elements, k))

    # CASE 2:
    # with probability 1-λ we are in the case of a new sample without
    # replacement problem which is strictly simpler,
    # it has the following new probabilities (see page 75, π^{(2)}):
    new_π = [
        (π_i - λ * k / len(π))
        /
        (1 - λ)
        for π_i in π
    ]
    new_items = list(zip(new_π, elements))

    # the first few probabilities might be 0, remove them
    # NOTE: we make sure that floating point issues do not arise
    #       by using the fuzziness parameter
    while new_items and new_items[0][0] < ε:
        new_items = new_items[1:]

    # the last few probabilities might be 1, remove them and mark them as selected
    # NOTE: we make sure that floating point issues do not arise
    #       by using the fuzziness parameter
    selected_elements = set()
    while new_items and new_items[-1][0] > 1 - ε:
        selected_elements.add(new_items[-1][1])
        new_items = new_items[:-1]

    # the algorithm reduces the length of the sample problem,
    # it is guaranteed that:
    # if λ = λ1: the first item has probability 0
    # if λ = λ2: the last item has probability 1
    assert len(new_items) < len(items), "problem was not simplified (impossible)"

    # recursive call with the simpler sample problem
    # NOTE: we have to make sure that the selected elements are included
    return sample_no_replacement_exact(
        new_items,
        k - len(selected_elements),
        best_effort=best_effort,
        random_=random_,
        ε=ε
    ) | selected_elements

Пример: * * тысяча тридцать-один

In : sample_no_replacement_exact([(1,'a'),(2,'b'),(3,'c')],2)
Out: {'b', 'c'}

In : import collections, itertools
In : sample_tester = lambda f: collections.Counter(itertools.chain(*(f() for _ in range(10000))))
In : sample_tester(lambda: sample_no_replacement_exact([(1,'a'),(2,'b'),(3,'c'),(4,'d')],2))
Out: Counter({'a': 2048, 'b': 4051, 'c': 5979, 'd': 7922})

Весовые коэффициенты составляют до 10, следовательно, вероятности включения вычисляются следующим образом: a → 20%, b → 40%, c → 60%, d → 80%. (Сумма: 200% = к.) Это работает!

Всего лишь одно предостережение в отношении продуктивного использования этой функции, может быть очень трудно обнаружить незаконные входные данные для весов. Очевидный незаконный пример -

In: sample_no_replacement_exact([(1,'a'),(2,'b')],2)
ValueError: inclusion probabilities not satisfiable: [(0.6666666666666666, 'a'), (1.3333333333333333, 'b')]

b не может появляться в два раза чаще, чем a, так как оба должны быть всегда . Есть более тонкие примеры. Чтобы избежать исключения в производстве, просто используйте best_effort = True, который перебалансирует массу вероятности включения так, чтобы у нас было всегда действительного распределения. Очевидно, это может изменить вероятности включения.

0 голосов
/ 18 января 2018

Выборка без замены на рекурсию - элегантное и очень короткое решение в c #

// сколько способов мы можем выбрать 4 из 60 студентов, чтобы каждый раз мы выбирали разные 4

class Program
{
    static void Main(string[] args)
    {
        int group = 60;
        int studentsToChoose = 4;

        Console.WriteLine(FindNumberOfStudents(studentsToChoose, group));
    }

    private static int FindNumberOfStudents(int studentsToChoose, int group)
    {
        if (studentsToChoose == group || studentsToChoose == 0)
            return 1;

        return FindNumberOfStudents(studentsToChoose, group - 1) + FindNumberOfStudents(studentsToChoose - 1, group - 1);

    }
}
0 голосов
/ 15 мая 2016

Я реализовал алгоритм, аналогичный идее Джейсона Орендорфа в Rust здесь . Моя версия дополнительно поддерживает массовые операции: вставка и удаление (когда вы хотите удалить группу элементов, заданных их идентификаторами, а не через взвешенный путь выбора) из структуры данных в O(m + log n) время, где m - количество элементов, которые нужно удалить и n количество предметов в хранилище.

...