Confluent Kafka: Lag in Kafka Consumer при потоковой передаче веб-камеры - PullRequest
0 голосов
/ 17 декабря 2018

Я работаю над потоковой передачей своей веб-камеры потребителю kakfa, чтобы провести некоторый анализ мимики и сохранить его в БД.

Тем не менее, я отмечаю значительное отставание на стороне потребителя (3 секунды).В процессе устранения этой задержки снижается качество сообщений / изображений на стороне производителя.Однако это не улучшило задержку и еще больше ухудшило качество изображения на стороне потребителя.

Ниже приведен код на Kafka Producer:

from confluent_kafka import Producer
import cv2
import time, sys

p = Producer({'bootstrap.servers': 'localhost'})

encode_param=[int(cv2.IMWRITE_JPEG_QUALITY),90]

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

camera = cv2.VideoCapture(0)

try:
    while (True):
        success, frame = camera.read()
        frame = cv2.resize(frame, None, fx=0.3, fy=0.3)
        result, buffer = cv2.imencode('.jpg', frame, encode_param)
        #ret, buffer = cv2.imencode('.jpg', frame)


        p.poll(0)
            p.produce('newtopic', buffer.tobytes(), callback=delivery_report)

        print('sent')
        p.flush()
        time.sleep(0.1)
except:
    print("\nExiting.")
    sys.exit(1)

Ниже приведен код на Kafka Consumer:

import cv2
import sys
import os
import imutils
import logging

from confluent_kafka import Consumer, KafkaError
from confluent_kafka.admin import AdminClient


def example_delete_topics(a, topics):
        fs = a.delete_topics(topics, operation_timeout=30)

        # Wait for operation to finish.
        for topic, f in fs.items():
            try:
                f.result()  # The result itself is None
                print("Topic {} deleted".format(topic))
            except Exception as e:
                print("Failed to delete topic {}: {}".format(topic, e))
a = AdminClient({'bootstrap.servers': 'localhost'})
example_delete_topics(a, ['newtopic'])
c = Consumer({
    'bootstrap.servers': 'localhost',
    'group.id': 'mygroup',
    'auto.offset.reset': 'latest'
})
camID = 0
c.subscribe(['newtopic'])
(H, W) = (None, None)

while True:
    msg = c.poll(1.0)

    if msg is None:
        print ("No Message")
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    print ("Received")
    img_bytes = msg.value()
    frame = np.array(Image.open(io.BytesIO(img_bytes)))
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    frame = imutils.resize(frame, width=800)

    if W is None or H is None:
        (H, W) = frame.shape[:2]
    #print (H,W)
    try:
        rects, landmarks = face_detect.detect_face(frame,20)
        rects_tracker = []

        for (i,rect) in enumerate(rects):

            # Intensive calculations on sentiment analysis

            draw_border(frame,(rect[0],rect[1]),(rect[0] + rect[2],rect[1]+rect[3]),(127,255,255),1,10,20)
            cv2.putText(frame,final_result,(rect[0],rect[1]),cv2.FONT_HERSHEY_SIMPLEX,0.5,(127,255,255),1,cv2.LINE_AA)

    except Exception as e :
        logging.info('Error on line {}'.format(sys.exc_info()[-1].tb_lineno), type(e).__name__, e)
        pass


    cv2.imshow('Sample', frame)
    key = cv2.waitKey(20)
    if key == 27:  # exit on ESC
        end = time.time()
        seconds = end - start
        fps  = frame_counter / seconds;
        print ("Estimated frames per second : {0}".format(fps))
                    break

c.close()

Несмотря на то, что код на стороне потребителя тяжел, много шагов.На прямой веб-камере без Kafka для запуска каждого кадра требуется 150 мсек.Но с Кафкой, это занимает около 3 секунд.

Может ли кто-нибудь помочь с тем, как я могу значительно уменьшить его, чтобы получить почти в реальном времени

...