Перенос остатков с «присоединиться» на работу на следующий день в Spark - PullRequest
2 голосов
/ 19 февраля 2020

У меня есть работа, которая выполняется ежедневно. Цель этой работы - сопоставить HTTP-запросы с соответствующими HTTP-ответами. Этого можно достичь, потому что все HTTP-запросы и HTTP-ответы имеют GUID, который однозначно их связывает.

Таким образом, задание имеет дело с двумя фреймами данных: один содержит запросы, а другой - ответы. Таким образом, чтобы сопоставить запросы с их ответами, я, очевидно, выполняю внутреннее соединение на основе этого GUID.

Проблема, с которой я сталкиваюсь, заключается в том, что запрос был получен в день X в 23:59:59. его ответ может быть получен в день X + 1 в 00:00:01 (или наоборот), что означает, что они никогда не будут коррелироваться вместе, ни в день X, ни в день X + 1.

Здесь это пример кода, который иллюстрирует то, что я имею в виду:

val day1_requests = """[ { "id1": "guid_a", "val" : 1 }, { "id1": "guid_b", "val" : 3 }, { "id1": "guid_c", "val" : 5 }, { "id1": "guid_d", "val" : 7 } ]"""
val day1_replies  = """[ { "id2": "guid_a", "val" : 2 }, { "id2": "guid_b", "val" : 4 }, { "id2": "guid_c", "val" : 6 },                                { "id2": "guid_e", "val" : 10 } ]"""

val day2_requests = """[                                 { "id1": "guid_e", "val" :  9 }, { "id1": "guid_f", "val" : 11 }, { "id1": "guid_g", "val" : 13 }, { "id1": "guid_h", "val" : 15 } ]"""
val day2_replies  = """[ { "id2": "guid_d", "val" : 8 },                                  { "id2": "guid_f", "val" : 12 }, { "id2": "guid_g", "val" : 14 }, { "id2": "guid_h", "val" : 16 } ]"""

val day1_df_requests = spark.read.json(spark.sparkContext.makeRDD(day1_requests :: Nil))
val day1_df_replies  = spark.read.json(spark.sparkContext.makeRDD(day1_replies  :: Nil))

val day2_df_requests = spark.read.json(spark.sparkContext.makeRDD(day2_requests :: Nil))
val day2_df_replies  = spark.read.json(spark.sparkContext.makeRDD(day2_replies  :: Nil))

day1_df_requests.show()
day1_df_replies.show()
day2_df_requests.show()
day2_df_replies.show()

day1_df_requests.join(day1_df_replies, day1_df_requests("id1") === day1_df_replies("id2")).show()
// guid_d from request stream is left over, as well as guid_e from reply stream.
//
// The following 'join' is done on the following day.
// I would like to carry 'guid_d' into day2_df_requests and 'guid_e' into day2_df_replies).
day2_df_requests.join(day2_df_replies, day2_df_requests("id1") === day2_df_replies("id2")).show()

Я вижу 2 решения.

Решение № 1 - пользовательский перенос

In В этом решении в день X я выполнял бы соединение full_outer вместо внутреннего соединения, и я сохранял бы в некотором хранилище результаты, пропущенные с одной или другой стороны. На следующий день X + 1, я буду загружать эти дополнительные данные вместе с моими "обычными данными" при выполнении моего соединения.

Дополнительная деталь реализации заключается в том, что мой пользовательский перенос должен будет отбрасываться " старые переносы "в противном случае он может накапливаться, т. е. возможно, что HTTP-запрос или HTTP-ответ за 10 дней go никогда не увидит своего аналога (возможно, приложение упало, например, таким образом, был выдан HTTP-запрос, но не ответ) .

Решение № 2 - сворачивание guid

В этом решении я бы сделал предположение, что мои запросы и ответы находятся в пределах определенного промежутка времени друг друга (например, 5 минут). Таким образом, в день X + 1 я бы также загружал последние 5 минут данных из дня X и включал их как часть моего соединения. Таким образом, мне не нужно использовать дополнительное хранилище, как в решении № 1. Однако недостатком является то, что это решение требует, чтобы целевое хранилище могло обрабатывать дублирующиеся записи (например, если целевое хранилище представляет собой таблицу SQL, PK должен быть этим GUID и выполнять вставку вместо вставки). .


Вопрос

Так что мой вопрос заключается в том, предоставляет ли Spark функциональность для автоматического решения подобной ситуации, не требуя при этом ни одного из двух моих решений и тот же факт делает вещи проще и элегантнее?


Бонусный вопрос

Давайте предположим, что мне нужно сделать тот же тип корреляции, но с потоком данные (т. е. вместо ежедневного пакетного задания, которое выполняется на наборе данных stati c, я использую потоковую передачу Spark, а данные обрабатываются в потоковых потоках запросов и ответов).

В этом сценарии " full_outer "join явно неуместен (https://dzone.com/articles/spark-structured-streaming-joins) и фактически не нужен, так как Spark Streaming позаботится об этом для нас, имея скользящий windo w для выполнения объединения.

Однако мне любопытно узнать, что произойдет, если задание будет остановлено (или произойдет сбой), а затем возобновлено. Аналогично приведенному выше примеру пакетного режима, что если задание было прервано после того, как запрос был получен (и подтвержден) из потока / очереди, но за до его соответствующий ответ сделал? Сохраняет ли Spark Streaming состояние своего скользящего окна, следовательно, возобновление работы сможет коррелировать, как если бы поток никогда не прерывался?

PS резервное копирование вашего ответа с гиперссылками на авторитетные документы (например, Apache) собственный) будет высоко ценится.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...