Pyspark Inferring Часовой пояс по местоположению - PullRequest
0 голосов
/ 10 мая 2018

Я пытаюсь определить часовой пояс в PySpark, учитывая долготу и широту события. Я наткнулся на библиотеку timezonefinder, которая работает локально. Я обернул его в пользовательскую функцию, пытаясь использовать его в качестве логического устройства часового пояса.

def get_timezone(longitude, latitude):
    from timezonefinder import TimezoneFinder
    tzf = TimezoneFinder()
    return tzf.timezone_at(lng=longitude, lat=latitude)

udf_timezone = F.udf(get_timezone, StringType())

df = sqlContext.read.parquet(INPUT)
df.withColumn("local_timezone", udf_timezone(df.longitude, df.latitude))\
  .write.parquet(OUTPUT)

Когда я работаю на одном узле, этот код работает. Однако при параллельной работе я получаю следующую ошибку:

  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1525907011747_0007/container_1525907011747_0007_01_000062/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1525907011747_0007/container_1525907011747_0007_01_000062/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1525907011747_0007/container_1525907011747_0007_01_000062/pyspark.zip/pyspark/worker.py", line 104, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "<string>", line 1, in <lambda>
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1525907011747_0007/container_1525907011747_0007_01_000062/pyspark.zip/pyspark/worker.py", line 71, in <lambda>
    return lambda *a: f(*a)
  File "/tmp/c95422912bfb4079b64b88427991552a/enrich_data.py", line 64, in  get_timezone
  File "/opt/conda/lib/python2.7/site-packages/timezonefinder/__init__.py", line 3, in <module>
    from .timezonefinder import TimezoneFinder
  File "/opt/conda/lib/python2.7/site-packages/timezonefinder/timezonefinder.py", line 59, in <module>
    from .helpers_numba import coord2int, int2coord, distance_to_polygon_exact, distance_to_polygon, inside_polygon, \
  File "/opt/conda/lib/python2.7/site-packages/timezonefinder/helpers_numba.py", line 17, in <module>
    @jit(b1(i4, i4, i4[:, :]), nopython=True, cache=True)
  File "/opt/conda/lib/python2.7/site-packages/numba/decorators.py", line 191, in wrapper
    disp.enable_caching()
  File "/opt/conda/lib/python2.7/site-packages/numba/dispatcher.py", line 529, in enable_caching
    self._cache = FunctionCache(self.py_func)
  File "/opt/conda/lib/python2.7/site-packages/numba/caching.py", line 614, in __init__
    self._impl = self._impl_class(py_func)
  File "/opt/conda/lib/python2.7/site-packages/numba/caching.py", line 349, in __init__
    "for file %r" % (qualname, source_path))
RuntimeError: cannot cache function 'inside_polygon': no locator available for file '/opt/conda/lib/python2.7/site-packages/timezonefinder/helpers_numba.py'

Я могу импортировать библиотеку локально на узлах, где я получил ошибку. Любое решение в этом направлении будет оценено:

  • Есть ли нативный Spark для выполнения задачи?
  • Есть ли другой способ загрузить библиотеку?
  • Есть ли способ избежать кэширования numba делает?

1 Ответ

0 голосов
/ 27 июля 2018

В конце концов это было решено путем полного отказа от timezonefinder и использования вместо этого набора гео-пространственных данных часового пояса из timezone-boundary-builder при запросе с помощью magellan, гео -спространственная библиотека запросов sql для spark.

Единственное предупреждение, которое я получил, было то, что Point и другие объекты в библиотеке не были упакованы для Python. В итоге я написал свою собственную функцию scala для сопоставления часовых поясов и отбросил объекты с magellan перед возвратом кадра данных.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...