Попытка использовать Очередь для вставки значений в Cassandra Python - PullRequest
0 голосов
/ 10 июня 2019

Я использую кассандру для хранения данных, которые я получаю из клиентской программы.У меня есть серверная программа, где я запускаю потоки для вставки значений, но без очереди она не работает должным образом, но я пытался использовать очередь, не зная, где я иду не так.Может кто-нибудь сказать мне, как получить доступ к инструкции вставки несколько раз.

клиентская программа:

import socket
import threading
import client_to_orc
import json
import pyangbind.lib.pybindJSON as pybindJSON

def con_server(server_host, server_port, thread_num):
    server_address = (server_host, server_port)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    #print(sock.getsockname())
    ---
    # sw is the packets code comes here

    ---
    packet=(pybindJSON.dumps(sw))     ----> Data in string format

    sock.sendto(packet.encode("Utf-8"), server_address)
    data = sock.recv(4096)    
    data = data.decode()
    print('Client:' + data)


if __name__ == "__main__":
    SERVER_ADDR = ("localhost", 4242)
    threads = []
    for thread_num in range(10):
        thread_args = (SERVER_ADDR[0], SERVER_ADDR[1], thread_num)
        t = threading.Thread(target=con_server, name='con_servr', args=thread_args)
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

Отредактированный сервер:

def insert_Vmac(msg):
    cluster = Cluster(contact_points=['172.17.0.2'])
    session = cluster.connect()
    id=1
    session.execute("""INSERT INTO mykeyspace.hello(PacketID, PacketValue)
    VALUES(%s,%s)""",
    (id,msg))

def talkToClient(ip):
    logging.info("sending 'clients we received your data' to %s",ip )

    sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
    sock.sendto("ok".encode('utf-8'), ip)

def listen_clients(Host,port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((Host,port))
    client_list=[]
    while True:
        msgg, client = sock.recvfrom(1024)
        message = msgg.decode("utf-8")
        msg = json.dumps(message)

        print('connected with : ' + client[0]+ ':' + str(client[1]))
        for line in msg:
            cluster = Cluster(contact_points=['172.17.0.2'])
            session = cluster.connect()
            id = 0
            query = SimpleStatement("""INSERT INTO 
                  mykeyspace.hel(PacketID, PacketValue)
                  VALUES(%s,%s)""" %(id, msg))
             session.execute_async(query)
         print('hello inserted')
         t = threading.Thread(target=talkToClient, args=(client,))
         t.start()

if __name__=="__main__":
    t2 = ThreadWithReturnValue(target=listen_clients, args=("localhost", 4242,))
    t2.start()

Отредактированный код сохраняет данные только один раз.Но я отправляю данные 10 раз, но они не хранят все данные.

1 Ответ

1 голос
/ 10 июня 2019

Вы используете один и тот же идентификатор для всех запросов, поэтому каждая запись будет перезаписывать предыдущую запись.Вот почему вы извлекаете только одну строку, когда читаете данные.

Это при условии, что «id» в вашем случае - это ПЕРВИЧНЫЙ КЛЮЧ в вашей таблице.

...