У меня есть скрипт, который потребляет сообщения kafka, а затем вызывает функцию kyotocabinet для обработки в соответствии с полученными сообщениями.
import json
import os
import sys
from datetime import datetime
from os.path import isfile, join
from elasticsearch import Elasticsearch
from kafka import KafkaConsumer, KafkaProducer
from kafka.errors import KafkaError
from pymongo import MongoClient
from kyotocabinet import *
client = MongoClient("localhost", 27017)
db = client["packages"]
es = Elasticsearch()
consumer = KafkaConsumer("versioned", group_id='es-embed', bootstrap_servers=['localhost:9092'])
def resolve_last_kch_version(package_path):
message = {}
onlyfiles = [f for f in os.listdir(package_path) if isfile(join(package_path, f))]
kch_number = []
for kch in onlyfiles:
if "clone" in kch:
continue
if kch.endswith(".kch"):
kch_number.append(int(kch.split(".kch")[0].split("_")[-1]))
if len(kch_number) == 0:
return False
else:
kch_number.sort()
return (package_path + "/" + "version_" + str(kch_number[-1]) + ".kch")
db_kyoto = DB()
def dbproc(db_kyoto):
for key in db_kyoto:
print ("Inside dbproc")
print (key)
print (db_kyoto[key])
for message in consumer:
message = eval(message.value)
package = message["package_name"]
package_path = db.packages.find_one({"package": package})["path"]
# print (package_path)
print (resolve_last_kch_version(package_path))
DB.process(dbproc, resolve_last_kch_version(package_path))
print ("Done.")
Код вызывает DB.process с использованием функтора, который определен чуть выше. Я поместил сообщение в функцию, и оно никогда не печатается. Это как если бы функция никогда не вызывалась. Заявление сразу после этого напечатано.
Я не могу понять, как вызвать этот функтор?