Как устранить Cost-центры в цепочках и списках - PullRequest
7 голосов
/ 09 октября 2011

Я реализую алгоритм поиска мотивов из области биоинформатики, используя Haskell. Я не буду вдаваться в подробности алгоритма, кроме как сказать, что это поиск ветвей и связанных медианных строк. Я планировал сделать мою реализацию более интересной, реализовав параллельный подход (а затем и подход STM), чтобы ускорить многоядерный процесс, но после компиляции со следующими флагами

$ ghc -prof -auto-all -O2 -fllvm -threaded -rtsopts --make main 

и при печати профиля я увидел что-то интересное (и, возможно, очевидное):

COST CENTRE      entries  %time %alloc  
hammingDistance  34677951  47.6   14.7  
motifs           4835446   43.8   71.1  

Ясно, что можно добиться значительного ускорения, не приближаясь к многоядерному программированию (хотя это было сделано, и мне просто нужно найти хорошие тестовые данные и разобраться с Criterion для этого).

В любом случае, обе эти функции являются чисто функциональными и никоим образом не параллельными. Они также делают довольно простые вещи, поэтому я был удивлен, что они заняли так много времени. Вот код для них:

data NukeTide = A | T | C | G deriving (Read, Show, Eq, Ord, Enum)

type Motif = [NukeTide] 

hammingDistance :: Motif -> Motif -> Int
hammingDistance [] [] = 0
hammingDistance xs [] = 0 -- optimistic
hammingDistance [] ys = 0 -- optimistic
hammingDistance (x:xs) (y:ys) = case (x == y) of
    True  -> hammingDistance xs ys
    False -> 1 + hammingDistance xs ys

motifs :: Int -> [a] -> [[a]]
motifs n nukeTides = [ take n $ drop k nukeTides | k <- [0..length nukeTides - n] ]

Обратите внимание, что из двух аргументов hammingDistance я могу предположить, что xs будет иметь длину x и что ys будет меньше или равен этому, если это открывает возможности для улучшений.

Как видите, hammingDistance рассчитывает расстояние Хемминга между двумя мотивами, которые являются списками нуклеотидов. Функция motifs принимает число и список и возвращает все подстроки этой длины, например ::

> motifs 3 "hello world"
["hel","ell","llo","lo ","o w"," wo","wor","orl","rld"]

Поскольку алгоритмические процессы настолько просты, я не могу придумать, как это оптимизировать дальше. У меня, однако, есть два предположения о том, куда мне направиться:

  1. HammingDistance: используемые мной типы данных (NukeTides и []) медленные / неуклюжие. Это всего лишь предположение, так как я не знаком с их реализациями, но я думаю, что определение моего собственного типа данных, хотя и более разборчиво, возможно, потребует больше накладных расходов, чем я собираюсь. Кроме того, сопоставление с образцом чуждо мне, я не знаю, является ли это тривиальным или дорогостоящим.
  2. Мотивы: если я правильно читаю, 70% всех выделений памяти выполняется мотивами, и я бы предположил, что в какой-то момент это должен быть сбор мусора. Снова использование универсального списка может замедлить меня или понимание списка, поскольку стоимость этого невероятно неясна для меня.

У кого-нибудь есть какие-нибудь советы по обычной процедуре здесь? Если типы данных являются проблемой, будут ли массивы правильным ответом? (Я слышал, они приходят в коробках)

Спасибо за помощь.

Редактировать: Мне просто пришло в голову, что было бы полезно, если бы я описал способ, которым эти две функции называются:

totalDistance :: Motif -> Int
totalDistance motif = sum $ map (minimum . map (hammingDistance motif) . motifs l) dna

Эта функция является результатом другой функции и передается вокруг узлов в дереве. В каждом узле дерева выполняется оценка нуклеотида (длины <= n, то есть, если == n, то это листовой узел), с использованием totalDistance для оценки узла. С тех пор это ваш типичный алгоритм ветвления и привязки. </p>

Редактировать: Джон попросил, чтобы я распечатал внесенное мной изменение, которое фактически исключило стоимость мотивов:

scoreFunction :: DNA -> Int -> (Motif -> Int)
scoreFunction dna l = totalDistance
    where
        -- The sum of the minimum hamming distance in each line of dna
        -- is given by totalDistance motif
        totalDistance motif = sum $ map (minimum . map (hammingDistance motif)) possibleMotifs
        possibleMotifs = map (motifs l) dna -- Previously this was computed in the line above

Я не прояснил это в своем первоначальном посте, но ScoreFunction вызывается только один раз, а результат передается в обход дерева / ветви и связывается и используется для оценки узлов. Пересчет мотивов на каждом этапе пути, оглядываясь назад, не является одной из самых ярких вещей, которые я сделал.

Ответы [ 3 ]

7 голосов
/ 09 октября 2011

Ваше определение motifs выглядит так, как будто оно делает намного больше обхода, чем необходимо, потому что каждое приложение drop должно проходить список с самого начала. Я бы реализовал это, используя Data.List.tails вместо:

motifs2 :: Int -> [a] -> [[a]]
motifs2 n nukeTides = map (take n) $ take count $ tails nukeTides
  where count = length nukeTides - n + 1

Быстрое сравнение в GHCi показывает разницу (используя sum . map length для принудительной оценки):

*Main> let xs = concat (replicate 10000 [A, T, C, G])
(0.06 secs, 17914912 bytes)
*Main> sum . map length $ motifs 5 xs
199980
(3.47 secs, 56561208 bytes)
*Main> sum . map length $ motifs2 5 xs
199980
(0.15 secs, 47978952 bytes)
6 голосов
/ 09 октября 2011

Ваше определение hammingDistance, вероятно, гораздо менее эффективно, чем могло бы быть.

hammingDistance (x:xs) (y:ys) = case (x == y) of
    True  -> hammingDistance xs ys
    False -> 1 + hammingDistance xs ys

Из-за лени Haskell это будет расширено (в худшем случае):

(1 + (1 + (1 + ...)))

, который будет существовать как стек в стеке, уменьшаясь только при использовании. Является ли это на самом деле проблемой, зависит от сайта вызовов, параметров компилятора и т. Д., Поэтому часто рекомендуется писать код в форме, которая полностью исключает эту проблему.

Распространенным решением является создание хвостовой рекурсивной формы со строгим аккумулятором, но в этом случае вы можете использовать функции высшего порядка, например:

hammingDistance :: Motif -> Motif -> Int
hammingDistance xs ys = length . filter (uncurry (==)) $ zip xs ys

вот хвостовая рекурсивная реализация, для сравнения

hammingDistance :: Motif -> Motif -> Int
hammingDistance xs ys = go 0 xs ys
  where
    go !acc [] [] = acc
    go !acc xs [] = acc -- optimistic
    go !acc [] ys = acc -- optimistic
    go !acc (x:xs) (y:ys) = case (x == y) of
      True  -> go acc xs ys
      False -> go (acc+1) xs ys

При этом используется расширение BangPatterns для принудительной строгой оценки аккумулятора, иначе это будет иметь ту же проблему, что и ваше текущее определение.

Чтобы прямо ответить на некоторые другие ваши вопросы:

  1. Сопоставление с образцом тривиально
  2. Нужно ли использовать списки или массивы, зависит в основном от того, как создаются данные и как они используются. Для этого случая, возможно, списки могут быть лучшим типом. В частности, если все ваши списки используются по мере их создания и вам не нужен весь список в памяти, все должно быть в порядке. Однако если вы сохраняете списки в памяти, они занимают много места.

Шаблоны использования

Я думаю, что использование этих функций также делает некоторую дополнительную работу:

(minimum . map (hammingDistance motif) . motifs l

Поскольку вам нужен только минимум hammingDistance, возможно, вы вычисляете много дополнительных значений, в которых нет необходимости. Я могу придумать два решения этого:

Опция 1. Определите новую функцию hammingDistanceThresh :: Motif -> Int -> Motif -> Int, которая останавливается, когда она превышает пороговое значение. Немного странный порядок типов облегчает использование его в сгибе, например:

let motifs' = motifs l
in foldl' (hammingDistanceThresh motif) (hammingDistance motif $ head motifs') (tail motifs')

Вариант 2. Если вы определяете ленивый тип натуральных чисел, вы можете использовать его вместо Int s для результата hammingDistance. Тогда будет вычислено только столько расстояния Хэмминга, сколько необходимо.

Последнее замечание: использование -auto-all очень часто генерирует гораздо более медленный код, чем другие параметры профилирования. Я бы посоветовал вам сначала использовать -auto, а затем добавить при необходимости аннотации SCC.

2 голосов
/ 10 октября 2011

Правильно ... Я не удержался от предела и написал реализацию набитых битов с простым металлом:

{-# language TypeSynonymInstances #-}
{-# language BangPatterns #-}

import Data.Bits
import Data.Word


data NukeTide = A | T | C | G deriving (Read, Show, Eq, Ord, Enum)

type UnpackedMotif = [NukeTide] 

type PackageType = Word32
nukesInPackage = 16 :: Int
allSetMask = complement 0 :: PackageType


-- Be careful to have length of motif == nukesInPackage here!
packNukesToWord :: UnpackedMotif -> PackageType
packNukesToWord = packAt 0
    where packAt _ [] = 0
          packAt i (m:ml) =    (b0 m .&. bit i)
                           .|. (b1 m .&. bit (i+1))
                           .|. packAt (i+2) ml
          b0 A = 0
          b0 T = allSetMask
          b0 C = 0
          b0 G = allSetMask
          b1 A = 0
          b1 T = 0
          b1 C = allSetMask
          b1 G = allSetMask

unpackNukesWord :: PackageType -> UnpackedMotif
unpackNukesWord = unpackNNukesFromWord nukesInPackage

unpackNNukesFromWord :: Int -> PackageType -> UnpackedMotif
unpackNNukesFromWord = unpackN
    where unpackN 0 _ = []
          unpackN i w = (nukeOf $ w .&. r2Mask):(unpackN (i-1) $ w`shiftR`2)
          nukeOf bs
           | bs == 0      = A
           | bs == bit 0  = T
           | bs == bit 1  = C
           | otherwise    = G
          r2Mask = (bit 1 .|. bit 0) :: PackageType


data PackedMotif = PackedMotif { motifPackets::[PackageType]
                               , nukesInLastPack::Int        }
 -- note nukesInLastPack will never be zero; motifPackets must be [] to represent empty motifs.
packNukes :: UnpackedMotif -> PackedMotif
packNukes m = case remain of
               [] -> PackedMotif [packNukesToWord takeN] (length takeN)
               r  -> prAppend (packNukesToWord takeN) (packNukes r)
    where (takeN, remain) = splitAt nukesInPackage m
          prAppend w (PackedMotif l i) = PackedMotif (w:l) i

unpackNukes :: PackedMotif -> UnpackedMotif
unpackNukes (PackedMotif l i) = unpack l i
  where unpack [l] i = unpackNNukesFromWord i l
        unpack (l:ls) i = unpackNukesWord l ++ unpack ls i
        unpack [] _ = []

instance Show PackedMotif where
  show = show . unpackNukes



class Nukes a where
  pLength :: a -> Int
  shiftLN1 :: a -> a
  hammingDistance :: a -> a -> Int
  motifs :: Int -> a -> [a]

instance Nukes PackageType where
  pLength _ = nukesInPackage
  shiftLN1 = (`shiftR`2)
  hammingDistance !x !y = fromIntegral $ abt (x `xor` y)
      where abt !b = bbt(b.&.a0Mask .|. ((b.&.a1Mask) `shiftR` 1))
            bbt !b = sbt $ (b.&.r16Mask) + (b `shiftR` nukesInPackage)
            sbt !b = (r2Mask .&. b)             + (r2Mask .&. (b`shiftR`2))
                   + (r2Mask .&. (b`shiftR`4))  + (r2Mask .&. (b`shiftR`6))
                   + (r2Mask .&. (b`shiftR`8))  + (r2Mask .&. (b`shiftR`10))
                   + (r2Mask .&. (b`shiftR`12)) + (r2Mask .&. (b`shiftR`14))
            a0Mask = 0x55555555 :: PackageType
            a1Mask = 0xAAAAAAAA :: PackageType
            r16Mask = 0xFFFF :: PackageType
            r2Mask = 0x3 :: PackageType
  motifs 0 _ = []
  motifs l x = x : motifs (l-1) (shiftLN1 x)


maskNukesBut :: Int -> PackageType -> PackageType
maskNukesBut i = ( ( allSetMask `shiftR` (2*(nukesInPackage - i)) ) .&.)

instance Nukes PackedMotif where
  pLength (PackedMotif (x:xs) ix) = nukesInPackage * (length xs) + ix
  pLength _ = 0
  shiftLN1 ξ@(PackedMotif [] _) = ξ
  shiftLN1 (PackedMotif [x] ix) | ix>1       = PackedMotif [x`shiftR`2] (ix-1)
                                | otherwise  = PackedMotif [] nukesInPackage
  shiftLN1 (PackedMotif (x:x':xs) ix)
        = PackedMotif (( shiftLN1 x .|. pnext ):sxs) resLMod
      where sxs = motifPackets $ shiftLN1 (PackedMotif (x':xs) ix)
            pnext = shiftL (x'.&.0x3) 30
            resLMod = if ix > 1 then (ix-1) else nukesInPackage
  hammingDistance xs ys = go 0 xs ys
    where
      go :: Int -> PackedMotif -> PackedMotif -> Int
      go !acc (PackedMotif [x] ix) (PackedMotif [y] iy)
       | ix > iy    = acc + (hammingDistance y $ maskNukesBut iy x)
       | otherwise  = acc + (hammingDistance x $ maskNukesBut ix y)
      go !acc (PackedMotif [x] ix) (PackedMotif (y:ys) iy)
        = acc + (hammingDistance x $ maskNukesBut ix y)
      go !acc (PackedMotif (x:xs) ix) (PackedMotif [y] iy)
        = acc + (hammingDistance y $ maskNukesBut iy x)
      go !acc (PackedMotif (x:xs) ix) (PackedMotif (y:ys) iy)
        = go (acc + hammingDistance x y) (PackedMotif xs ix) (PackedMotif ys iy)
      go !acc _ _ = acc
  motifs l ξ
     | l>0        = fShfts (min nukesInPackage $ pLength ξ + 1 - l) ξ >>= ct
     | otherwise  = []
    where fShfts k χ | k > 0      = χ : fShfts (k-1) (shiftLN1 χ)
                     | otherwise  = []
          ct (PackedMotif ys iy) = case remain of
                [] -> if (length takeN - 1) * nukesInPackage + iy >= l
                       then [PackedMotif takeN lMod] else []
                _  -> PackedMotif takeN lMod : ct(PackedMotif (tail ys) iy)
            where (takeN, remain) = splitAt lQuot ys
          (lQuot,lMod) = case l `quotRem` nukesInPackage of
                   (i,0) -> (i, nukesInPackage)
                   (i,m) -> (i+1, m)

Может использоваться с UnpackedMotif = [NukeTide] с функцией packNukes, например,

*BioNuke0> motifs 23 $ packNukes $ take 27 $ cycle [A,T,G,C,A]
[[A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G],[T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C],[G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A],[C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A],[A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T]]

*BioNuke0> hammingDistance (packNukes [A,T,G,C,A,A,T,G]) (packNukes [A,T,C,C,A,T,G])
3

*BioNuke0> map (hammingDistance (packNukes $ take 52 $ cycle [A,T,C,C,A,T,G])) (motifs 52 $ packNukes $ take 523 $ cycle [A,T,C,C,A,T,G])
[0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44]

Я еще не сравнивал производительность с исходной версией, но она должна быть немного быстрее, чем любая реализация алгебраического типа. Кроме того, он с готовностью предлагает компактный формат хранения.

...