Redis получает непредсказуемый результат команды во время выполнения - PullRequest
0 голосов
/ 13 июня 2018

У меня есть небольшая программа, которая устанавливает соединение через веб-сокет с криптообменом, получает данные и сохраняет их с помощью команды set Redis.

Код

//get Redis connection
    RedisAsyncCommands<String, String> redis = TRedis.getRedis();
    String symbol = "AGIETH";
    Session session = null;
    try {
        //Open websocket connection. 
        session = (new 
  BinanceApi()).websocketTrades(BinanceSymbol.valueOf(symbol), new BinanceWebSocketAdapterAggTrades() {
            @Override
            public void onMessage(BinanceEventAggTrade message) {
                double closeOrderBuy = 0;
                double closeOrderSell = 0;
                //check if we  saved order information before and if yes get data from Redis
                try {
                    if(redis.get(symbol+"Buy").get()!=null )
                    {
                        closeOrderBuy = Double.valueOf(redis.get(symbol+"Buy").get());
                    }
                    if( redis.get(symbol+"Sell").get()!=null)
                    {
                        closeOrderSell = Double.valueOf(redis.get(symbol+"Sell").get());
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                // get current value from exchange
                double currentCloseOrder = message.getPrice().multiply(message.getQuantity()).doubleValue();

                // rewrite data in Redis
                if(message.isMaker()) {
                    closeOrderBuy = currentCloseOrder + closeOrderBuy;
                    redis.set(symbol + "Buy",String.valueOf(closeOrderBuy));
                }
                else {
                    closeOrderSell = currentCloseOrder + closeOrderSell;
                    redis.set(symbol + "Sell",String.valueOf(closeOrderSell));
                }
            }
        });
    } catch (BinanceApiException e) {
        e.printStackTrace();
    }
    try {
        Thread.sleep(10000);
        session.close();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
//check what we write
    try {
        System.out.println(symbol + redis.get(symbol + "Buy").get() + "  " + redis.get(symbol + "Sell").get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }

Это часть консольного вывода для пары AGIETH:

18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0x6893917d, /127.0.0.1:64152 -> localhost/127.0.0.1:6379, chid=0x1] write(ctx, AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], promise)
18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandEncoder - [channel=0x6893917d, /127.0.0.1:64152 -> localhost/127.0.0.1:6379] writing command AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]  
18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0x6893917d, /127.0.0.1:64152 -> localhost/127.0.0.1:6379, chid=0x1] Received: 5 bytes, 1 commands in the stack  
18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0x6893917d, /127.0.0.1:64152 -> localhost/127.0.0.1:6379, chid=0x1] Stack contains: 1 commands
18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.RedisStateMachine - Decode AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]  
18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.RedisStateMachine - Decoded AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], empty stack: true  
***AGIETH 0.045027  null***

Выход для пары AGIBTC

13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0xc565134b, /127.0.0.1:54361 -> localhost/127.0.0.1:6379, chid=0x1] write(ctx, AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], promise)
13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandEncoder - [channel=0xc565134b, /127.0.0.1:54361 -> localhost/127.0.0.1:6379] writing command AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0xc565134b, /127.0.0.1:54361 -> localhost/127.0.0.1:6379, chid=0x1] Received: 25 bytes, 1 commands in the stack
13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0xc565134b, /127.0.0.1:54361 -> localhost/127.0.0.1:6379, chid=0x1] Stack contains: 1 commands
13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.RedisStateMachine - Decode AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.RedisStateMachine - Decoded AsyncCommand [type=GET, output=ValueOutput [output=0.5475444299999999, error='null'], commandType=io.lettuce.core.protocol.Command], empty stack: true  
**AGIBTC 0.20769342999999998  0.5475444299999999**

Я получаю Null для некоторых пар, но обмен обеспечивает этоИнформация.Я не понимаю, если это проблема Redis, или логика моей программы неверна?

1 Ответ

0 голосов
/ 13 июня 2018

Я вижу две проблемы с вашим кодом:

  1. Вы используете Redis async API, но не ожидаете должным образом завершения операции set ().RedisAsyncCommands.set возвращает RedisFuture.Перед вызовом get () вы должны убедиться, что будущее завершено.
  2. Вы ожидаете торговое событие от BinanceApi в течение 10 секунд.Но у некоторых валютных пар может быть несколько сделок, поэтому 10 секунд просто недостаточно.

Я немного изменил ваш код, добавив надлежащие условия ожидания, и все, кажется, работает нормально:

//get Redis connection
    RedisClient client = RedisClient.create("redis://localhost");
    StatefulRedisConnection<String, String> connection = client.connect();
    RedisAsyncCommands<String, String> redis = connection.async();
    String symbol = "AGIETH";
    Session session = null;
    CompletableFuture<String> cfBuy = new CompletableFuture<>();
    CompletableFuture<String> cfSell = new CompletableFuture<>();
    try {
        //Open websocket connection. 
        session = (new
            BinanceApi()).websocketTrades(BinanceSymbol.valueOf(symbol), new BinanceWebSocketAdapterAggTrades() {
            @Override
            public void onMessage(BinanceEventAggTrade message) {
                double closeOrderBuy = 0;
                double closeOrderSell = 0;
                //check if we  saved order information before and if yes get data from Redis
                try {
                    if(redis.get(symbol+"Buy").get()!=null )
                    {
                        closeOrderBuy = Double.valueOf(redis.get(symbol+"Buy").get());
                    }
                    if( redis.get(symbol+"Sell").get()!=null)
                    {
                        closeOrderSell = Double.valueOf(redis.get(symbol+"Sell").get());
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                // get current value from exchange
                double currentCloseOrder = message.getPrice().multiply(message.getQuantity()).doubleValue();

                // rewrite data in Redis
                if(message.isMaker()) {
                    closeOrderBuy = currentCloseOrder + closeOrderBuy;
                    redis.set(symbol + "Buy",String.valueOf(closeOrderBuy)).thenAccept(cfBuy::complete);
                }
                else {
                    closeOrderSell = currentCloseOrder + closeOrderSell;
                    redis.set(symbol + "Sell",String.valueOf(closeOrderSell)).thenAccept(cfSell::complete);
                }
            }
        });
    } catch (BinanceApiException e) {
        e.printStackTrace();
    }
    try {
        System.out.println("Futures completed: " + cfBuy.get() + " " + cfSell.get());
        session.close();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    //check what we write
    try {
        System.out.println(symbol + redis.get(symbol + "Buy").get() + "  " + redis.get(symbol + "Sell").get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }    

Вывод:

Futures completed: OK OK
AGIETH0.06922798  0.08610368
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...