Я использую Google dataflow + Scio, чтобы выполнить перекрестное соединение набора данных с самим собой, чтобы выяснить topK наиболее похожих, выполняя косинусное сходство.Набор данных имеет около 200 тыс. Записей, а общий размер набора данных составляет ~ 300 МБ.
Я присоединяю набор данных к себе, передавая его в качестве побочного ввода, устанавливая значение workerCacheMB равным 500 МБ.
Набор данных является кортежем и выглядит следующим образом: (String, Set (Integer)).Первый элемент в кортеже - это URI, а следующий элемент - это набор индексов сущности.
Большинство записей в наборе данных имеют менее 500 индексов сущностей.Тем не менее, существует около 7000 записей, которые имеют более 10 тыс. Сущностей, а максимальная - 171 тыс. Сущностей.
У меня есть несколько горячих клавиш, и поэтому рабочая уттилизация выглядит следующим образом:
После того, как она масштабируется до 80 узлов, а затем уменьшается до 1 узла,он уже обработал около 90% записей.Я предполагаю, что горячие клавиши застряли в последнем узле, и остальное время потребовалось для обработки всех горячих клавиш.
Я попробовал опцию --experiments=shuffle_mode=service
.Хотя это дало улучшение, проблема сохраняется.Я размышлял о способах использования объединенного соединения HotKey, упомянутого здесь , однако с тех пор, как мне нужно найти сходство, я не думаю, что могу позволить себе разделить горячие сущности и воссоединиться с ними.
Мне было интересно, есть ли способ ее решить или мне в принципе придется с этим жить.Очевидно, это грубый способ найти симов.Тем не менее, я заинтересован в том, чтобы найти решение проблемы инженерии данных, позволяя инженерам ML использовать алгоритмы поиска Sim.
Урезанная версия кода выглядит следующим образом:
private def findSimilarities(e1: Set[Integer], e2: Set[Integer]): Float = {
val common = e1.intersect(e2)
val cosine = (common.size.toFloat) / (e1.size + e2.size).toFloat
cosine
}
val topN = sortedReverseTake[ElementSims](250)(by(_.getScore))
elements
.withSideInputs(elementsSI)
.flatMap { case (e1, si) =>
val fromUri = e1._1.toString
val fromEntities = e1._2
val sideInput: List[(String, Set[Integer])] = si(elementsSI)
val sims: List[ElementSims] = findSimilarities(fromUri,fromEntities,
sideInput)
topN(sims)
}
.toSCollection
.saveAsAvroFile(outputGCS)