Почему функция dbproc не вызывается в цикле for, потребляющем сообщения Kafka? - PullRequest
0 голосов
/ 15 сентября 2018

У меня есть скрипт, который потребляет сообщения kafka, а затем вызывает функцию kyotocabinet для обработки в соответствии с полученными сообщениями.

import json
import os
import sys
from datetime import datetime
from os.path import isfile, join

from elasticsearch import Elasticsearch
from kafka import KafkaConsumer, KafkaProducer
from kafka.errors import KafkaError
from pymongo import MongoClient

from kyotocabinet import *

client = MongoClient("localhost", 27017)
db = client["packages"]

es = Elasticsearch()
consumer = KafkaConsumer("versioned", group_id='es-embed', bootstrap_servers=['localhost:9092'])

def resolve_last_kch_version(package_path):
    message = {}
    onlyfiles = [f for f in os.listdir(package_path) if isfile(join(package_path, f))]
    kch_number = []
    for kch in onlyfiles:
        if "clone" in kch:
            continue
        if kch.endswith(".kch"):
            kch_number.append(int(kch.split(".kch")[0].split("_")[-1]))
    if len(kch_number) == 0:
        return False
    else:
        kch_number.sort()
        return (package_path + "/" + "version_" + str(kch_number[-1]) + ".kch")

db_kyoto = DB()

def dbproc(db_kyoto):
    for key in db_kyoto:
    print ("Inside dbproc")
        print (key)
        print (db_kyoto[key])

for message in consumer:
    message = eval(message.value)
    package = message["package_name"]
    package_path = db.packages.find_one({"package": package})["path"]
    # print (package_path)
    print (resolve_last_kch_version(package_path))
    DB.process(dbproc, resolve_last_kch_version(package_path))
    print ("Done.")

Код вызывает DB.process с использованием функтора, который определен чуть выше. Я поместил сообщение в функцию, и оно никогда не печатается. Это как если бы функция никогда не вызывалась. Заявление сразу после этого напечатано.

Я не могу понять, как вызвать этот функтор?

...