В моей облачной функции Google (Python 3.7 Runtime) я создал функцию, которая пытается загрузить все CSV-файлы из хранилища Google в фрейм данных pandas (df).Оказавшись в кадре данных, я собирался выполнить небольшую работу над ETL, а затем преобразовать его обратно в один большой файл .csv для сохранения в другое ведро.
Проблема, с которой я сталкиваюсь, заключается в том, что я начинаю читатьобъекты (преобразованные в строки с помощью file.download_as_string ()) в read_csv (), я получаю ошибку, связанную с IO.StringIO (TypeError: initial_value должно быть str или None, а не байты)
В пределах read_csv (io.String.IO (file_contents) ....), это как-то связано с тем, где я разместил метод io.StringIO?Может ли кто-нибудь помочь мне исправить эту ошибку?
def stage1slemonthly(data, context, source_bucket = 'my_source_bucket',
destination_bucket = 'gs://my destination_bucket'):
from google.cloud import storage
import pandas as pd
import pyspark
from pyspark.sql import SQLContext
import io
storage_client = storage.Client()
# source_bucket = data['bucket']
# source_file = data['name']
source_bucket = storage_client.bucket(source_bucket)
# load in the col names
col_names = ["Customer_Country_Number", "Customer_Name", "Exclude",
"SAP_Product_Name", "CP_Sku_Code", "Exclude", "UPC_Unit",
"UPC_Case", "Colgate_Month_Year", "Total_Cases",
"Promoted_Cases", "Non_Promoted_Cases",
"Planned_Non_Promoted_Cases", "Exclude",
"Lead_Measure", "Tons", "Pieces", "Liters",
"Tons_Conv_Revenue", "Volume_POS_Units", "Scan_Volume",
"WWhdrl_Volume", "Exclude", "Exclude", "Exclude", "Exclude",
"Exclude", "Exclude", "Exclude", "Exclude", "Investment_Buy",
"Exclude", "Exclude", "Gross_Sales", "Claim_Sales",
"Adjusted_Gross_Sales", "Exclude", "Exclude",
"Consumer_Investment", "Consumer_Allowance",
"Special_Pack_FG", "Coupons", "Contests_Offers",
"Consumer_Price_Reduction", "Permanent_Price_Reduction",
"Temporary_Price_Reduction", "TPR_Off_Invoice", "TPR_Scan",
"TPR_WWdrwl_Exfact", "Every_Day_Low_Price", "Closeouts",
"Inventory_Price_Reduction", "Exclude", "Customer_Investment",
"Prompt_Payment", "Efficiency_Drivers", "Efficient_Logistics",
"Efficient_Management", "Business_Builders_Direct", "Assortment",
"Customer_Promotions","Customer_Promotions_Terms",
"Customer_Promotions_Fixed", "Growth_Direct",
"New_Product_Incentives", "Free_Goods_Direct",
"Shopper_Marketing", "Business_Builders_Indirect",
"Middleman_Performance", "Middleman_Infrastructure",
"Growth_Indirect", "Indirect_Retailer_Investments",
"Free_Goods_Indirect", "Other_Customer_Investments",
"Product_Listing_Allowances", "Non_Performance_Trade_Payments",
"Exclude", "Variable_Rebate_Adjustment",
"Overlapping_OI_Adjustment", "Fixed_Accruals",
"Variable_Accruals", "Total_Accruals", "Gross_To_Net",
"Invoiced_Sales", "Exclude", "Exclude", "Net_Sales",
"Exclude", "Exclude", "Exclude", "Exclude", "Exclude",
"Exclude", "Exclude", "Exclude", "Exclude",
"Total_Variable_Cost", "Margin", "Exclude"]
df = pd.DataFrame(columns=[col_names])
for file in list(source_bucket.list_blobs()):
file_contents = file.download_as_string()
df = df.append(pd.read_csv(io.StringIO(file_contents), header=None, names=[col_names]))
df = df.reset_index(drop=True)
# do ETL work here in future
sc = pyspark.SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)
sparkDf = sqlCtx.createDataFrame(df)
sparkDf.coalesce(1).write.option("header", "true").csv(destination_bucket)
Когда я ее запускаю, я получаю следующее сообщение об ошибке ...
Traceback (последний последний вызов): Файл "/env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py ", строка 383, в файле run_background_function _function_handler.invoke_user_function (event_object) файл" /env/local/lib/python3.7/site-packages / google / cloud / functions / worker.py ", строка 217, в invoke_user_function возвращает call_user_function (request_or_event) Файл" /env/local/lib/python3.7/site-packages/google/cloud/functions/worker.py ", строка 214, в call_user_function event_context.Context (** request_or_event.context)) Файл" /user_code/main.py ", строка 56, в stage1slemonthly df = df.append (pd.read_csv (io.StringIO (file_contents)), header = None, names = [col_names])) TypeError: initial_value должно быть str или None, а не байтами