Предположим, что у вас есть N
узлы с M
памятью на каждом узле.Кроме того, предположим, что f(a)
примерно одинакового размера для всех a <- ds
.Вы говорите, что хотите, чтобы f(a)
был распределенным Dataset
.Единственная причина, по которой вы бы настаивали на f
возврате Dataset
, состоит в том, что возвращаемое значение не помещается в память на одном узле, таким образом
|f(a)| >= M .
В то же время вы предполагаете, что bind(f, ds)
уместится в память, поэтому
N * M >= ds.size * |f(a)| >= ds.size * M
Если мы отменим M
, то там будет написано:
N >= ds.size
, то есть количество элементов в ds
должно бытьотносительно небольшой (меньше, чем количество вычислительных узлов).Это, в свою очередь, означает, что вы можете просто собрать его на главном узле, сопоставить его с набором данных, а затем взять объединение.Что-то вроде этого (не проверено):
def bind[A, B](f: A => Dataset[B], ds: Dataset[A]): Dataset[A] = {
ds.collect.map(f).reduce(_ union _)
}
Попытка превратить его в общую монаду не имеет большого смысла, потому что если вы читаете Dataset
как "огромный распределенный набор данных, который едва вписывается в огромный кластерс несколькими узлами ", то
ds
уже огромен - каждый
f(a)
огромен ds.flatMap(f)
огромен для степени двух,не помещается в память
Таким образом, общий bind
может быть:
- невозможен, потому что результат не помещается в память.
- заменено
fold(f: A => TraversableOnce[B])
, потому что f(a)
мало - заменено
ds.collect
, потому что ds
мало
И именно вы должны сделатьРешение «что мало» в каждом конкретном случае.Это, вероятно, причина, по которой не предоставляется универсальный flatMap(f: A => Dataset[B])
: при каждом вызове такого flatMap
.
необходимо принимать нетривиальное проектное решение.