Как можно пропустить некоторые строки при чтении из локального файла с помощью сеанса pyspark? - PullRequest
0 голосов
/ 24 января 2019

Я использую pyspark для чтения и обработки некоторых данных из локальных .plt файлов. Вот как выглядит файл:

Geolife trajectory
WGS 84
Altitude is in Feet
Reserved 3
0,2,255,My Track,0,0,2,8421376
0
39.984094,116.319236,0,492,39744.2451967593,2008-10-23,05:53:05
39.984198,116.319322,0,492,39744.2452083333,2008-10-23,05:53:06
39.984224,116.319402,0,492,39744.2452662037,2008-10-23,05:53:11
39.984211,116.319389,0,492,39744.2453240741,2008-10-23,05:53:16
......

Как показано выше, меня не интересуют первые 6 строк, я хочу строки, начинающиеся с 7-й строки. Поэтому я хочу использовать сеанс спарка, чтобы прочитать этот файл из 7-й строки. Вот код, который я пробовал, но не смог:

from pyspark.sql import SparkSession
session = SparkSession.builder.appName('file reader').master('local[*]').getOrCreate()
df = session.read.\
     option('delimiter', ',').\
     option('header', 'false').\
     csv('test.plt')
df.show()

Может ли кто-нибудь дать мне совет? Спасибо за внимание.

Ответы [ 3 ]

0 голосов
/ 24 января 2019

В дополнение к замечательному методу, предложенному @ Arnon Rotem-Gal-Oz, мы также можем использовать некоторые специальные свойства любого столбца, если он есть.

В данных YQ. Wang's мы можем видетьстолбец 6th является датой, и вероятность того, что столбец 6th в header также будет date, весьма мала.Итак, идея состоит в том, чтобы проверить это специальное свойство для столбца 6th.to_date() преобразует string в date.Если этот столбец не date, то to_date() вернет Null, и мы отфильтруем все такие строки, используя предложение .where() -

from pyspark.sql.functions import to_date
from pyspark.sql.types import FloatType, StringType, StructType, StructField
df = spark.read.schema(schema)\
                    .format("csv")\
                    .option("header","false")\
                    .option("sep",',')\
                    .load('test.plt')\
                    .where(to_date(col('f'),'yyyy-MM-dd').isNotNull())
df.show()
+---------+----------+----+---+---------+----------+--------+
|        a|         b|   c|  d|        e|         f|       g|
+---------+----------+----+---+---------+----------+--------+
|39.984093| 116.31924|   0|492|39744.246|2008-10-23|05:53:05|
|  39.9842| 116.31932|   0|492|39744.246|2008-10-23|05:53:06|
|39.984222|116.319405|   0|492|39744.246|2008-10-23|05:53:11|
| 39.98421| 116.31939|   0|492|39744.246|2008-10-23|05:53:16|
+---------+----------+----+---+---------+----------+--------+

У этого метода также есть недостатки, напримересли отсутствует date, то весь ряд отфильтровывается.

0 голосов
/ 24 января 2019

Предполагая, что данные из 7-й строки и далее следуют шаблону, который вы показали:

from pyspark.sql import SparkSession
session = SparkSession.builder.appName('file reader').master('local[*]').getOrCreate()
data = session.read.textFile('test.plt')

header = data.head(6)  # the first six rows

filtered = data.filter(row => row != header)
               .withColumn("a", split(col("value"), ",").getItem(0))
               .withColumn("b", split(col("value"), ",").getItem(1))
               .withColumn("c", split(col("value"), ",").getItem(2))
               .withColumn("d", split(col("value"), ",").getItem(3))
               .withColumn("e", split(col("value"), ",").getItem(4))
               .withColumn("f", split(col("value"), ",").getItem(5))
               .withColumn("g", split(col("value"), ",").getItem(6))
               .drop("value")
0 голосов
/ 24 января 2019
from pyspark.sql.types import *
from pyspark.sql import SparkSession
session = SparkSession.builder.appName('file reader').master('local[*]').getOrCreate()
schema = StructType([StructField("a", FloatType()),
                     StructField("b", FloatType()),
                     StructField("c", IntegerType()),
                     StructField("d", IntegerType()),
                     StructField("e", FloatType()),
                     StructField("f", StringType()),
                     StructField("g", StringType())])
df=session.read.option('mode','DROPMALFORMED').csv('test.plt',schema)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...