Загрузка geoJSON в pyspark с проверкой схемы - PullRequest
0 голосов
/ 12 июня 2018

Я пытаюсь создать схему для проверки GeoJSON загружаемых файлов:

validSchema = StructType([
StructField("type", StringType()),
StructField("geometry", StructType([
  StructField("coordinates", ArrayType(DoubleType())), # POINT
  StructField("coordinates", ArrayType(ArrayType(ArrayType(DoubleType())))),  # POLYGON
  StructField("coordinates", ArrayType(ArrayType(DoubleType()))), # LINESTRING
  StructField("type", StringType(), False)
]), False),
StructField("properties", MapType(StringType(), StringType()))
])

df = spark.read.option("multiline","true").json(src_data,mode="PERMISSIVE",schema=validSchema)

Проблема в том, что у меня есть три вида "координат", чтобы удовлетворить действительный GeoJSONтипы.Тем не менее, работает только последнее правило, я предполагаю, что оно имеет приоритет над двумя предыдущими в зависимости от порядка.

В любом случае можно ли указать схему как указание на то, что одна из координатных схем должна совпадать?

Прямо сейчас единственный способ обойти это - создать три схемы и три импорта, что означает сканирование всех данных три раза (у меня есть 5 ТБ данных, поэтому это кажется сумасшедшим).

Пример данных geoJSON:

{
  "type": "Feature",
  "properties": {},
  "geometry": {
    "type": "Polygon",
    "coordinates": [[[ -0.144195556640625,52.019120643633386],
        [-0.127716064453125,52.00052411347729],
        [-0.10848999023437499,52.01193653675363],
        [-0.12359619140625,52.02883848153626],
        [-0.144195556640625,52.019120643633386]]]
  }
},
{
  "type": "Feature",
  "properties": {},
  "geometry": {
    "type": "LineString",
    "coordinates": [[-0.196380615234375,52.11283076186275],
      [-0.1263427734375,52.07739600418385]]
      }
},
{
  "type": "Feature",
  "properties": {},
  "geometry": {
    "type": "Point",
    "coordinates": [-0.1641082763671875, 52.06051241654061]
  }
}

Спасибо

1 Ответ

0 голосов
/ 12 июня 2018

В любом случае можно ли указать схему как указание на то, что одна из схем координат должна совпадать?

UserDefinedTypes (больше не поддерживается), несмотря на все значения в Columnдолжны иметь одинаковую форму, поэтому вы не можете иметь одновременно array<array<array<double>>>, array<array<double>> и array<double>.

Вы можете полностью пропустить синтаксический анализ

validSchema = StructType([
    StructField("type", StringType()),
    StructField("geometry", StructType([
      StructField("coordinates", StringType()),
      StructField("type", StringType(), False)
    ]), False),
    StructField("properties", MapType(StringType(), StringType()))
])

, а затем разобрать его с помощью udf в три отдельных столбца:

from pyspark.sql.functions import udf
import json

@udf("struct<type: string, coordinates: struct<polygon: array<array<struct<lon: double, lat: double>>>, line: array<struct<lon: double, lat: double>>, point: struct<lon: double, lat: double>>>")
def parse(row):
    try:
        struct = json.loads(row["coordinates"])
        t = row["type"]
    except (TypeError, json.decoder.JSONDecodeError):
        pass 
    if t == "Polygon":
        return t, (struct, None, None)
    elif t == "LineString":
        return t, (None, struct, None)
    elif t == "Point":
        return t, (None, None, struct)

sdf.select(parse("geometry")).show(truncate=False)
# +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |parse(geometry)                                                                                                                                                                                                                  |
# +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
# |[Polygon, [[[[-0.144195556640625, 52.019120643633386], [-0.127716064453125, 52.00052411347729], [-0.10848999023437499, 52.01193653675363], [-0.12359619140625, 52.02883848153626], [-0.144195556640625, 52.019120643633386]]],,]]|
# |[LineString, [, [[-0.196380615234375, 52.11283076186275], [-0.1263427734375, 52.07739600418385]],]]                                                                                                                              |
# |[Point, [,, [-0.1641082763671875, 52.06051241654061]]]                                                                                                                                                                           |
# +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
...