Я застрял с довольно любопытной проблемой. У меня есть искровой RDD (со значением ключа), и я хочу сохранить каждую запись RDD в отдельном файле в хранилище BLOB-объектов Azure. Код следующий:
from azure.storage.blob import (
BlobServiceClient,
BlobClient
)
def save_blob(kv):
CONNECTION_STRING = 'DefaultEndpointsProtocol=https;AccountName=example-account;AccountKey=very-very-secret;EndpointSuffix=core.windows.net'
CLIENT = 'client'
INDEX_DIR = f'{CLIENT}/index/'
CONTAINER_NAME = 'rawdata'
blob_service_client = BlobServiceClient.from_connection_string(CONNECTION_STRING)
blob_client = blob_service_client.get_blob_client(CONTAINER_NAME, f'{INDEX_DIR}{kv[0]}.json')
return blob_client.upload_blob(kv[1], blob_type="BlockBlob")
Я переместил все переменные в их необработанных строках в функцию save_blob (), чтобы убедиться, что нет никаких проблем с переменными, которые неизвестны рабочим. Если я правильно понимаю, все значения должны быть установлены здесь.
Выполнение следующего прекрасно работает, создается BLOB-файл.
kv = (0, '{"id": "whatever"}')
save_blob(kv)
Однако, как только я запускаю этона моем СДР, в котором есть данные в форме выключен TUPLE[int, DICT[str, DICT[str, any]]]
- выполняется следующий код:
my_rdd.foreach(lambda kv: save_blob(kv))
Выдает ошибки проверки подлинности следующей формы:
azure.storage.blob._generated.models._models_py3.StorageErrorException: Server failed to authenticate the request. Make sure the value of Authorization header is formed correctly including the signature.
....
azure.core.exceptions.ClientAuthenticationError: Server failed to authenticate the request.
Make sure the value of Authorization header is formed correctly including the signature.
RequestId:5a6d54c5-701e-00fa-46b8-953d31000000
Time:2019-11-07T22:11:57.3636552Z
ErrorCode:AuthenticationFailed
Error:None
AuthenticationErrorDetail:The MAC signature found in the HTTP request '0p+pmV5/FyugC0RJKnKwn+DwBOMEFXJ6BuGMiVNU38k=' is not the same as any computed signature. Server used following string to sign: 'PUT
971
application/octet-stream
*
x-ms-blob-type:BlockBlob
x-ms-client-request-id:9cffe814-01ab-11ea-aa40-00163e786b5f
x-ms-date:Thu, 07 Nov 2019 22:11:57 GMT
x-ms-version:2019-02-02
/example-container/path/client_20d%2Findex%2F0.json'.
Кто-нибудь знает, почему яКажется, что в состоянии выполнить код, если это не происходит в RDD? Я в растерянности ...
Спасибо! -Tom
PS: преобразование RDD в DataFrame с использованием rdd.toDF("column1", "column2")
с последующим использованием df.write.PartitionBy("column1").json(...)
не работает, так как вложенные дикты, похоже, не могут быть правильно преобразованы.