Поток данных: горячие клавиши в перекрестном соединении - PullRequest
0 голосов
/ 16 мая 2019

Я использую Google dataflow + Scio, чтобы выполнить перекрестное соединение набора данных с самим собой, чтобы выяснить topK наиболее похожих, выполняя косинусное сходство.Набор данных имеет около 200 тыс. Записей, а общий размер набора данных составляет ~ 300 МБ.

Я присоединяю набор данных к себе, передавая его в качестве побочного ввода, устанавливая значение workerCacheMB равным 500 МБ.

Набор данных является кортежем и выглядит следующим образом: (String, Set (Integer)).Первый элемент в кортеже - это URI, а следующий элемент - это набор индексов сущности.

Большинство записей в наборе данных имеют менее 500 индексов сущностей.Тем не менее, существует около 7000 записей, которые имеют более 10 тыс. Сущностей, а максимальная - 171 тыс. Сущностей.

У меня есть несколько горячих клавиш, и поэтому рабочая уттилизация выглядит следующим образом: enter image description here

После того, как она масштабируется до 80 узлов, а затем уменьшается до 1 узла,он уже обработал около 90% записей.Я предполагаю, что горячие клавиши застряли в последнем узле, и остальное время потребовалось для обработки всех горячих клавиш.

Я попробовал опцию --experiments=shuffle_mode=service.Хотя это дало улучшение, проблема сохраняется.Я размышлял о способах использования объединенного соединения HotKey, упомянутого здесь , однако с тех пор, как мне нужно найти сходство, я не думаю, что могу позволить себе разделить горячие сущности и воссоединиться с ними.

Мне было интересно, есть ли способ ее решить или мне в принципе придется с этим жить.Очевидно, это грубый способ найти симов.Тем не менее, я заинтересован в том, чтобы найти решение проблемы инженерии данных, позволяя инженерам ML использовать алгоритмы поиска Sim.

Урезанная версия кода выглядит следующим образом:


private def findSimilarities(e1: Set[Integer], e2: Set[Integer]): Float = {
    val common = e1.intersect(e2)
    val cosine = (common.size.toFloat) / (e1.size + e2.size).toFloat
    cosine
  }

val topN = sortedReverseTake[ElementSims](250)(by(_.getScore))

elements
      .withSideInputs(elementsSI)
      .flatMap { case (e1, si) =>
        val fromUri = e1._1.toString
        val fromEntities = e1._2
        val sideInput: List[(String, Set[Integer])] = si(elementsSI)
        val sims: List[ElementSims] = findSimilarities(fromUri,fromEntities,
          sideInput)
        topN(sims)
      }
      .toSCollection
      .saveAsAvroFile(outputGCS)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...