Как использовать траверсу только часть СДР в день? - PullRequest
0 голосов
/ 20 июня 2019

Этот вопрос касается дизайна кода.Как выполнить итерацию частей RDD сегодня и других частей на следующий день.

Я уже создал RDD или искровой кадр данных из 20 000 000 строк.Я хочу вызвать API с lbs.amap.com, но его можно посещать только 300 000 раз в день.

def gd_address(line):
    # GET rest api, return a list of values
    ...
# use these values to add columns to my RDD
df.rdd.map(lambda line: (line[0], line[1], gd_address(line)[0], gd_address(line)[1], gd_address(line)[2]), True)

Как мне написать программу, когда она просто пересекает 300 000 строк, затем останавливается, а затемПройдите следующие 300 000 строк на следующий день и остановитесь, если он закончил обход всего СДР?Любая идея будет оценена.

1 Ответ

0 голосов
/ 22 июня 2019

Как уже обсуждалось с @Glennie, ключевым моментом здесь является использование уникального добавочного идентификатора строки.Это означает, что новые данные будут иметь возрастающий инкрементный идентификатор, и этот идентификатор должен оставаться неизменным для старых данных.Другими словами, нам нужно убедиться, что одна конкретная запись будет иметь одинаковый соответствующий идентификатор при каждом выполнении задания.Для создания такого уникального идентификатора вы можете использовать zipWithIndex, который предоставляется через API RDD.В отличие от monotonically_increasing_id функция zipWithIndex обеспечивает последовательные значения идентификатора строки.Это будет играть важную роль для производительности вашей программы, так как, как мы увидим ниже, это может эффективно уменьшить количество строк, которые вам нужно обработать.Вот реализация подхода zipWithIndex:

df.rdd.zipWithIndex() \
           .map(lambda (line, row_id): (row_id, line[0], line[1], gd_address(line)[0], gd_address(line)[1], gd_address(line)[2])) \
           .toDF(["row_id", "c1", "c2", "c3", "c4", "c5"])

Вторым условием наряду с существованием уникального идентификатора строки является сортировка данных на основе только что созданного идентификатора строки.Тогда для первого дня желаемый диапазон будет 0 - 299 000, для второго дня - 300 000 - 599,999, для третьего - 600 000 - 899,999 и так далее.После извлечения и обработки каждого чанка вам нужно сохранить последний обработанный идентификатор строки.Вы можете сохранить последний идентификатор в файловой системе или в HDFS.Один из способов записи в HDFS - через
df.select("max(row_id)").write.text("hdfs://cluster/user/hdfs/last_row_id.txt") и чтение с помощью spark.read.text("hdfs://cluster/user/hdfs/last_row_id.txt").

. Вот полный код:

def callAmapAPI(data):
   for row in data:
     # make HTTP call here

# assign row id to df 
df = df.rdd.zipWithIndex() \
           .map(lambda (line, row_id): (row_id, line[0], line[1], gd_address(line)[0], gd_address(line)[1], gd_address(line)[2])) \
           .toDF(["row_id", "c1", "c2", "c3", "c4", "c5"]) 

# retrieve saved row id
lower_bound_rowid = spark.read.text("hdfs://cluster/user/hdfs/last_row_id.txt").first()[0]

chunk_size = 300000
upper_bound_rowid = lower_bound_id + chunk_size
partition_num = 8

# we restrict the number of rows to 300000 based on upper and lower bound
filtered_df = df.where(df["row_id"] > lower_bound_rowid &  df["row_id"] <= upper_bound_rowid) \
# optional allows more control to simultaneous calls to amap API i.e 8 concurrent HTTP calls
.repartition(partition_num, "row_id") \
.orderBy("row_id")

# call API for each partition
filtered_df.foreachPartition(callAmapAPI)

# save max row id for next day
filtered_df.select("max(row_id)") \
.write.mode('overwrite') \
.text("hdfs://cluster/user/hdfs/last_row_id.txt")

Второй подход с использованием monotonically_increasing_id (не рекомендуется)

Что касается подхода, использующего monotonically_increasing_id Я считаю, что это может сработать, только если ваш набор данных останется прежним (нет новых строк), иначе нет способа гарантировать, что генерируемый_ид будетостаются одинаковыми для каждой строки, поэтому вы не сможете отслеживать последнюю обработанную запись (Spark может выдавать разные идентификаторы для одной и той же записи).Хотя, если это так, и df не меняется, вы можете вызвать monotonically_increasing_id() только один раз и сохранить df с добавленным новым идентификатором.В этом случае вам понадобятся следующие два изменения.Сначала измените определение df на:

df = df.withColumn("row_id", monotonically_increasing_id())
df.write.csv(...) # or some other storage

Приведенный выше фрагмент должен выполняться только один раз, в отличие от предыдущего подхода, который вычисляет и назначает идентификатор строки при каждом выполнении задания.

Затем измените определение filtered_df на:

df = spark.read.csv(...) # retrieve the dataset with monotonically_increasing_id

filtered_df = df.where(df["row_id"] > lower_bound_rowid) \
.orderBy("row_id") \
.limit(chunk_size) \
.repartition(partition_num, "row_id")

Здесь следует обратить внимание на две вещи.Во-первых, мы не знаем, что upper_bound_rowid (monotonically_increasing_id будет генерировать произвольные идентификаторы для каждого раздела), и поэтому upper_bound_rowid не используется в предложении where.Во-вторых, orderBy должно предшествовать limit, иначе мы не сможем обеспечить строки topN.Этот подход также может иметь более низкую производительность, поскольку orderBy выполняется для большего набора данных.

...