Как проверить данные конкретного раздела из разделов Spark в Pyspark - PullRequest
0 голосов
/ 04 октября 2018

Я создал два фрейма данных в pyspark из моей таблицы улья:

data1 = spark.sql("""
   SELECT ID, MODEL_NUMBER, MODEL_YEAR ,COUNTRY_CODE
   from MODEL_TABLE1 where COUNTRY_CODE in ('IND','CHN','USA','RUS','AUS')
""");

каждая страна имеет миллионы уникальных идентификаторов в буквенно-цифровом формате.

data2 = spark.sql("""
   SELECT ID,MODEL_NUMBER, MODEL_YEAR, COUNTRY_CODE
   from MODEL_TABLE2 where COUNTRY_CODE in ('IND','CHN')
""");

Я хочуобъедините оба этих фрейма данных, используя pyspark в столбце ID.

Как мы можем перераспределить наши данные, чтобы они равномерно распределялись по разделам.

Можно ли использовать ниже для восстановления моих данных?

newdf1 = data2.repartition(100, "ID")
newdf2 = data2.repartition(100, "ID")

Каков наилучший способ разбиения, чтобы объединение работало быстрее?

1 Ответ

0 голосов
/ 04 октября 2018

Насколько я знаю, ваш подход repartition, предоставляющий столбец идентификаторов, верен.Рассмотрим следующее как подтверждение концепции, используя spark_partition_id() для получения соответствующего идентификатора раздела:

Создание фиктивных данных

import pandas as pd
import numpy as np
from pyspark.sql.functions import spark_partition_id

def create_dummy_data():

    data = np.vstack([np.random.randint(0, 5, size=10), 
                      np.random.random(10)])

    df = pd.DataFrame(data.T, columns=["id", "values"])

    return spark.createDataFrame(df)

def show_partition_id(df):
    """Helper function to show partition."""
    return df.select(*df.columns, spark_partition_id().alias("pid")).show()

df1 = create_dummy_data()
df2 = create_dummy_data()

Показ идентификатора раздела перед повторным использованием

show_partition_id(df1)

+---+-------------------+---+
| id|             values|pid|
+---+-------------------+---+
|1.0| 0.6051170383675885|  0|
|3.0| 0.4613520717857513|  0|
|0.0|  0.797734780966592|  1|
|2.0|0.35594664760134587|  1|
|2.0|0.08223203758144915|  2|
|0.0| 0.3112880092048709|  2|
|4.0| 0.2689639324292137|  3|
|1.0| 0.6466782159542134|  3|
|0.0| 0.8340472796153436|  3|
|4.0| 0.8054752411745659|  3|
+---+-------------------+---+

show_partition_id(df2)

+---+-------------------+---+
| id|             values|pid|
+---+-------------------+---+
|4.0| 0.8950517294190533|  0|
|3.0| 0.4084717827425539|  0|
|3.0|  0.798146627431009|  1|
|4.0| 0.8039931522181247|  1|
|3.0|  0.732125135531736|  2|
|0.0|  0.536328329270619|  2|
|1.0|0.25952064363007576|  3|
|2.0| 0.1958334111199559|  3|
|0.0|  0.728098753644471|  3|
|0.0| 0.9825387111807906|  3|
+---+-------------------+---+

Показать идентификатор раздела после перераспределения

show_partition_id(df1.repartition(2, "id"))

+---+-------------------+---+
| id|             values|pid|
+---+-------------------+---+
|1.0| 0.6051170383675885|  0|
|3.0| 0.4613520717857513|  0|
|4.0| 0.2689639324292137|  0|
|1.0| 0.6466782159542134|  0|
|4.0| 0.8054752411745659|  0|
|0.0|  0.797734780966592|  1|
|2.0|0.35594664760134587|  1|
|2.0|0.08223203758144915|  1|
|0.0| 0.3112880092048709|  1|
|0.0| 0.8340472796153436|  1|
+---+-------------------+---+

show_partition_id(df2.repartition(2, "id"))

+---+-------------------+---+
| id|             values|pid|
+---+-------------------+---+
|4.0| 0.8950517294190533|  0|
|3.0| 0.4084717827425539|  0|
|3.0|  0.798146627431009|  0|
|4.0| 0.8039931522181247|  0|
|3.0|  0.732125135531736|  0|
|1.0|0.25952064363007576|  0|
|0.0|  0.536328329270619|  1|
|2.0| 0.1958334111199559|  1|
|0.0|  0.728098753644471|  1|
|0.0| 0.9825387111807906|  1|
+---+-------------------+---+

После перераспределения идентификаторы 0 и 2 находятся в одном и том же разделе, а остальные - в другом разделе.

...