Как обработать POST-запрос с помощью Kafka, Alpakka Kafka, Play Framework и Websocket? - PullRequest
0 голосов
/ 16 апреля 2020

Допустим, у меня есть две темы кафки, request_topic для моих запросов Post и response_topic для моих ответов.

Это модели:

case class Request(requestId: String, body: String)
case class Response(responseId: String, body: String, requestId: String)

Это моя обработчик сокета

def socket = WebSocket.accept[String, String] { req =>
  val requestId = ??? // Generate a unique requestId

  val in: Sink[String, Future[Done]] = Sink.foreach[String]{ msg =>
    val record = new ProducerRecord[String, Request]("request_topic", "key", Request(requestId, msg))
    val producer: KafkaProducer[String, Request] = ???
    Future { producer.send(record).get }
  }

  // Once produced, some stream processing apps will manage to process request and publish the reponse to response_topic
  // The Request and Response object are linked by the requestId field.

  val consumerSettings = ???
  val out: Source[ConsumerRecord[String, Response], _] = Consumer
    .plainSource(consumerSettings, Subscriptions.topics("response_topic"))
    .filter(cr => cr.value.requestId == requestId)
    .map(cr => someResponseString(cr.value))

  Flow.formSinkAndSource(in, out)
}

def someResponseString(res: Response): String = ???

По сути, для каждого входящего сообщения я публикую sh объект запроса в Kafka, затем запрос обрабатывается некоторым приложением потоковой обработки (здесь не показано), и, надеюсь, ответ публикуется вернуться к Kafka.

У меня есть некоторые опасения:

1 - Alpakka Kafka Connector создаст новый экземпляр коннектора для каждого входящего сообщения или будет использовать один и тот же экземпляр до тех пор, пока Play

2 - Это хорошая идея, чтобы отфильтровать ответ на основе отдельного requestId, или я должен отправить весь поток обратно каждому клиенту, и позволить им отфильтровать ответ на основе requestId, который их интересует .

3 - Я не прав во всем? (Я настоящий новичок ie в Websocket)

Заранее спасибо.

1 Ответ

1 голос
/ 27 апреля 2020

1) Зависит от того, как вы его настроили. Например, в теле in: Sink вы создаете новый KafkaProducer для каждого сообщения. Вместо этого у вас должен быть один производитель для всего приложения.

Я не уверен, как работают модели потоков Akka / Play, но большинство веб-серверов запускают новый поток для каждого входящего соединения, вплоть до фиксированного количества потоков в пуле потоков.

2) Я хотел бы подумать, что фильтрация как можно скорее будет предпочтительнее, а также делать как можно больше на стороне сервера. Это экономит пропускную способность обратно к клиенту.

Кроме того, если вы хотите только отправить sh данные с Kafka на веб-сервере клиенту в одном направлении, вы, вероятно, захотите SSE, а не Websocket

...