Заблокированный поток во время ожидания последнего элемента - PullRequest
0 голосов
/ 14 января 2020

Я хочу соединить два приложения через rsocket. Один написан в GO, а второй в Kotlin. Я хочу реализовать соединение, когда клиент отправляет поток данных, а сервер отправляет ответ подтверждения.

Проблема заключается в ожидании всех элементов, если сервер не BlockOnLast (ctx), весь поток читается, но ответ отправляется до того, как все записи прибывают. Если BlockOnLast (ctx) добавлен, сервер (GoLang) застрял.

Я также написал клиент в Kotlin, и в этом случае вся связь работает отлично.

Может ли кто-нибудь помочь?

GO Сервер:

package main

import (
"context"
"github.com/golang/protobuf/proto"
"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
"github.com/rsocket/rsocket-go/rx/flux"
"rsocket-go-rpc-test/proto"
)

func main() {
addr := "tcp://127.0.0.1:8081"
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := rsocket.Receive().
    Fragment(1024).
    Resume().
    Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
        return rsocket.NewAbstractSocket(
            rsocket.RequestChannel(func(payloads rx.Publisher) flux.Flux {
                println("START")

                payloads.(flux.Flux).
                    DoOnNext(func(input payload.Payload) {
                        chunk := &pl_dwojciechowski_proto.Chunk{}
                        proto.Unmarshal(input.Data(), chunk)
                        println(string(chunk.Content))
                    }).BlockLast(ctx)

                return flux.Create(func(i context.Context, sink flux.Sink) {
                    status, _ := proto.Marshal(&pl_dwojciechowski_proto.UploadStatus{
                        Message: "OK",
                        Code:    0,
                    })

                    sink.Next(payload.New(status, make([]byte, 1)))
                    sink.Complete()
                    println("SENT")
                })
            }),
        ), nil
    }).
    Transport(addr).
    Serve(ctx)
panic(err)

}

Kotlin Клиент:

private fun clientCall() {
val rSocket = RSocketFactory.connect().transport(TcpClientTransport.create(8081)).start().block()
val client = FileServiceClient(rSocket)

val requests: Flux<Chunk> = Flux.range(1, 10)
    .map { i: Int -> "sending -> $i" }
    .map<Chunk> {
        Chunk.newBuilder()
            .setContent(ByteString.copyFrom(it.toByteArray())).build()
    }

val response = client.send(requests).block() ?: throw Exception("")
rSocket.dispose()
System.out.println(response.message)

}

И эквивалент для GO записано в Kotlin:

    val serviceServer = FileServiceServer(DefaultService(), Optional.empty(), Optional.empty())
val closeableChannel = RSocketFactory.receive()
    .acceptor { setup: ConnectionSetupPayload?, sendingSocket: RSocket? ->
        Mono.just(
            RequestHandlingRSocket(serviceServer)
        )
    }
    .transport(TcpServerTransport.create(8081))
    .start()
    .block()
    closeableChannel.onClose().block()

class DefaultService : FileService {
override fun send(messages: Publisher<Service.Chunk>?, metadata: ByteBuf?): Mono<Service.UploadStatus> {
    return Flux.from(messages)
        .windowTimeout(10, Duration.ofSeconds(500))
        .flatMap(Function.identity())
        .doOnNext { println(it.content.toStringUtf8()) }
        .then(Mono.just(Service.UploadStatus.newBuilder().setCode(Service.UploadStatusCode.Ok).setMessage("test").build()))
}
}

Вывод на сервер:

START
sending -> 1

1 Ответ

0 голосов
/ 16 января 2020

Решение ниже:

package main
import (
   "context"
   "github.com/golang/protobuf/proto"
   "github.com/rsocket/rsocket-go"
   "github.com/rsocket/rsocket-go/payload"
   "github.com/rsocket/rsocket-go/rx"
   "github.com/rsocket/rsocket-go/rx/flux"
   "rsocket-go-rpc-test/proto"
)
type TestService struct {
   totals int
pl_dwojciechowski_proto.FileService
}
var statusOK = &pl_dwojciechowski_proto.UploadStatus{
   Message: "code",
Code:    pl_dwojciechowski_proto.UploadStatusCode_Ok,
}
var statusErr = &pl_dwojciechowski_proto.UploadStatus{
   Message: "code",
Code:    pl_dwojciechowski_proto.UploadStatusCode_Failed,
}
func main() {
   addr := "tcp://127.0.0.1:8081"
ctx, cancel := context.WithCancel(context.Background())
   defer cancel()
   err := rsocket.Receive().
      Fragment(1024).
      Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
         return rsocket.NewAbstractSocket(
            rsocket.RequestChannel(func(msgs rx.Publisher) flux.Flux {
               dataReceivedChan := make(chan bool, 1)
               toChan, _ := flux.Clone(msgs).
                  DoOnError(func(e error) {
                     dataReceivedChan <- false
}).
                  DoOnComplete(func() {
                     dataReceivedChan <- true
}).
                  ToChan(ctx, 1)
               fluxResponse := flux.Create(func(ctx context.Context, s flux.Sink) {
                  gluedContent := make([]byte, 1024)
                  for c := range toChan {
                     chunk := pl_dwojciechowski_proto.Chunk{}
                     _ = chunk.XXX_Unmarshal(c.Data())
                     gluedContent = append(gluedContent, chunk.Content...)
                  }
                  if <-dataReceivedChan {
                     marshal, _ := proto.Marshal(statusOK)
                     s.Next(payload.New(marshal, nil))
                     s.Complete()
                  } else {
                     marshal, _ := proto.Marshal(statusErr)
                     s.Next(payload.New(marshal, nil))
                     s.Complete()
                  }
               })
               return fluxResponse
}),
), nil
}).
      Transport(addr).
      Serve(ctx)
   panic(err)
}
...