Скала параллельное исполнение - PullRequest
0 голосов
/ 10 ноября 2019

Я работаю над требованием получения статистики о файлах, хранящихся в Linux с использованием Scala.

Мы передадим корневой каталог в качестве входных данных, а наш код получит полный список подкаталогов для переданного корневого каталога. .

Затем для каждого каталога в списке я получу список файлов, а для каждого файла я получу владельцев, группы, разрешение, время последнего изменения, время создания, время lastaccess.

Проблема в том, какчтобы я мог обрабатывать список каталогов параллельно, чтобы получить статистику файлов, хранящихся в этом каталоге.

В производственной среде у нас есть 100000+ папок внутри корневых папок.

Итак, мой списоксписок из 100000+ папок.

Как я могу парализовать свою операцию (статистику файлов) в моем доступном списке.

Поскольку я новичок в Scala, помогите мне в этом требовании.

Извините за публикацию без фрагмента кода.

Спасибо.

1 Ответ

2 голосов
/ 11 ноября 2019

Я закончил тем, что использовал актеров Акки.

Я сделал предположения относительно желаемого результата, чтобы программа была простой и быстрой. Я сделал предположение, что вывод - это JSON, иерархия не сохраняется, и что допустимы несколько файлов. Если вам не нравится JSON, вы можете заменить его чем-то другим, но два других предположения важны для сохранения текущей скорости и простоты программы.

Есть некоторые параметры командной строки, которые вы можете установить. Если вы не установите их, будут использоваться значения по умолчанию. Значения по умолчанию содержатся в Main.scala.

Параметры командной строки следующие:

(0) корневой каталог, с которого вы начинаете;(нет по умолчанию)

(1) интервал времени ожидания (в секундах) для всех таймаутов в этой программе;(по умолчанию 60)

(2) количество используемых принтером актеров;это будет количество созданных файлов журнала;(по умолчанию 50)

(3) интервал между тиками, используемый для актера монитора;(по умолчанию 500)

Для тайм-аута имейте в виду, что это значение временного интервала ожидания при завершении программы. Поэтому, если вы запускаете небольшую работу и удивляетесь, почему ее выполнение занимает минуту, то это потому, что она ожидает истечения интервала времени ожидания перед закрытием программы.

Поскольку вы выполняете такую ​​большую работу,Возможно, что тайм-аут по умолчанию 60 слишком мал. Если вы получаете исключения, жалующиеся на тайм-аут, увеличьте значение тайм-аута.

Обратите внимание, что если установлен слишком большой интервал между тиками, есть вероятность, что ваша программа преждевременно закроется.

Для запускапросто запустите sbt в папке проекта и введите

runMain Main <canonical path of root directory>

Я не могу понять, как получить группу файла в Java. Вам нужно исследовать это и добавить соответствующий код в Entity.scala и TraverseActor.scala.

Кроме того, f.list () в TraverseActor.scala иногда возвращался как ноль, поэтому я обернул егов варианте. Вам нужно будет отладить эту проблему, чтобы убедиться, что у вас нет ошибок в некоторых файлах.

Теперь вот содержимое всех файлов.

build.sbt

name := "stackoverflow20191110"

version := "0.1"

scalaVersion := "2.12.1"

libraryDependencies ++= Seq(
  "io.circe" %% "circe-core",
  "io.circe" %% "circe-generic",
  "io.circe" %% "circe-parser"
).map(_ % "0.12.2")

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.16"

Entity.scala

import io.circe.Encoder
import io.circe.generic.semiauto._

sealed trait Entity {
  def path: String
  def owner: String
  def permissions: String
  def lastModifiedTime: String
  def creationTime: String
  def lastAccessTime: String
  def hashCode: Int
}

object Entity {
  implicit val entityEncoder: Encoder[Entity] = deriveEncoder
}

case class FileEntity(path: String, owner: String, permissions: String, lastModifiedTime: String, creationTime: String, lastAccessTime: String) extends Entity

object fileentityEncoder {
  implicit val fileentityEncoder: Encoder[FileEntity] = deriveEncoder
}

case class DirectoryEntity(path: String, owner: String, permissions: String, lastModifiedTime: String, creationTime: String, lastAccessTime: String) extends Entity

object DirectoryEntity {
  implicit val directoryentityEncoder: Encoder[DirectoryEntity] = deriveEncoder
}

case class Contents(path: String, files: IndexedSeq[Entity])

object Contents {
  implicit val contentsEncoder: Encoder[Contents] = deriveEncoder
}

Main.scala

import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import java.io.{BufferedWriter, File, FileWriter}

import ShutDownActor.ShutDownYet

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object Main {

  val defaultNumPrinters = 50

  val defaultMonitorTickInterval = 500

  val defaultTimeoutInS = 60

  def main(args: Array[String]): Unit = {
    val timeoutInS = Try(args(1).toInt).toOption.getOrElse(defaultTimeoutInS)

    val system = ActorSystem("SearchHierarchy")

    val shutdown = system.actorOf(ShutDownActor.props)

    val monitor = system.actorOf(MonitorActor.props(shutdown, timeoutInS))

    val refs = (0 until Try(args(2).toInt).toOption.getOrElse(defaultNumPrinters)).map{x =>
      val name = "logfile" + x
      (name, system.actorOf(PrintActor.props(name, Try(args(3).toInt).toOption.getOrElse(defaultMonitorTickInterval), monitor)))
    }

    val root = system.actorOf(TraverseActor.props(new File(args(0)), refs))

    implicit val askTimeout = Timeout(timeoutInS seconds)

    var isTimedOut = false

    while(!isTimedOut){
      Thread.sleep(30000)
      val fut = (shutdown ? ShutDownYet).mapTo[Boolean]
      isTimedOut = Await.result(fut, timeoutInS seconds)
    }

    refs.foreach{ x =>
      val fw = new BufferedWriter(new FileWriter(new File(x._1), true))
      fw.write("{}\n]")
      fw.close()
    }

    system.terminate
  }

}

MonitorActor.scala

import MonitorActor.ShutDown
import akka.actor.{Actor, ActorRef, Props, ReceiveTimeout, Stash}
import io.circe.syntax._

import scala.concurrent.duration._

class MonitorActor(shutdownActor: ActorRef, timeoutInS: Int) extends Actor with Stash {

  context.setReceiveTimeout(timeoutInS seconds)

  override def receive: Receive = {
    case ReceiveTimeout =>
      shutdownActor ! ShutDown
  }

}

object MonitorActor {
  def props(shutdownActor: ActorRef, timeoutInS: Int) = Props(new MonitorActor(shutdownActor, timeoutInS))

  case object ShutDown
}

PrintActor.scala

import java.io.{BufferedWriter, File, FileWriter, PrintWriter}

import akka.actor.{Actor, ActorRef, Props, Stash}
import PrintActor.{Count, HeartBeat}

class PrintActor(name: String, interval: Int, monitorActor: ActorRef) extends Actor with Stash {

  val file = new File(name)

  override def preStart = {
    val fw = new BufferedWriter(new FileWriter(file, true))
    fw.write("[\n")
    fw.close()

    self ! Count(0)
  }

  override def receive: Receive = {
    case Count(c) =>
      context.become(withCount(c))
      unstashAll()

    case _ =>
      stash()
  }

  def withCount(c: Int): Receive = {
    case s: String =>
      val fw = new BufferedWriter(new FileWriter(file, true))
      fw.write(s)
      fw.write(",\n")
      fw.close()

      if (c == interval) {
        monitorActor ! HeartBeat
        context.become(withCount(0))
      } else {
        context.become(withCount(c+1))
      }
  }

}

object PrintActor {
  def props(name: String, interval: Int, monitorActor: ActorRef) = Props(new PrintActor(name, interval, monitorActor))

  case class Count(count: Int)

  case object HeartBeat
}

ShutDownActor.scala

import MonitorActor.ShutDown
import ShutDownActor.ShutDownYet
import akka.actor.{Actor, Props, Stash}

class ShutDownActor() extends Actor with Stash {

  override def receive: Receive = {
    case ShutDownYet => sender ! false
    case ShutDown => context.become(canShutDown())
  }

  def canShutDown(): Receive = {
    case ShutDownYet => sender ! true
  }

}

object ShutDownActor {
  def props = Props(new ShutDownActor())

  case object ShutDownYet
}

TraverseActor.scala

import java.io.File

import akka.actor.{Actor, ActorRef, PoisonPill, Props, ReceiveTimeout}
import io.circe.syntax._

import scala.collection.JavaConversions
import scala.concurrent.duration._
import scala.util.Try

class TraverseActor(start: File, printers: IndexedSeq[(String, ActorRef)]) extends Actor{

  val hash = start.hashCode()
  val mod = hash % printers.size
  val idx = if (mod < 0) -mod else mod
  val myPrinter = printers(idx)._2

  override def preStart = {
    self ! start
  }

  override def receive: Receive = {
    case f: File =>
      val path = f.getCanonicalPath
      val files = Option(f.list()).map(_.toIndexedSeq.map(x =>new File(path + "/" + x)))

      val directories = files.map(_.filter(_.isDirectory))

      directories.foreach(ds => processDirectories(ds))

      val entities = files.map{fs =>
        fs.map{ f =>
          val path = f.getCanonicalPath
          val owner = Try(java.nio.file.Files.getOwner(f.toPath).toString).toOption.getOrElse("")
          val permissions = Try(java.nio.file.Files.getPosixFilePermissions(f.toPath).toString).toOption.getOrElse("")
          val attributes = Try(java.nio.file.Files.readAttributes(f.toPath, "lastModifiedTime,creationTime,lastAccessTime"))
          val lastModifiedTime = attributes.flatMap(a => Try(a.get("lastModifiedTime").toString)).toOption.getOrElse("")
          val creationTime = attributes.flatMap(a => Try(a.get("creationTime").toString)).toOption.getOrElse("")
          val lastAccessTime = attributes.flatMap(a => Try(a.get("lastAccessTime").toString)).toOption.getOrElse("")

          if (f.isDirectory) FileEntity(path, owner, permissions, lastModifiedTime, creationTime, lastAccessTime)
          else DirectoryEntity(path, owner, permissions, lastModifiedTime, creationTime, lastAccessTime)
        }
      }

      directories match {
        case Some(seq) =>
          seq match {
            case x+:xs =>
            case IndexedSeq() => self ! PoisonPill
          }
        case None => self ! PoisonPill
      }

      entities.foreach(e => myPrinter ! Contents(f.getCanonicalPath, e).asJson.toString)
  }

  def processDirectories(directories: IndexedSeq[File]): Unit = {
    def inner(fs: IndexedSeq[File]): Unit = {
      fs match {
        case x +: xs =>
          context.actorOf(TraverseActor.props(x, printers))
          processDirectories(xs)
        case IndexedSeq() =>
      }

    }

    directories match {
      case x +: xs =>
        self ! x
        inner(xs)
      case IndexedSeq() =>
    }
  }

}

object TraverseActor {
  def props(start: File, printers: IndexedSeq[(String, ActorRef)]) = Props(new TraverseActor(start, printers))
}

Я тестировал только на небольшом примере, поэтому возможно, что эта программа столкнется с проблемами при выполнении вашей работы,Если это произойдет, не стесняйтесь задавать вопросы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...