Я закончил тем, что использовал актеров Акки.
Я сделал предположения относительно желаемого результата, чтобы программа была простой и быстрой. Я сделал предположение, что вывод - это JSON, иерархия не сохраняется, и что допустимы несколько файлов. Если вам не нравится JSON, вы можете заменить его чем-то другим, но два других предположения важны для сохранения текущей скорости и простоты программы.
Есть некоторые параметры командной строки, которые вы можете установить. Если вы не установите их, будут использоваться значения по умолчанию. Значения по умолчанию содержатся в Main.scala.
Параметры командной строки следующие:
(0) корневой каталог, с которого вы начинаете;(нет по умолчанию)
(1) интервал времени ожидания (в секундах) для всех таймаутов в этой программе;(по умолчанию 60)
(2) количество используемых принтером актеров;это будет количество созданных файлов журнала;(по умолчанию 50)
(3) интервал между тиками, используемый для актера монитора;(по умолчанию 500)
Для тайм-аута имейте в виду, что это значение временного интервала ожидания при завершении программы. Поэтому, если вы запускаете небольшую работу и удивляетесь, почему ее выполнение занимает минуту, то это потому, что она ожидает истечения интервала времени ожидания перед закрытием программы.
Поскольку вы выполняете такую большую работу,Возможно, что тайм-аут по умолчанию 60 слишком мал. Если вы получаете исключения, жалующиеся на тайм-аут, увеличьте значение тайм-аута.
Обратите внимание, что если установлен слишком большой интервал между тиками, есть вероятность, что ваша программа преждевременно закроется.
Для запускапросто запустите sbt в папке проекта и введите
runMain Main <canonical path of root directory>
Я не могу понять, как получить группу файла в Java. Вам нужно исследовать это и добавить соответствующий код в Entity.scala и TraverseActor.scala.
Кроме того, f.list () в TraverseActor.scala иногда возвращался как ноль, поэтому я обернул егов варианте. Вам нужно будет отладить эту проблему, чтобы убедиться, что у вас нет ошибок в некоторых файлах.
Теперь вот содержимое всех файлов.
build.sbt
name := "stackoverflow20191110"
version := "0.1"
scalaVersion := "2.12.1"
libraryDependencies ++= Seq(
"io.circe" %% "circe-core",
"io.circe" %% "circe-generic",
"io.circe" %% "circe-parser"
).map(_ % "0.12.2")
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.16"
Entity.scala
import io.circe.Encoder
import io.circe.generic.semiauto._
sealed trait Entity {
def path: String
def owner: String
def permissions: String
def lastModifiedTime: String
def creationTime: String
def lastAccessTime: String
def hashCode: Int
}
object Entity {
implicit val entityEncoder: Encoder[Entity] = deriveEncoder
}
case class FileEntity(path: String, owner: String, permissions: String, lastModifiedTime: String, creationTime: String, lastAccessTime: String) extends Entity
object fileentityEncoder {
implicit val fileentityEncoder: Encoder[FileEntity] = deriveEncoder
}
case class DirectoryEntity(path: String, owner: String, permissions: String, lastModifiedTime: String, creationTime: String, lastAccessTime: String) extends Entity
object DirectoryEntity {
implicit val directoryentityEncoder: Encoder[DirectoryEntity] = deriveEncoder
}
case class Contents(path: String, files: IndexedSeq[Entity])
object Contents {
implicit val contentsEncoder: Encoder[Contents] = deriveEncoder
}
Main.scala
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import java.io.{BufferedWriter, File, FileWriter}
import ShutDownActor.ShutDownYet
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try
object Main {
val defaultNumPrinters = 50
val defaultMonitorTickInterval = 500
val defaultTimeoutInS = 60
def main(args: Array[String]): Unit = {
val timeoutInS = Try(args(1).toInt).toOption.getOrElse(defaultTimeoutInS)
val system = ActorSystem("SearchHierarchy")
val shutdown = system.actorOf(ShutDownActor.props)
val monitor = system.actorOf(MonitorActor.props(shutdown, timeoutInS))
val refs = (0 until Try(args(2).toInt).toOption.getOrElse(defaultNumPrinters)).map{x =>
val name = "logfile" + x
(name, system.actorOf(PrintActor.props(name, Try(args(3).toInt).toOption.getOrElse(defaultMonitorTickInterval), monitor)))
}
val root = system.actorOf(TraverseActor.props(new File(args(0)), refs))
implicit val askTimeout = Timeout(timeoutInS seconds)
var isTimedOut = false
while(!isTimedOut){
Thread.sleep(30000)
val fut = (shutdown ? ShutDownYet).mapTo[Boolean]
isTimedOut = Await.result(fut, timeoutInS seconds)
}
refs.foreach{ x =>
val fw = new BufferedWriter(new FileWriter(new File(x._1), true))
fw.write("{}\n]")
fw.close()
}
system.terminate
}
}
MonitorActor.scala
import MonitorActor.ShutDown
import akka.actor.{Actor, ActorRef, Props, ReceiveTimeout, Stash}
import io.circe.syntax._
import scala.concurrent.duration._
class MonitorActor(shutdownActor: ActorRef, timeoutInS: Int) extends Actor with Stash {
context.setReceiveTimeout(timeoutInS seconds)
override def receive: Receive = {
case ReceiveTimeout =>
shutdownActor ! ShutDown
}
}
object MonitorActor {
def props(shutdownActor: ActorRef, timeoutInS: Int) = Props(new MonitorActor(shutdownActor, timeoutInS))
case object ShutDown
}
PrintActor.scala
import java.io.{BufferedWriter, File, FileWriter, PrintWriter}
import akka.actor.{Actor, ActorRef, Props, Stash}
import PrintActor.{Count, HeartBeat}
class PrintActor(name: String, interval: Int, monitorActor: ActorRef) extends Actor with Stash {
val file = new File(name)
override def preStart = {
val fw = new BufferedWriter(new FileWriter(file, true))
fw.write("[\n")
fw.close()
self ! Count(0)
}
override def receive: Receive = {
case Count(c) =>
context.become(withCount(c))
unstashAll()
case _ =>
stash()
}
def withCount(c: Int): Receive = {
case s: String =>
val fw = new BufferedWriter(new FileWriter(file, true))
fw.write(s)
fw.write(",\n")
fw.close()
if (c == interval) {
monitorActor ! HeartBeat
context.become(withCount(0))
} else {
context.become(withCount(c+1))
}
}
}
object PrintActor {
def props(name: String, interval: Int, monitorActor: ActorRef) = Props(new PrintActor(name, interval, monitorActor))
case class Count(count: Int)
case object HeartBeat
}
ShutDownActor.scala
import MonitorActor.ShutDown
import ShutDownActor.ShutDownYet
import akka.actor.{Actor, Props, Stash}
class ShutDownActor() extends Actor with Stash {
override def receive: Receive = {
case ShutDownYet => sender ! false
case ShutDown => context.become(canShutDown())
}
def canShutDown(): Receive = {
case ShutDownYet => sender ! true
}
}
object ShutDownActor {
def props = Props(new ShutDownActor())
case object ShutDownYet
}
TraverseActor.scala
import java.io.File
import akka.actor.{Actor, ActorRef, PoisonPill, Props, ReceiveTimeout}
import io.circe.syntax._
import scala.collection.JavaConversions
import scala.concurrent.duration._
import scala.util.Try
class TraverseActor(start: File, printers: IndexedSeq[(String, ActorRef)]) extends Actor{
val hash = start.hashCode()
val mod = hash % printers.size
val idx = if (mod < 0) -mod else mod
val myPrinter = printers(idx)._2
override def preStart = {
self ! start
}
override def receive: Receive = {
case f: File =>
val path = f.getCanonicalPath
val files = Option(f.list()).map(_.toIndexedSeq.map(x =>new File(path + "/" + x)))
val directories = files.map(_.filter(_.isDirectory))
directories.foreach(ds => processDirectories(ds))
val entities = files.map{fs =>
fs.map{ f =>
val path = f.getCanonicalPath
val owner = Try(java.nio.file.Files.getOwner(f.toPath).toString).toOption.getOrElse("")
val permissions = Try(java.nio.file.Files.getPosixFilePermissions(f.toPath).toString).toOption.getOrElse("")
val attributes = Try(java.nio.file.Files.readAttributes(f.toPath, "lastModifiedTime,creationTime,lastAccessTime"))
val lastModifiedTime = attributes.flatMap(a => Try(a.get("lastModifiedTime").toString)).toOption.getOrElse("")
val creationTime = attributes.flatMap(a => Try(a.get("creationTime").toString)).toOption.getOrElse("")
val lastAccessTime = attributes.flatMap(a => Try(a.get("lastAccessTime").toString)).toOption.getOrElse("")
if (f.isDirectory) FileEntity(path, owner, permissions, lastModifiedTime, creationTime, lastAccessTime)
else DirectoryEntity(path, owner, permissions, lastModifiedTime, creationTime, lastAccessTime)
}
}
directories match {
case Some(seq) =>
seq match {
case x+:xs =>
case IndexedSeq() => self ! PoisonPill
}
case None => self ! PoisonPill
}
entities.foreach(e => myPrinter ! Contents(f.getCanonicalPath, e).asJson.toString)
}
def processDirectories(directories: IndexedSeq[File]): Unit = {
def inner(fs: IndexedSeq[File]): Unit = {
fs match {
case x +: xs =>
context.actorOf(TraverseActor.props(x, printers))
processDirectories(xs)
case IndexedSeq() =>
}
}
directories match {
case x +: xs =>
self ! x
inner(xs)
case IndexedSeq() =>
}
}
}
object TraverseActor {
def props(start: File, printers: IndexedSeq[(String, ActorRef)]) = Props(new TraverseActor(start, printers))
}
Я тестировал только на небольшом примере, поэтому возможно, что эта программа столкнется с проблемами при выполнении вашей работы,Если это произойдет, не стесняйтесь задавать вопросы.