тест на значение позиции потока Кафки - PullRequest
0 голосов
/ 11 апреля 2019

Я хочу проверить значение позиции потока Кафки, если равное значение имеет, например, "2", затем отобразить функцию запуска A, иначе функцию запуска B

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'video-group',
    'fetch.message.max.bytes': '15728640',
    'auto.offset.reset': 'largest'})
# Group ID is completely arbitrary

lines = kafkaStream.map(lambda x: x[1])
 flag = lines.map(lambda line: line.split(",")).map(lambda v : v[0])

if  flag == "2":
    A = lines.map(lambda line: line.split(",")).map(lambda v: v[1])
    A.pprint()
else:
    lines.pprint()

1 Ответ

0 голосов
/ 13 апреля 2019

flag == "2" никогда не будет истинным, потому что это объект Spark RDD, а не единичная строка.

Плюс, у Kafka есть непрерывный поток записей, возможно, поэтому просто проверка второго столбца этой первой записи (при условии, что вы вызвали функцию collect ()) тоже не сработает.

Если вы хотите проверить 2 любой строки, вам нужно отфильтровать ее

lines = kafkaStream.map(lambda x: x[1])
flag = lines.map(lambda line: line.split(",")).filter(lambda columns: columns[1] == "2")
flag.pprint()

Если вы хотите просто использовать Kafka с помощью Python и проверять значения записей, вам не нужен Spark

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...