Я боролся за объединение DStream и RDD. Чтобы установить сцену:
- Искра - 2.3.1
- Python - 3.6.3
РДД
Я читаю в RDD из файла CSV, разделяю записи и создаю пару RDD.
sku_prices = sc.textFile("sku-catalog.csv")\
.map(lambda line: line.split(","))\
.map(lambda fields: (fields[0], float(fields[1])))
Это вывод из sku_prices.collect()
:
[('0003003001', 19.25),
('0001017002', 2.25),
('0001017003', 3.5),
('0003013001', 18.75),
('0004017002', 16.5),
('0002008001', 2.25),
('0004002001', 10.75),
('0005020002', 10.5),
('0001004002', 3.5),
('0002016003', 14.25)]
DStream
Я читаю DStream от Кафки.
orders = kstream.map(lambda n: n[1]).map(lambda n: json.loads(n))
items = orders.map(lambda order: order['items'])\
.flatMap(lambda items: [(i['sku'], i['count']) for i in items])\
.reduceByKey(lambda x, y: x + y)
Когда я запускаю pprint()
на orders
, я получаю вывод, который выглядит следующим образом:
-------------------------------------------
Time: 2018-09-03 06:57:20
-------------------------------------------
('0004002001', 3)
('0002016003', 1)
('0003013001', 1)
Регистрация
Теперь я хочу присоединить items
DStream к sku_prices
RDD. Я знаю, что не могу сделать это соединение напрямую, но мое чтение предполагает, что я могу использовать метод transform()
в DStream для выполнения этой работы. Вот что у меня есть:
items.transform(lambda rdd: rdd.join(sku_prices)).pprint()
Я ожидаю получить DStream, который выглядит примерно так:
-------------------------------------------
Time: 2018-09-03 06:57:20
-------------------------------------------
('0004002001', (3, 10.75))
('0002016003', (1, 14.25))
('0003013001', (1, 18.75))
Документация Spark предполагает, что это должно работать, и оно работает: именно этот результат я и получаю! :)
Checkpointing
Однако я также хочу выполнить операцию с состоянием, поэтому мне нужно ввести контрольные точки.
ssc.checkpoint("checkpoint")
Простое добавление контрольных точек приводит к этой ошибке на transform()
:
Похоже, что вы пытаетесь передать СДР или сослаться на
СДР от действия или трансформации. СДР трансформации и действия
может вызываться только водителем, а не внутри других
преобразования; например, rdd1.map (лямбда x: rdd2.values.count () *
х) является недопустимым, поскольку преобразование значений и подсчет действий
не может быть выполнено внутри преобразования rdd1.map.
Ответ по этой теме предполагает, что контрольные точки и внешние RDD не смешиваются. Это можно обойти? Можно ли присоединить DStream и RDD, когда в StreamingContext включена контрольная точка?
Спасибо,
Эндрю.