Python ИСПОЛЬЗУЕТ публикацию с подпиской Redis для передачи больших данных гранулярности - PullRequest
0 голосов
/ 16 марта 2020

Среда выполнения.

macOS
redis-5.05

python-3.7.6
redis-3.4.1

redis информация о конфигурации.

127.0.0.1:6379> info memory
# Memory
used_memory:1052688
used_memory_human:1.00M
used_memory_rss:2084864
used_memory_rss_human:1.99M
used_memory_peak:135268640
used_memory_peak_human:129.00M
used_memory_peak_perc:0.78%
used_memory_overhead:1037646
used_memory_startup:987952
used_memory_dataset:15042
used_memory_dataset_perc:23.24%
allocator_allocated:1007568
allocator_active:2046976
allocator_resident:2046976
total_system_memory:8589934592
total_system_memory_human:8.00G
used_memory_lua:37888
used_memory_lua_human:37.00K
used_memory_scripts:0
used_memory_scripts_human:0B
number_of_cached_scripts:0
maxmemory:0
maxmemory_human:0B
maxmemory_policy:noeviction
allocator_frag_ratio:2.03
allocator_frag_bytes:1039408
allocator_rss_ratio:1.00
allocator_rss_bytes:0
rss_overhead_ratio:1.02
rss_overhead_bytes:37888
mem_fragmentation_ratio:2.07
mem_fragmentation_bytes:1077296
mem_not_counted_for_evict:0
mem_replication_backlog:0
mem_clients_slaves:0
mem_clients_normal:49694
mem_aof_buffer:0
mem_allocator:libc
active_defrag_running:0
lazyfree_pending_objects:0

Код программы.

# - sub.py -
import redis
import numpy as np
rc = redis.StrictRedis()
ps = rc.pubsub()


def my_handler(data):
    arr_data = np.frombuffer(data["data"], dtype='u4')
    data.update({"data": len(arr_data)})
    print(data)


ps.subscribe(**{"demo": my_handler})
ps.run_in_thread()
# -- pub.py --
import redis
import time
import numpy

# data = numpy.arange(1024**2, dtype="u4")
data = numpy.arange(1024**2*16, dtype="u4")
rc = redis.StrictRedis()
for i in range(100):
    time.sleep(0.5)
    print(rc.publish("demo", data.tobytes()))

Когда я использовал его для передачи 1 МБ данных, это было нормально, и когда размер был 64 МБ, он имел следующую информацию и информацию об исключениях:

# - sub.py -
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/Users/pppig/anaconda/anaconda3/envs/package_test/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/Users/pppig/anaconda/anaconda3/envs/package_test/lib/python3.7/site-packages/redis/client.py", line 3667, in run
    timeout=sleep_time)
  File "/Users/pppig/anaconda/anaconda3/envs/package_test/lib/python3.7/site-packages/redis/client.py", line 3565, in get_message
    response = self.parse_response(block=False, timeout=timeout)
  File "/Users/pppig/anaconda/anaconda3/envs/package_test/lib/python3.7/site-packages/redis/client.py", line 3451, in parse_response
    if not block and not conn.can_read(timeout=timeout):
  File "/Users/pppig/anaconda/anaconda3/envs/package_test/lib/python3.7/site-packages/redis/connection.py", line 729, in can_read
    return self._parser.can_read(timeout)
  File "/Users/pppig/anaconda/anaconda3/envs/package_test/lib/python3.7/site-packages/redis/connection.py", line 313, in can_read
    return self._buffer and self._buffer.can_read(timeout)
  File "/Users/pppig/anaconda/anaconda3/envs/package_test/lib/python3.7/site-packages/redis/connection.py", line 223, in can_read
    raise_on_timeout=False)
  File "/Users/pppig/anaconda/anaconda3/envs/package_test/lib/python3.7/site-packages/redis/connection.py", line 193, in _read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
# - pub.py -
1
0
0
...

Нужно ли добавлять некоторые параметры во время создания экземпляра, чтобы решить это? С нетерпением жду ответа.

...