Все, что вам нужно сделать, это изменить источник данных.
Извлечение данных из файла CSV:
val dataSource = FileIO.fromPath(Paths.get("file.csv"))
.via(Framing.delimiter(ByteString("\n"), 256, true)
.map(_.utf8String))
Выборка данных из SQS (Alpakka):
val dataSource = SqsSource(queue, sqsSourceSettings).take(100).map(_.getBody)
Выборка данных из таблицы с помощью Slick (Alpakka):
val dataSource = Slick.source(sql"SELECT NAME FROM USERS".as[String])
В основном вам нужно понять три вещи:
- Источник: один выход
- Поток: один вход, один выход
- Раковина: один вход.
Зная это, вы можете строить линейные конвейеры, как:
source.via(flow1).via(flow2).runWith(sink)
Таким образом, вы можете легко «подключить» источники к существующему конвейеру и запускать их с любым желаемым приемником:
val pipeline = flow1.via(flow2)
val fileSource = FileIO.fromPath(Paths.get("file.csv"))
.via(Framing.delimiter(ByteString("\n"), 256, true)
.map(_.utf8String))
.via(pipeline)
.runWith(sink)
val sqsSource = Slick
.source(sql"SELECT NAME FROM USERS".as[String])
.via(pipeline)
.runWith(sink)
val slickFlow = SqsSource(queue, sqsSourceSettings).take(100)
.map(_.getBody)
.via(pipeline)
.runWith(sink)
Редактировать: Ну, кроме стратегии actorRef, вы также можете использовать Source.queue и создавать свои сообщения, вызывая queue.offer:
def source = Source
.queue(Int.MaxValue, OverflowStrategy.backpressure)
.map { name: String => s"hello, $name" }
.toMat(BroadcastHub.sink[String])(Keep.both)
.run()
def wsHandler(s: Source[String, NotUsed]): Flow[Message, Message, NotUsed] = Flow[Message]
.mapConcat(_ => Nil)
.merge(s)
.map(TextMessage(_))
import scala.concurrent.duration._
val websocketRoute =
path("greeter" / Segment) { name =>
val (queue, s) = source
Source
.tick(
initialDelay = 1 second,
interval = 1 second,
tick = None
)
.map { _ =>
queue.offer(name)
}
.runWith(Sink.ignore)
handleWebSocketMessages(wsHandler(s))
}
Внешние ссылки: