Форматировать строку в вектор, как уменьшитьByKey (список (n_1, m_1) .... (n_k, m_k)) до (n_1 ... n_k) (m_1 ... m_k)) - PullRequest
0 голосов
/ 20 октября 2018

Данные

СДР, считанные из textFile (), состоящие из списка пар (str-key, [int-id, int-value]).

[(u'ID1', (132, 1)),
 (u'ID2', (133, 3)),
 (u'ID3', (120, 5)),
 (u'ID4', (110, 0)),
 (u'ID5', (160, 2)),
 (u'ID6', (500, 9)),
 (u'ID7', (932, 8)),
 (u'ID8', (132, 1)),
 (u'ID1', (133, 6)),
 (u'ID8', (133, 1))]

Выходные данные Я хотел бы эффективно создать СДР из списка (ключевые, плотные / разреженные) с как можно меньшим количеством перетасовок

Редактировать: на основе комментария ниже,Это невозможно сделать в Spark независимо от группы / агрегата

Densevector

Файл, который читается, упорядочен по int-id,так что, если бы я выбрасывал int-id и lowerByKey на str-key, я мог бы сформировать DenseVector со значением int

rdd.map(lambda x: (x[0], [x[1]]))\
    .reduceByKey(lambda a, b: a + b)\
    .map(lambda x: [x[0], DenseVector(x[1])])

. Он дал бы мне правильный порядок значений int с 1 разделом,но очень медленноС более чем 1 разделом и рабочими это может быть очень быстро, но порядок является случайным для клавиши str-key.Например, для идентификаторов str-key ID1 и ID8 желаемым выходным значением будет [1, 6], [3, 1] или [6, 1], [1, 3], но оно не может быть [1, 6],[1, 3].

1) Есть ли способ уменьшитьByKey, но сохранить порядок файлов / чтения (или изменить порядок результатов на основе int-ID)?

Sparsevector

Для Sparsevector я пытаюсь передать список пар [int-d, int-value] напрямую, но это требует агрегирования по идентификатору.GroupByKey () вызывает массовую перестановку.

RDD.map(lambda x: (x[0], (int(x[1]), int(x[2]))))\
            .groupByKey()\
            .mapValues(list)\
            .mapValues(sorted)\
            .mapValues(lambda x: (SparseVector(N, x)))

Список агрегирует данные [(int-id, value), (int-id_2, value_2) .... (int-id_n, value_n)]для каждого ключа.Сортировка есть, так как sparseVector требуется отсортированный список или дикт.

2) Есть ли способ написать это более эффективно?

1 Ответ

0 голосов
/ 21 октября 2018

Если данные разрежены (вы можете вычислить точный порог разброса, в зависимости от ожидаемого размера ключа), groupByKey является оптимальным решением - для каждой строки вы должны перемешать:

  • Ключ.
  • Значение.Поскольку это tuple примитивных значений, нет необходимости в полноправном __dict__, а его размер как можно меньше.

Поскольку пары (индекс, значение) в васвопрос кажется уникальным, нет уменьшения в случайном порядке на размер значения, но любой сложный объект (например, вектор), вероятно, будет иметь большие издержки, чем tuple.

Единственное возможное сокращение происходитна ключевой стороне.Чтобы достичь такого, который перевешивает увеличение размера значения, вам нужны достаточно плотные данные.

В этом случае aggregateByKey может работать лучше, хотя дополнительные затраты на объединение могут по-прежнему использовать возможные преимущества объединения на стороне карты..

def seq_func(acc, x):
    if x[1]:
        acc[x[0]] = acc.get(x[0], 0) + x[1]
    return acc

def comb_func(acc1, acc2):
    for k in acc2:
        acc1[k] = acc1.get(k, 0) + acc2[k]
    return acc1

rdd.aggregateByKey(dict(), seq_func, comb_func).mapValues(lambda d: SparseVector(N, d))

В противном случае просто groupByKey, пропустите сортировку и используйте dict:

rdd.groupByKey().mapValues(lambda x: SparseVector(N, dict(x)))
...