Допустим, у меня есть две темы кафки, request_topic
для моих запросов Post и response_topic
для моих ответов.
Это модели:
case class Request(requestId: String, body: String)
case class Response(responseId: String, body: String, requestId: String)
Это моя обработчик сокета
def socket = WebSocket.accept[String, String] { req =>
val requestId = ??? // Generate a unique requestId
val in: Sink[String, Future[Done]] = Sink.foreach[String]{ msg =>
val record = new ProducerRecord[String, Request]("request_topic", "key", Request(requestId, msg))
val producer: KafkaProducer[String, Request] = ???
Future { producer.send(record).get }
}
// Once produced, some stream processing apps will manage to process request and publish the reponse to response_topic
// The Request and Response object are linked by the requestId field.
val consumerSettings = ???
val out: Source[ConsumerRecord[String, Response], _] = Consumer
.plainSource(consumerSettings, Subscriptions.topics("response_topic"))
.filter(cr => cr.value.requestId == requestId)
.map(cr => someResponseString(cr.value))
Flow.formSinkAndSource(in, out)
}
def someResponseString(res: Response): String = ???
По сути, для каждого входящего сообщения я публикую sh объект запроса в Kafka, затем запрос обрабатывается некоторым приложением потоковой обработки (здесь не показано), и, надеюсь, ответ публикуется вернуться к Kafka.
У меня есть некоторые опасения:
1 - Alpakka Kafka Connector создаст новый экземпляр коннектора для каждого входящего сообщения или будет использовать один и тот же экземпляр до тех пор, пока Play
2 - Это хорошая идея, чтобы отфильтровать ответ на основе отдельного requestId, или я должен отправить весь поток обратно каждому клиенту, и позволить им отфильтровать ответ на основе requestId, который их интересует .
3 - Я не прав во всем? (Я настоящий новичок ie в Websocket)
Заранее спасибо.