Есть ли способ использовать groupBy в потоке и отправить каждый подпоток в другой файл? - PullRequest
0 голосов
/ 12 сентября 2018

Например, если я анализирую журнал, который начинается с имени сервера, и я хочу разделить его в файле для каждого сервера, есть ли способ сделать это, не зная, сколько существует серверов?

FileIO.fromPath(Paths.get("in.log"))
   .via(Framing.delimiter(ByteString("\n".getBytes), maximumFrameLength = 4000)).map(_.utf8String)
   .map(_.span(_ == ' '))
   .groupBy(100, _._1) 

Это может привести к подпотоку (имя файла, журнал), но я не знаю, возможно ли подключить каждый подпоток к отдельному приемнику.

1 Ответ

0 голосов
/ 13 сентября 2018

Вам нужно для дальнейшей обработки строк журнала?Если нет, вы можете просто использовать пользовательский приемник.

def writeLineByServerName(line: String): Unit = {
  val name = getServerNameFromLine(line)
  // either create a new output stream or get an existing one from
  // a pool. you may need to manage your resources to limit open
  // buffers
  val outputStream = getOutputStream(name)
  outputStream.write(line)
}

FileIO.fromPath(Paths.get("in.log"))
   .via(Framing.delimiter(ByteString("\n".getBytes), maximumFrameLength = 4000)).map(_.utf8String)
   .map(_.span(_ == ' '))
   .to(Sink.foreach[String](writeLineByServerName))

В противном случае вы можете прочитать файл дважды.При первом чтении выясняется, сколько серверов существует.

...