Использование функции разделения в PySpark - PullRequest
0 голосов
/ 04 мая 2018

Я пытаюсь найти определенную строку из очень большого файла журнала. Я могу искать строку.

Теперь, используя это пространство строк, я хочу создать фрейм данных, но я не могу этого сделать. Я пробовал приведенный ниже код, но не смог достичь.

from pyspark import SparkConf,SparkContext
from pyspark import  SQLContext
from pyspark.sql.types import *
from pyspark.sql import *

conf=SparkConf().setMaster("local").setAppName("invparsing")
sc=SparkContext(conf=conf)
sql=SQLContext(sc)
def f(x) :print(x)

data_frame_schema=StructType([
    StructField("Typeof",StringType()),
    #StructField("Produt_mod",StringType()),
    #StructField("Col2",StringType()),
    #StructField("Col3",StringType()),
    #StructField("Col4",StringType()),
    #StructField("Col5",StringType()),
])
path="C:/rk/IBMS/inv.log"

lines=sc.textFile(path)
NodeStr=lines.filter(lambda x:'Node :RBS6301' in x).map(lambda x:x.split(" +"))
NodeStr.foreach(f)
Nodedf=sql.createDataFrame(NodeStr,data_frame_schema)
Nodedf.show(truncate=False)

Теперь я получаю вывод - только одна строка. O хочу разделить значение на основе пробела.

[u'Node: RBS6301         XP10521/26 R30F L17A.4-6 (C17.0_LSV_PS4)']
+-------------------------------------------------------------+
|Typesof                                                      |  
+-------------------------------------------------------------+ 
|Node: RBS6301         XP10521/26   R30F   L17A.4-6   (C17.0_LSV_PS4)
+-------------------------------------------------------------+

Ожидаемый результат:

Typeof      Produt_mod  Col2          Col3    Col4        COL5 
Node     RBS6301       XP10521/26    R30F    L17A.4-6    C17.0_LSV_PS4

1 Ответ

0 голосов
/ 04 мая 2018

Первая ошибка, которую вы сделали здесь:

lambda x:x.split(" +")

str.split принимает постоянную строку, а не регулярное выражение. Чтобы разделить пробел, вам нужно просто опустить разделитель

lines = sc.parallelize(["Node: RBS6301         XP10521/26 R30F L17A.4-6 (C17.0_LSV_PS4)"])

lines.map(lambda s: s.split()).first()
# ['Node:', 'RBS6301', 'XP10521/26', 'R30F', 'L17A.4-6', '(C17.0_LSV_PS4)']

Как только вы это сделаете, вы можете просто отфильтровать и преобразовать в DataFrame:

df = lines.map(lambda s: s.split()).filter(lambda x: len(x) == 6).toDF(
    ["col1", "col2", "col3", "col4", "col5", "col6"]
)
df.show()
# +-----+-------+----------+----+--------+---------------+
# | col1|   col2|      col3|col4|    col5|           col6|
# +-----+-------+----------+----+--------+---------------+
# |Node:|RBS6301|XP10521/26|R30F|L17A.4-6|(C17.0_LSV_PS4)|
# +-----+-------+----------+----+--------+---------------+

и filter:

df[df["col2"] == "RBS6301"].show()
# +-----+-------+----------+----+--------+---------------+
# | col1|   col2|      col3|col4|    col5|           col6|
# +-----+-------+----------+----+--------+---------------+
# |Node:|RBS6301|XP10521/26|R30F|L17A.4-6|(C17.0_LSV_PS4)|
# +-----+-------+----------+----+--------+---------------+
...