Фильтрация набора данных с точки зрения другого набора данных в Scala Flink - PullRequest
0 голосов
/ 30 августа 2018

Я пытаюсь повторить этот код Python:

cond_entropy_x = np.array([entropy(x[y == v]) for v in uy])

Где x и y - векторы, а uy - уникальные значения на y, например 0,1.

В порыве у меня есть:

val uy = y.distinct.collect
val condHx = for (i ← uy)
    yield entropy(x.filterWithBcVariable(y)((_, yy) ⇒ yy == i))

Однако, похоже, filterWithBcVariable не принимает все значения в y, оно принимает только первое.

Я также пробовал:

for (i ← values) yield y.join(x).where(a ⇒ a).equalTo(_ ⇒ i)

Но мне не хватило памяти.

Как я могу отфильтровать x по значениям y?

Что-то вроде x.zip(y) будет делать это, но это не поддерживается.

Есть идеи?

1 Ответ

0 голосов
/ 31 августа 2018

Я предложил одно решение, может быть, не самое лучшее, но, по крайней мере, оно работает.

Теперь вместо передачи x и y как разделенного DataSets, я передаю DataSet[LabeledVector] только с столбцом:

val xy = input.map(lv ⇒ LabeledVector(lv.label, DenseVector(lv.vector(0))))

Затем я передаю xy моей функции:

def conditionalEntropy(xy: DataSet[LabeledVector]): Double = {
    // Get the label
    val y = xy map (_.label)
    // Get probs for the label
    val p = probs(y).toArray.asBreeze
    // Get unique values in label
    val values = y.distinct.collect
    // Compute Conditional Entropy
    val condH = for (i ← values)
      yield entropy(xy.filter(_.label == i))
    p.dot(seq2Breeze(condH))
  }
...