Я работаю над потоковой передачей своей веб-камеры потребителю kakfa, чтобы провести некоторый анализ мимики и сохранить его в БД.
Тем не менее, я отмечаю значительное отставание на стороне потребителя (3 секунды).В процессе устранения этой задержки снижается качество сообщений / изображений на стороне производителя.Однако это не улучшило задержку и еще больше ухудшило качество изображения на стороне потребителя.
Ниже приведен код на Kafka Producer:
from confluent_kafka import Producer
import cv2
import time, sys
p = Producer({'bootstrap.servers': 'localhost'})
encode_param=[int(cv2.IMWRITE_JPEG_QUALITY),90]
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
camera = cv2.VideoCapture(0)
try:
while (True):
success, frame = camera.read()
frame = cv2.resize(frame, None, fx=0.3, fy=0.3)
result, buffer = cv2.imencode('.jpg', frame, encode_param)
#ret, buffer = cv2.imencode('.jpg', frame)
p.poll(0)
p.produce('newtopic', buffer.tobytes(), callback=delivery_report)
print('sent')
p.flush()
time.sleep(0.1)
except:
print("\nExiting.")
sys.exit(1)
Ниже приведен код на Kafka Consumer:
import cv2
import sys
import os
import imutils
import logging
from confluent_kafka import Consumer, KafkaError
from confluent_kafka.admin import AdminClient
def example_delete_topics(a, topics):
fs = a.delete_topics(topics, operation_timeout=30)
# Wait for operation to finish.
for topic, f in fs.items():
try:
f.result() # The result itself is None
print("Topic {} deleted".format(topic))
except Exception as e:
print("Failed to delete topic {}: {}".format(topic, e))
a = AdminClient({'bootstrap.servers': 'localhost'})
example_delete_topics(a, ['newtopic'])
c = Consumer({
'bootstrap.servers': 'localhost',
'group.id': 'mygroup',
'auto.offset.reset': 'latest'
})
camID = 0
c.subscribe(['newtopic'])
(H, W) = (None, None)
while True:
msg = c.poll(1.0)
if msg is None:
print ("No Message")
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print ("Received")
img_bytes = msg.value()
frame = np.array(Image.open(io.BytesIO(img_bytes)))
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
frame = imutils.resize(frame, width=800)
if W is None or H is None:
(H, W) = frame.shape[:2]
#print (H,W)
try:
rects, landmarks = face_detect.detect_face(frame,20)
rects_tracker = []
for (i,rect) in enumerate(rects):
# Intensive calculations on sentiment analysis
draw_border(frame,(rect[0],rect[1]),(rect[0] + rect[2],rect[1]+rect[3]),(127,255,255),1,10,20)
cv2.putText(frame,final_result,(rect[0],rect[1]),cv2.FONT_HERSHEY_SIMPLEX,0.5,(127,255,255),1,cv2.LINE_AA)
except Exception as e :
logging.info('Error on line {}'.format(sys.exc_info()[-1].tb_lineno), type(e).__name__, e)
pass
cv2.imshow('Sample', frame)
key = cv2.waitKey(20)
if key == 27: # exit on ESC
end = time.time()
seconds = end - start
fps = frame_counter / seconds;
print ("Estimated frames per second : {0}".format(fps))
break
c.close()
Несмотря на то, что код на стороне потребителя тяжел, много шагов.На прямой веб-камере без Kafka для запуска каждого кадра требуется 150 мсек.Но с Кафкой, это занимает около 3 секунд.
Может ли кто-нибудь помочь с тем, как я могу значительно уменьшить его, чтобы получить почти в реальном времени