Синтаксическая ошибка в CQL-запросе при попытке записи на cassandra из python - PullRequest
0 голосов
/ 14 октября 2018

Итак, я создаю приложение на python, которое берет данные из твиттера, а затем сохраняет их на Кассандре.Мои текущие проблемы заключаются в скрипте, который читает данные из kafka и пытается записать их в cassandra следующим образом:

import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster

from kafka import KafkaConsumer, KafkaProducer




class Consumer(multiprocessing.Process):
   def __init__(self):
        multiprocessing.Process.__init__(self)
        self.stop_event = multiprocessing.Event()

    def stop(self):
         self.stop_event.set()

    def run(self):
       consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=1000)
        consumer.subscribe(['twitter'])



    while not self.stop_event.is_set():
        for message in consumer:
            # session.execute(
            #     """
            #     INSERT INTO mensaje_73 (tweet)
            #     VALUES (message)
            #     """
            # )
            print(message)
            cluster = Cluster()
            session = cluster.connect('twitter')
            session.execute(
                    """
                    INSERT INTO mensaje_73 (tweet)
                    VALUES (message)
                    """
                )

            # if self.stop_event.is_set():
            #     break

    consumer.close()


   def main():

    tasks = [
        Consumer()
    ]

    for t in tasks:
        t.start()

    time.sleep(10)

    for task in tasks:
        task.stop()



if __name__ == "__main__":
     logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:% 
   (levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()

Я попытался вставить тестовые сообщения в таблицу twitter.mensaje_73, и она сработала отлично,как здесь:

import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster

from kafka import KafkaConsumer, KafkaProducer


cluster = Cluster()
session = cluster.connect('twitter')
session.execute(
    """
    INSERT INTO mensaje_73 (tweet)
    VALUES ('helooo')
    """
)

Любая помощь будет высоко ценится:)

1 Ответ

0 голосов
/ 15 октября 2018

Итак, проблема здесь в том, что ваша переменная message рассматривается как литерал в CQL, который не будет работать без одинарных кавычек.Следовательно, ошибка.

Чтобы исправить это, я бы пошел по пути использования подготовленного оператора, а затем связал message с ним:

session = cluster.connect('twitter')
preparedTweetInsert = session.prepare(
        """
        INSERT INTO mensaje_73 (tweet)
        VALUES (?)
        """
    )
session.execute(preparedTweetInsert,[message])

Дайте этому попытку, ипосмотрим, поможет ли это.

Кроме того, это похоже на простую модель данных.Но вы должны спросить себя: как вы собираетесь запрашивать эти данные?Это не сработает, если только tweet не будет вашим единственным ПЕРВИЧНЫМ КЛЮЧОМ.Это также означает, что единственный способ, которым вы можете запросить отдельный твит, - это точный текст сообщения.Есть над чем подумать, но лучше разбить его по дням, так как он будет хорошо распределяться и обеспечит гораздо лучшую модель запросов.

...