I'm having trouble converting my data into the right byte stream so that it ends up in a Spark DataFrame with the correct schema.
This is my data. A single file contains lines of JSON like these:
{"crime_id": "Crime Id", "original_crime_type_name": "Original Crime Type Name", "report_date": "Report Date", "call_date": "Call Date", "offense_date": "Offense Date", "call_time": "Call Time", "call_date_time": "Call Date Time", "disposition": "Disposition", "address": "Address", "city": "City", "state": "State", "agency_id": "Agency Id", "address_type": "Address Type", "common_location": "Common Location"}
{"crime_id": "192924201", "original_crime_type_name": "Fraud", "report_date": "2019-10-19T00:00:00.000", "call_date": "2019-10-19T00:00:00.000", "offense_date": "2019-10-19T00:00:00.000", "call_time": "23:55", "call_date_time": "2019-10-19T23:55:00.000", "disposition": "REP", "address": "2000 Block Of Mcallister St", "city": "San Francisco", "state": "CA", "agency_id": "1", "address_type": "Premise Address", "common_location": ""}
I read the file and send each line using
def generate_data(self):
    with open(self.input_file) as f:
        for line in f:
            message = self.dict_to_binary(line)
            self.send(self.topic, message)
            time.sleep(1)

def dict_to_binary(self, json_dict):
    return json.dumps(json_dict).encode('utf-8')
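To show what the producer actually sends, here is a minimal sketch of dict_to_binary applied to one line. The shortened sample line is hypothetical, and the function is the same body as above, written as a plain function. Note that the argument is a str here, because the line comes straight from the file.

```python
import json

def dict_to_binary(json_dict):
    # Same body as the method above, without the class around it.
    return json.dumps(json_dict).encode('utf-8')

# Hypothetical shortened line, as `for line in f` yields it (a str ending in "\n").
line = '{"crime_id": "192924201", "city": "San Francisco"}\n'

message = dict_to_binary(line)
print(message)

# Decoding the payload once gives back the original str, not a dict.
decoded = json.loads(message.decode('utf-8'))
print(type(decoded).__name__)
```

The printed bytes are wrapped in an extra pair of quotes with the inner quotes escaped, which looks like the values I see in kafka_df.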
On the Spark side, I parse the value like this:
kafka_df = df.selectExpr("CAST(value AS STRING)")
service_table = kafka_df\
    .select(psf.from_json(psf.col('value'), schema).alias("DF"))\
    .select("DF.*")
So kafka_df produces
|"{\"Crime Id\": \"Crime Id\", \"Original Crime Type Name\": \"Original Crime Type Name\", \"Report Date\": \"Report Date\", \"Call Date\": \"Call Date\", \"Offense Date\": \"Offense Date\", \"Call Time\": \"Call Time\", \"Call Date Time\": \"Call Date Time\", \"Disposition\": \"Disposition\", \"Address\": \"Address\", \"City\": \"City\", \"State\": \"State\", \"Agency Id\": \"Agency Id\", \"Address Type\": \"Address Type\", \"Common Location\": \"Common Location\"}\n" |
|"{\"Crime Id\": \"192924201\", \"Original Crime Type Name\": \"Fraud\", \"Report Date\": \"2019-10-19T00:00:00.000\", \"Call Date\": \"2019-10-19T00:00:00.000\", \"Offense Date\": \"2019-10-19T00:00:00.000\", \"Call Time\": \"23:55\", \"Call Date Time\": \"2019-10-19T23:55:00.000\", \"Disposition\": \"REP\", \"Address\": \"2000 Block Of Mcallister St\", \"City\": \"San Francisco\", \"State\": \"CA\", \"Agency Id\": \"1\", \"Address Type\": \"Premise Address\", \"Common Location\": \"\"}\n" |
But when I try to write service_table to the console, I get
+--------+------------------------+-----------+---------+------------+---------+--------------+-----------+-------+----+-----+---------+------------+---------------+
|crime_id|original_crime_type_name|report_date|call_date|offense_date|call_time|call_date_time|disposition|address|city|state|agency_id|address_type|common_location|
+--------+------------------------+-----------+---------+------------+---------+--------------+-----------+-------+----+-----+---------+------------+---------------+
|null |null |null |null |null |null |null |null |null |null|null |null |null |null |
|null |null |null |null |null |null |null |null |null |null|null |null |null |null |
|null |null |null |null |null |null |null |null |null |null|null |null |null |null |
The schema is entirely StringType(), and I'm not sure why the conversion isn't working correctly. Any help?