Цикл Spark For для больших таблиц с использованием внутреннего соединения - PullRequest
0 голосов
/ 17 мая 2019

Создайте новую таблицу из двух существующих таблиц A и B, A имеет данные истории за 1 год, а B имеет данные идентификаторов.Мне нужно объединить эти две таблицы с помощью Spark, где производительность хорошая, а также зациклить данные для каждого дня или месяца, поскольку business_day - это раздел.Я не могу рассматривать целые таблицы, поскольку каждый рабочий день содержит 30 миллионов.

Таблица A - содержит n столбцов, таких как ID, Business_Day, Имя

Таблица B - содержит n столбцов -ID, ID_Code

Таблица A должна присоединиться к таблице B с помощью ID=ID и получить ID_Code вместе с другими столбцами A

insert into output_table
select ID, ID_CODE,Business_Day, Name 
from A,B where 
A.ID=B.ID

Я не уверен, как написать цикл For для вышеупомянутого,Скрипт вставки работает, но для одного дня это занимает 2 часа, и мне нужно вручную изменить рабочий день на год, что невозможно, но цикл и другие шаги по производительности помогут ему работать намного быстрее.

1 Ответ

1 голос
/ 17 мая 2019

Запрос Spark SQL с Python

Источник

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext(conf=SparkConf())
sqlContext = SQLContext(sc)

# Table A read and spark create dataframe --> df_A
# df_A = sqlContext.createDataFrame(...)
# Table B read and spark create dataframe --> df_B
# df_B = sqlContext.createDataFrame(...)


# Example:

df1 = sqlContext.createDataFrame(
    pd.DataFrame.from_records(
        [
            [1,12,'Test'],
            [2,22,'RD']
        ],
        columns=['ID','ID_CODE','Departman']
    ))

df2 = sqlContext.createDataFrame(
    pd.DataFrame.from_records(
        [
            [1,'friday','Shan'],
            [2,'friday','ramazan'],
            [3,'friday','bozkir']
        ],
    columns=['ID','Business_Day','Name']))

### pyspark method SQL 
df = df_A.join(df_B,df_B.ID == df_A.ID)
.select('ID_CODE','Business_Day','Name')

### Spark SQL method
df1.registerTempTable('df_A')
df2.registerTempTable('df_B')

df = sqlContext.sql("""
            SELECT ID_CODE,Business_Day,Name
            FROM (
                SELECT *
                FROM df_A A LEFT JOIN df_B B ON B.ID = A.ID
            ) df    
            """)

""").show()

[In]: df.show()
[Out]: 
+-------+------------+-------+
|ID_CODE|Business_Day|   Name|
+-------+------------+-------+
|     12|      friday|   Shan|
|     22|      friday|ramazan|
+-------+------------+-------+

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...