Эффективный способ оптимизации кода Scala для чтения большого файла, который не помещается в памяти - PullRequest
0 голосов
/ 13 апреля 2019

Постановка проблемы ниже,

У нас есть большой файл журнала, в котором хранятся взаимодействия пользователя с приложением. Записи в файле журнала следуют следующей схеме: {userId, timestamp, actionType}, где actionType - одно из двух возможных значений: [open, close]

Ограничения:

  1. Файл журнала слишком большой, чтобы поместиться в памяти на одном компьютере. Также предположим, что агрегированные данные не помещаются в память.
  2. Код должен работать на одной машине.
  3. Не следует использовать готовую реализацию mapreduce или сторонней базы данных; не думайте, что у нас есть Hadoop, Spark или другая распределенная вычислительная среда.
  4. Может быть несколько записей каждого actionType для каждого пользователя, и в файле журнала могут отсутствовать записи. Таким образом, пользователь может пропустить закрытую запись между двумя открытыми записями или наоборот.
  5. Метки времени будут приходить в строго возрастающем порядке.

Для этой проблемы нам нужно реализовать класс / классы, который вычисляет среднее время, затрачиваемое каждым пользователем между открытием и закрытием. Имейте в виду, что для некоторых пользователей отсутствуют записи, поэтому нам придется выбирать, как обрабатывать эти записи при выполнении наших расчетов. Кодекс должен следовать последовательной политике в отношении того, как мы делаем этот выбор.

Желаемым выводом для решения должно быть [{userId, timeSpent},….] Для всех пользователей в файле журнала.

Пример файла журнала (через запятую, текстовый файл)

1,1435456566,open 
2,1435457643,open 
3,1435458912,open 
1,1435459567,close 
4,1435460345,open 
1,1435461234,open 
2,1435462567,close 
1,1435463456,open 
3,1435464398,close 
4,1435465122,close 
1,1435466775,close

подход

Ниже приведен код, написанный мною на Python & Scala, который, по-видимому, неэффективен и соответствует ожиданиям данного сценария. Я хотел бы высказать мнение сообщества разработчиков на этом форуме, как лучше мы могли бы оптимизировать это код по заданному сценарию.

Реализация Scala

import java.io.FileInputStream
import java.util.{Scanner, Map, LinkedList}
import java.lang.Long
import scala.collection.mutable

object UserMetrics extends App {
  if (args.length == 0) {
    println("Please provide input data file name for processing")
  } 
  val userMetrics = new UserMetrics()
  userMetrics.readInputFile(args(0),if (args.length == 1) 600000 else args(1).toInt)
}

case class UserInfo(userId: Integer, prevTimeStamp: Long, prevStatus: String, timeSpent: Long, occurence: Integer)

class UserMetrics {

  val usermap = mutable.Map[Integer, LinkedList[UserInfo]]()

  def readInputFile(stArr:String, timeOut: Int) {
    var inputStream: FileInputStream = null
    var sc: Scanner = null
    try {
      inputStream = new FileInputStream(stArr);
      sc = new Scanner(inputStream, "UTF-8");
      while (sc.hasNextLine()) {
        val line: String = sc.nextLine();
        processInput(line, timeOut)
      }

      for ((key: Integer, userLs: LinkedList[UserInfo]) <- usermap) {
        val userInfo:UserInfo = userLs.get(0)
        val timespent = if (userInfo.occurence>0) userInfo.timeSpent/userInfo.occurence else 0
        println("{" + key +","+timespent + "}")
      }

      if (sc.ioException() != null) {
        throw sc.ioException();
      }
    } finally {
      if (inputStream != null) {
        inputStream.close();
      }
      if (sc != null) {
        sc.close();
      }
    }
  }

  def processInput(line: String, timeOut: Int) {
    val strSp = line.split(",")

    val userId: Integer = Integer.parseInt(strSp(0))
    val curTimeStamp = Long.parseLong(strSp(1))
    val status = strSp(2)
    val uInfo: UserInfo = UserInfo(userId, curTimeStamp, status, 0, 0)
    val emptyUserInfo: LinkedList[UserInfo] = new LinkedList[UserInfo]()

    val lsUserInfo: LinkedList[UserInfo] = usermap.getOrElse(userId, emptyUserInfo)

    if (lsUserInfo != null && lsUserInfo.size() > 0) {
      val lastUserInfo: UserInfo = lsUserInfo.get(lsUserInfo.size() - 1)
      val prevTimeStamp: Long = lastUserInfo.prevTimeStamp
      val prevStatus: String = lastUserInfo.prevStatus

      if (prevStatus.equals("open")) {
        if (status.equals(lastUserInfo.prevStatus)) {
           val timeSelector = if ((curTimeStamp - prevTimeStamp) > timeOut) timeOut else curTimeStamp - prevTimeStamp
           val timeDiff = lastUserInfo.timeSpent + timeSelector
          lsUserInfo.remove()
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, timeDiff, lastUserInfo.occurence + 1))
        } else if(!status.equals(lastUserInfo.prevStatus)){
          val timeDiff = lastUserInfo.timeSpent + curTimeStamp - prevTimeStamp
          lsUserInfo.remove()
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, timeDiff, lastUserInfo.occurence + 1))
        }
      } else if(prevStatus.equals("close")) {
        if (status.equals(lastUserInfo.prevStatus)) {
          lsUserInfo.remove()
          val timeSelector = if ((curTimeStamp - prevTimeStamp) > timeOut) timeOut else curTimeStamp - prevTimeStamp
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent + timeSelector, lastUserInfo.occurence+1))
        }else if(!status.equals(lastUserInfo.prevStatus))
          {     
          lsUserInfo.remove()
          lsUserInfo.add(UserInfo(userId, curTimeStamp, status, lastUserInfo.timeSpent, lastUserInfo.occurence))
        }
      }
    }else if(lsUserInfo.size()==0){
      lsUserInfo.add(uInfo)
    }
    usermap.put(userId, lsUserInfo)
  }

}

Реализация Python

import sys

def fileBlockStream(fp, number_of_blocks, block):
    #A generator that splits a file into blocks and iterates over the lines of one of the blocks.

    assert 0 <= block and block < number_of_blocks #Assertions to validate number of blocks given
    assert 0 < number_of_blocks

    fp.seek(0,2) #seek to end of file to compute block size
    file_size = fp.tell() 

    ini = file_size * block / number_of_blocks #compute start & end point of file block
    end = file_size * (1 + block) / number_of_blocks

    if ini <= 0:
        fp.seek(0)
    else:
        fp.seek(ini-1)
        fp.readline()

    while fp.tell() < end:
        yield fp.readline() #iterate over lines of the particular chunk or block

def computeResultDS(chunk,avgTimeSpentDict,defaultTimeOut):
    countPos,totTmPos,openTmPos,closeTmPos,nextEventPos = 0,1,2,3,4
    for rows in chunk.splitlines():
        if len(rows.split(",")) != 3:
            continue
        userKeyID = rows.split(",")[0]
        try:
            curTimeStamp = int(rows.split(",")[1])
        except ValueError:
            print("Invalid Timestamp for ID:" + str(userKeyID))
            continue
        curEvent = rows.split(",")[2]
        if userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==1 and curEvent == "close": 
        #Check if already existing userID with expected Close event 0 - Open; 1 - Close
        #Array value within dictionary stores [No. of pair events, total time spent (Close tm-Open tm), Last Open Tm, Last Close Tm, Next expected Event]
            curTotalTime = curTimeStamp - avgTimeSpentDict[userKeyID][openTmPos]
            totalTime = curTotalTime + avgTimeSpentDict[userKeyID][totTmPos]
            eventCount = avgTimeSpentDict[userKeyID][countPos] + 1
            avgTimeSpentDict[userKeyID][countPos] = eventCount
            avgTimeSpentDict[userKeyID][totTmPos] = totalTime
            avgTimeSpentDict[userKeyID][closeTmPos] = curTimeStamp
            avgTimeSpentDict[userKeyID][nextEventPos] = 0 #Change next expected event to Open

        elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==0 and curEvent == "open":
            avgTimeSpentDict[userKeyID][openTmPos] = curTimeStamp
            avgTimeSpentDict[userKeyID][nextEventPos] = 1 #Change next expected event to Close

        elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==1 and curEvent == "open":
            curTotalTime,closeTime = missingHandler(defaultTimeOut,avgTimeSpentDict[userKeyID][openTmPos],curTimeStamp)
            totalTime = curTotalTime + avgTimeSpentDict[userKeyID][totTmPos]
            avgTimeSpentDict[userKeyID][totTmPos]=totalTime
            avgTimeSpentDict[userKeyID][closeTmPos]=closeTime
            avgTimeSpentDict[userKeyID][openTmPos]=curTimeStamp
            eventCount = avgTimeSpentDict[userKeyID][countPos] + 1
            avgTimeSpentDict[userKeyID][countPos] = eventCount          

        elif userKeyID in avgTimeSpentDict.keys() and avgTimeSpentDict[userKeyID][nextEventPos]==0 and curEvent == "close": 
            curTotalTime,openTime = missingHandler(defaultTimeOut,avgTimeSpentDict[userKeyID][closeTmPos],curTimeStamp)
            totalTime = curTotalTime + avgTimeSpentDict[userKeyID][totTmPos]
            avgTimeSpentDict[userKeyID][totTmPos]=totalTime
            avgTimeSpentDict[userKeyID][openTmPos]=openTime
            eventCount = avgTimeSpentDict[userKeyID][countPos] + 1
            avgTimeSpentDict[userKeyID][countPos] = eventCount

        elif curEvent == "open":
            #Initialize userid with Open event
            avgTimeSpentDict[userKeyID] = [0,0,curTimeStamp,0,1]

        elif curEvent == "close":
            #Initialize userid with missing handler function since there is no Open event for this User
            totaltime,OpenTime = missingHandler(defaultTimeOut,0,curTimeStamp)
            avgTimeSpentDict[userKeyID] = [1,totaltime,OpenTime,curTimeStamp,0]

def missingHandler(defaultTimeOut,curTimeVal,lastTimeVal):
    if lastTimeVal - curTimeVal > defaultTimeOut:
        return defaultTimeOut,curTimeVal
    else:
        return lastTimeVal - curTimeVal,curTimeVal

def computeAvg(avgTimeSpentDict,defaultTimeOut):
    resDict = {}
    for k,v in avgTimeSpentDict.iteritems():
        if v[0] == 0:
            resDict[k] = 0
        else:
            resDict[k] = v[1]/v[0]
    return resDict

if __name__ == "__main__":
    avgTimeSpentDict = {}
    if len(sys.argv) < 2:
        print("Please provide input data file name for processing")
        sys.exit(1)

    fileObj = open(sys.argv[1])
    number_of_chunks = 4 if len(sys.argv) < 3 else int(sys.argv[2])
    defaultTimeOut = 60000 if len(sys.argv) < 4 else int(sys.argv[3])
    for chunk_number in range(number_of_chunks):
        for chunk in fileBlockStream(fileObj, number_of_chunks, chunk_number):
            computeResultDS(chunk, avgTimeSpentDict, defaultTimeOut)
    print (computeAvg(avgTimeSpentDict,defaultTimeOut))
    avgTimeSpentDict.clear() #Nullify dictionary 
    fileObj.close #Close the file object

Обе программы выше дают желаемый результат, но эффективность - это то, что имеет значение для этого конкретного сценария. Дайте мне знать, если у вас есть что-нибудь лучше или какие-либо предложения по существующей реализации.

Заранее спасибо !!

1 Ответ

0 голосов
/ 14 апреля 2019

Что вам нужно, так это использование итератора.Я не собираюсь переписывать ваш код, но уловка здесь, вероятно, заключается в использовании iterator.К счастью, Scala предоставляет достойные готовые инструменты для работы.

import scala.io.Source
object ReadBigFiles {
  def read(fileName: String): Unit = {
    val lines: Iterator[String] = Source.fromFile(fileName).getLines
    // now you get iterator semantics for the file line traversal
    // that means you can only go through the lines once, but you don't incur a penalty on heap usage
  }
}

Для вашего случая использования вам, кажется, требуется lastUser, поэтому вы имеете дело с группами из 2 записей.Я думаю, что у вас есть два варианта: либо пойти на iterator.sliding(2), который будет производить итераторы для каждой пары, либо просто добавить рекурсию к миксу с использованием опций.

def navigate(source: Iterator[String], last: Option[User]): ResultType = {
  if (source.hasNext) {
    val current = source.next()
    last match {
      case Some(existing) => // compare with previous user etc
      case None => navigate(source, Some(current))
    }
  } else {
    // exit recursion, return result
  }
}

Вы можете избежать всего кода, который вы 'мы написали, чтобы прочитать файл и так далее.Если вам нужно подсчитать вхождения, просто создайте Map внутри своей рекурсии и увеличивайте вхождения на каждом шаге в соответствии с вашей бизнес-логикой.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...