конвертирование документов json в поток байтов для кафки - PullRequest
1 голос
/ 02 ноября 2019

У меня проблемы с преобразованием моих данных в правильный поток байтов в искровой фрейм данных с правильной схемой.

Это мои данные. В одном файле есть строки этих json,

{"crime_id": "Crime Id", "original_crime_type_name": "Original Crime Type Name", "report_date": "Report Date", "call_date": "Call Date", "offense_date": "Offense Date", "call_time": "Call Time", "call_date_time": "Call Date Time", "disposition": "Disposition", "address": "Address", "city": "City", "state": "State", "agency_id": "Agency Id", "address_type": "Address Type", "common_location": "Common Location"}
{"crime_id": "192924201", "original_crime_type_name": "Fraud", "report_date": "2019-10-19T00:00:00.000", "call_date": "2019-10-19T00:00:00.000", "offense_date": "2019-10-19T00:00:00.000", "call_time": "23:55", "call_date_time": "2019-10-19T23:55:00.000", "disposition": "REP", "address": "2000 Block Of Mcallister St", "city": "San Francisco", "state": "CA", "agency_id": "1", "address_type": "Premise Address", "common_location": ""}

и я читаю файл и отправляю каждую строку, используя

    def generate_data(self):
        with open(self.input_file) as f:
            for line in f:
                message = self.dict_to_binary(line)
                self.send(self.topic, message)
                time.sleep(1)

    def dict_to_binary(self, json_dict):
        return json.dumps(json_dict).encode('utf-8')
kafka_df = df.selectExpr("CAST(value AS STRING)")

service_table = kafka_df\
  .select(psf.from_json(psf.col('value'), schema).alias("DF"))\
  .select("DF.*")

Так что kafka_df генерирует

|"{\"Crime Id\": \"Crime Id\", \"Original Crime Type Name\": \"Original Crime Type Name\", \"Report Date\": \"Report Date\", \"Call Date\": \"Call Date\", \"Offense Date\": \"Offense Date\", \"Call Time\": \"Call Time\", \"Call Date Time\": \"Call Date Time\", \"Disposition\": \"Disposition\", \"Address\": \"Address\", \"City\": \"City\", \"State\": \"State\", \"Agency Id\": \"Agency Id\", \"Address Type\": \"Address Type\", \"Common Location\": \"Common Location\"}\n"                                                        |
|"{\"Crime Id\": \"192924201\", \"Original Crime Type Name\": \"Fraud\", \"Report Date\": \"2019-10-19T00:00:00.000\", \"Call Date\": \"2019-10-19T00:00:00.000\", \"Offense Date\": \"2019-10-19T00:00:00.000\", \"Call Time\": \"23:55\", \"Call Date Time\": \"2019-10-19T23:55:00.000\", \"Disposition\": \"REP\", \"Address\": \"2000 Block Of Mcallister St\", \"City\": \"San Francisco\", \"State\": \"CA\", \"Agency Id\": \"1\", \"Address Type\": \"Premise Address\", \"Common Location\": \"\"}\n"                                  |

, но когда я пытаюсь утешитьservice_table, я получаю

+--------+------------------------+-----------+---------+------------+---------+--------------+-----------+-------+----+-----+---------+------------+---------------+
|crime_id|original_crime_type_name|report_date|call_date|offense_date|call_time|call_date_time|disposition|address|city|state|agency_id|address_type|common_location|
+--------+------------------------+-----------+---------+------------+---------+--------------+-----------+-------+----+-----+---------+------------+---------------+
|null    |null                    |null       |null     |null        |null     |null          |null       |null   |null|null |null     |null        |null           |
|null    |null                    |null       |null     |null        |null     |null          |null       |null   |null|null |null     |null        |null           |
|null    |null                    |null       |null     |null        |null     |null          |null       |null   |null|null |null     |null        |null           |

Схема полностью в StringType(), и я не уверен, почему преобразование не происходит правильно ... любая помощь?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...