преобразовать данные в фабрику данных Azure, используя блоки данных Python - PullRequest
0 голосов
/ 25 июня 2019

У меня есть задача преобразовать и объединить миллионы одного JSON-файла в БОЛЬШИЕ CSV-файлы.

Операция будет очень простой, если использовать операцию копирования и сопоставления схем, которые я уже тестировал, проблема заключается в том, чточто огромное количество файлов имеет неправильный формат JSON.

Я знаю, в чем заключается ошибка, и исправление тоже очень простое, я подумал, что мог бы использовать действие кирпича Python Data, чтобы исправить строку, а затем передатьвывод к операции копирования, которая может объединить записи в большой файл CSV.

Я имею в виду нечто подобное, я не уверен, что это правильный способ решения этой задачи.Я не знаю, использовать выходные данные Copy Activy в операции Data Brick enter image description here

Ответы [ 2 ]

1 голос
/ 28 июня 2019

Звучит так, как будто вы хотите преобразовать большое количество отдельных файлов JSON с помощью фабрики данных Azure, но теперь она не поддерживается в Azure, как сказал @KamilNowinski. Однако теперь, когда вы использовали блоки данных Azure, написать простой скрипт на Python, чтобы сделать то же самое, стало проще. Поэтому обходным решением является непосредственное использование Azure Storage SDK и пакета pandas Python, чтобы сделать это с помощью нескольких шагов для блоков данных Azure.

  1. Может быть, все эти файлы JSON находятся в контейнере хранилища BLOB-объектов Azure, поэтому вам нужно перечислить их в контейнере через list_blob_names и сгенерировать их URL-адреса с токеном sas для панд read_json функция, код как показано ниже.

    from azure.storage.blob.baseblobservice import BaseBlobService
    from azure.storage.blob import ContainerPermissions
    from datetime import datetime, timedelta
    
    account_name = '<your account name>'
    account_key = '<your account key>'
    container_name = '<your container name>'
    
    service = BaseBlobService(account_name=account_name, account_key=account_key)
    token = service.generate_container_shared_access_signature(container_name, permission=ContainerPermissions.READ, expiry=datetime.utcnow() + timedelta(hours=1),)
    
    blob_names = service.list_blob_names(container_name)
    blob_urls_with_token = (f"https://{account_name}.blob.core.windows.net/{container_name}/{blob_name}?{token}" for blob_name in blob_names)
    
    #print(list(blob_urls_with_token))
    
  2. Затем вы можете прочитать эти JSON-файлы напрямую из BLOB-объектов с помощью функции read_json, чтобы создать панду Dataframe.

    import pandas as pd
    
    for blob_url_with_token in blob_urls_with_token:
        df = pd.read_json(blob_url_with_token)
    

    Даже если вы хотите объединить их в большой файл CSV, вы можете сначала объединить их в большой Dataframe с помощью функций pandas, перечисленных в Combining / joining / merging как append.

  3. Для записи фрейма данных в CSV-файл, я думаю, это очень просто с помощью функции to_csv. Или вы можете преобразовать фрейм данных pandas в фрейм данных PySpark для блоков данных Azure, как показано ниже:

    from pyspark.sql import SQLContext
    from pyspark import SparkContext
    
    sc = SparkContext()
    sqlContest = SQLContext(sc)
    spark_df = sqlContest.createDataFrame(df)
    

Итак, все, что вы хотите сделать, это просто. А если вы хотите запланировать сценарий как записную книжку в Azure Databricks, вы можете обратиться к официальному документу Jobs для запуска заданий Spark.

Надеюсь, это поможет.

0 голосов
/ 27 июня 2019

Скопируйте файл JSON в хранилище (например, BLOB), и вы сможете получить доступ к хранилищу из Databricks. Затем вы можете исправить файл с помощью Python и даже преобразовать его в нужный формат, запустив кластер.

Итак, в разделе «Копирование данных» выполните копирование файлов в BLOB, если у вас их еще нет.

...