Я пытаюсь определить часовой пояс в PySpark, учитывая долготу и широту события. Я наткнулся на библиотеку timezonefinder
, которая работает локально. Я обернул его в пользовательскую функцию, пытаясь использовать его в качестве логического устройства часового пояса.
def get_timezone(longitude, latitude):
from timezonefinder import TimezoneFinder
tzf = TimezoneFinder()
return tzf.timezone_at(lng=longitude, lat=latitude)
udf_timezone = F.udf(get_timezone, StringType())
df = sqlContext.read.parquet(INPUT)
df.withColumn("local_timezone", udf_timezone(df.longitude, df.latitude))\
.write.parquet(OUTPUT)
Когда я работаю на одном узле, этот код работает. Однако при параллельной работе я получаю следующую ошибку:
File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1525907011747_0007/container_1525907011747_0007_01_000062/pyspark.zip/pyspark/worker.py", line 177, in main
process()
File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1525907011747_0007/container_1525907011747_0007_01_000062/pyspark.zip/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1525907011747_0007/container_1525907011747_0007_01_000062/pyspark.zip/pyspark/worker.py", line 104, in <lambda>
func = lambda _, it: map(mapper, it)
File "<string>", line 1, in <lambda>
File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1525907011747_0007/container_1525907011747_0007_01_000062/pyspark.zip/pyspark/worker.py", line 71, in <lambda>
return lambda *a: f(*a)
File "/tmp/c95422912bfb4079b64b88427991552a/enrich_data.py", line 64, in get_timezone
File "/opt/conda/lib/python2.7/site-packages/timezonefinder/__init__.py", line 3, in <module>
from .timezonefinder import TimezoneFinder
File "/opt/conda/lib/python2.7/site-packages/timezonefinder/timezonefinder.py", line 59, in <module>
from .helpers_numba import coord2int, int2coord, distance_to_polygon_exact, distance_to_polygon, inside_polygon, \
File "/opt/conda/lib/python2.7/site-packages/timezonefinder/helpers_numba.py", line 17, in <module>
@jit(b1(i4, i4, i4[:, :]), nopython=True, cache=True)
File "/opt/conda/lib/python2.7/site-packages/numba/decorators.py", line 191, in wrapper
disp.enable_caching()
File "/opt/conda/lib/python2.7/site-packages/numba/dispatcher.py", line 529, in enable_caching
self._cache = FunctionCache(self.py_func)
File "/opt/conda/lib/python2.7/site-packages/numba/caching.py", line 614, in __init__
self._impl = self._impl_class(py_func)
File "/opt/conda/lib/python2.7/site-packages/numba/caching.py", line 349, in __init__
"for file %r" % (qualname, source_path))
RuntimeError: cannot cache function 'inside_polygon': no locator available for file '/opt/conda/lib/python2.7/site-packages/timezonefinder/helpers_numba.py'
Я могу импортировать библиотеку локально на узлах, где я получил ошибку.
Любое решение в этом направлении будет оценено:
- Есть ли нативный Spark для выполнения задачи?
- Есть ли другой способ загрузить библиотеку?
- Есть ли способ избежать кэширования
numba
делает?