Присоединение DStream и RDD с контрольными точками - PullRequest
0 голосов
/ 03 сентября 2018

Я боролся за объединение DStream и RDD. Чтобы установить сцену:

  • Искра - 2.3.1
  • Python - 3.6.3

РДД

Я читаю в RDD из файла CSV, разделяю записи и создаю пару RDD.

sku_prices = sc.textFile("sku-catalog.csv")\
    .map(lambda line: line.split(","))\
    .map(lambda fields: (fields[0], float(fields[1])))

Это вывод из sku_prices.collect():

[('0003003001', 19.25),
 ('0001017002', 2.25),
 ('0001017003', 3.5),
 ('0003013001', 18.75),
 ('0004017002', 16.5),
 ('0002008001', 2.25),
 ('0004002001', 10.75),
 ('0005020002', 10.5),
 ('0001004002', 3.5),
 ('0002016003', 14.25)]

DStream

Я читаю DStream от Кафки.

orders = kstream.map(lambda n: n[1]).map(lambda n: json.loads(n))

items = orders.map(lambda order: order['items'])\
              .flatMap(lambda items: [(i['sku'], i['count']) for i in items])\
              .reduceByKey(lambda x, y: x + y)

Когда я запускаю pprint() на orders, я получаю вывод, который выглядит следующим образом:

-------------------------------------------
Time: 2018-09-03 06:57:20
-------------------------------------------
('0004002001', 3)
('0002016003', 1)
('0003013001', 1)

Регистрация

Теперь я хочу присоединить items DStream к sku_prices RDD. Я знаю, что не могу сделать это соединение напрямую, но мое чтение предполагает, что я могу использовать метод transform() в DStream для выполнения этой работы. Вот что у меня есть:

items.transform(lambda rdd: rdd.join(sku_prices)).pprint()

Я ожидаю получить DStream, который выглядит примерно так:

-------------------------------------------
Time: 2018-09-03 06:57:20
-------------------------------------------
('0004002001', (3, 10.75))
('0002016003', (1, 14.25))
('0003013001', (1, 18.75))

Документация Spark предполагает, что это должно работать, и оно работает: именно этот результат я и получаю! :)

Checkpointing

Однако я также хочу выполнить операцию с состоянием, поэтому мне нужно ввести контрольные точки.

ssc.checkpoint("checkpoint")

Простое добавление контрольных точек приводит к этой ошибке на transform():

Похоже, что вы пытаетесь передать СДР или сослаться на СДР от действия или трансформации. СДР трансформации и действия может вызываться только водителем, а не внутри других преобразования; например, rdd1.map (лямбда x: rdd2.values.count () * х) является недопустимым, поскольку преобразование значений и подсчет действий не может быть выполнено внутри преобразования rdd1.map.

Ответ по этой теме предполагает, что контрольные точки и внешние RDD не смешиваются. Это можно обойти? Можно ли присоединить DStream и RDD, когда в StreamingContext включена контрольная точка?

Спасибо, Эндрю.

...