Nodejs & Typescript - контроллеры сокетов и pubsub IHandyRedis подписываются на пользовательские события, модель не работает - PullRequest
0 голосов
/ 13 июля 2020

Я пытаюсь создать довольно простую c socket-io и систему подписки и прослушивания Redis, однако кажется, что каналы Redis перепутались или что-то в этом роде. Я все перепробовал, и ничего не работает.

У меня user1, и они подписаны на свои события и user2 события. Как видно ниже, это клиент.

//user1 - socket setup
var socket = io('${heatmap.socket_host}/${user.id}', {
        'transports': ['websocket'],
        'autoConnect': false,
        'query': {'access': heatmap.accessToken}
      });
    


//user2 - socket setup
var socket = io('${heatmap.socket_host}/${user2.id}', {
        'transports': ['websocket'],
        'autoConnect': false, // optional
        'query': {'access': heatmap.accessToken},
      });
      
      
//user1 client

user.websocket.connect();

  //SUBSCRIBING TO THEMSELVES
  user.websocket.on('connect', (_) {
    print('[connected to me]');
    user.websocket.on('LocationChange', (data) => print('[me] - LocationChange: $data'));
  });

  var devices = await heatmap.devices.find();
  var device = devices.first;

  //POSTING EVENTS TO THEMSELVES
  Timer.periodic(Duration(seconds: 10), (timer) async {
    await heatmap.events.postLocation(55, 45, device.id);
  });

  var people = await heatmap.requests.getGrantedRequests();
  people.sent.first.device.user.websocket.connect();

  //SUBSCRIBING TO USER2
  people.sent.first.device.user.websocket.on('connect', (_) {
    print('connected to ${people.sent.first.device.user.name}');
    people.sent.first.device.user.websocket.on('LocationChange', (data) => print('[other] - LocationChange: $data'));
  });
  
  
//user2 client (not subscribed to any events, but is posting events which should be received as `[other] - LocationChange: data`)
Timer.periodic(Duration(seconds: 10), (timer) async {
    await heatmap.events.postLocation(51.34, -0.123, '02355fab-9afa-4e26-b7ea-    fe7af867a751');
    print('Posted');
    });

В nodejs я использую сокеты-контроллеры для сокетов и iHandyRedis для Redis. Они настроены следующим образом.

  const redis = createHandyClient(redisPort, redisHost);
  const subscriber = createHandyClient(redisPort, redisHost);

  Container.set("redis", redis);
  Container.set("subscriber", subscriber);
  
  const socketServer = io(webServer);
  useSocketServer(socketServer, { controllers: [EventsSocketController] });

У меня есть конечная точка, где пользователь отправляет событие на POST /v1/events, а оно публикует на redisClient, как показано ниже

    await this.redisClient.publish(`${user.id}:events`, JSON.stringify(event));

Ниже приводится EventSocketController, где я управляю подключением, отключением и подпиской на канал Redis

@SocketController('/:id')
export class EventsSocketController {
  @Inject("redis")
  private redisClient!: IHandyRedis;

  @Inject("subscriber")
  private subscriberClient!: IHandyRedis;

  @InjectRepository(Session)
  private sessionRepository!: Repository<Session>;

  @InjectRepository(ShareRequest)
  private requestRepository!: Repository<ShareRequest>;

  @InjectRepository(User)
  private userRepository!: Repository<User>;

  @OnConnect()
  async connection(
    @ConnectedSocket() socket: Socket,
    @SocketQueryParam("access") access: string,
    @NspParam('id') userId: string
  ): Promise<void> {

    console.log(socket.id);

    try {
      const session = await this.sessionRepository.findOneOrFail(
        { access },
        { relations: ["user"] }
      );

      this.subscriberClient.redis.on("message", (_, message) => {
        this.redisClient.redis.hgetall(message, (err, res) => {
          socket.emit(JSON.parse(message)['type'], message);
        })
      });

      if (userId && userId != session.user.id) {
        const granted = await this.requestRepository
          .createQueryBuilder("request")
          .innerJoin("request.from", "from", "from.id = :id", { id: session.user.id })
          .where("request.granted = :granted", { granted: true })
          .getMany();
        if (granted.length === 0) throw new NotFoundError();

        const user = await this.userRepository
          .createQueryBuilder("user")
          .where("user.id = :userId", { userId })
          .getOne();
        if (!user) throw new NotFoundError();

        await this.subscriberClient.subscribe(`${user.id}:events`);

      }

      await this.subscriberClient.subscribe(`${session.user.id}:events`);

    } catch (error) {
      socket.emit("BAD_TOKEN");
      socket.disconnect();
    }
  }

  @OnDisconnect()
  async disconnect(@SocketQueryParam("userId") userId: string): Promise<void> {
    await this.subscriberClient.unsubscribe(`${userId}:events`);
  }
}

ПРОБЛЕМА В КЛИЕНТЕ Я ПОЛУЧАЮ

[me] - LocationChange: {"type":"LocationChange","data":{"latitude":"51.34","longitude":"-0.123","timestamp":"2020-07-13 16:39:45.757820"}}

[other] - LocationChange: {"type":"LocationChange","data":{"latitude":"51.34","longitude":"-0.123","timestamp":"2020-07-13 16:39:45.757820"}}

[me] - LocationChange: {"type":"LocationChange","data":{"latitude":"55.0","longitude":"45.0","timestamp":"2020-07-13 16:39:49.282664"}}

[other] - LocationChange: {"type":"LocationChange","data":{"latitude":"55.0","longitude":"45.0","timestamp":"2020-07-13 16:39:49.282664"}}

Заранее спасибо!

...