Запись DataFrame в Kafka topi c с использованием Pyspark - PullRequest
0 голосов
/ 11 марта 2020

У меня есть ниже DataFrame, который имеет схему и данные. Теперь я просто записываю эти данные в Kafka topi c, который vehicleData.

Data
+---------------+------------+------+------+-------------+------+
|Registration_No|Plate_Number| Maker| Model|Vehicle_Color|Status|
+---------------+------------+------+------+-------------+------+
|   LER-15A-9681|     LER9681|Suzuki|Mehran|          RED|     0|
|    LEV-15-7044|     LEV7044| Honda| Civic|        GREEN|     0|
|    LEC-15-1946|     LEC1946| Honda| Civic|        WHITE|     0|

Schema is:
root
 |-- Registration_No: string (nullable = true)
 |-- Plate_Number: string (nullable = true)
 |-- Maker: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Vehicle_Color: string (nullable = true)
 |-- Status: string (nullable = true)


Code is:
raw_data.selectExpr("CAST(Registration_No AS STRING)", "CAST(Plate_Number AS STRING)",
                         "CAST(Maker AS STRING)", "CAST(Model AS STRING)",
                         "CAST(Vehicle_Color AS STRING)", "CAST(Status AS STRING)").write\
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")\
    .option("topic","vehicleData") \
    .save()

Ошибка, которую я получаю:

Traceback (most recent call last):
  File "/Users/saeed.butt/PycharmProjects/untitled4/Main.py", line 52, in <module>
    .option("topic","vehicleData") \
  File "/Users/saeed.butt/PycharmProjects/untitled4/venv/lib/python2.7/site-packages/pyspark/sql/readwriter.py", line 737, in save
    self._jwrite.save()
  File "/Users/saeed.butt/PycharmProjects/untitled4/venv/lib/python2.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/saeed.butt/PycharmProjects/untitled4/venv/lib/python2.7/site-packages/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;'

сервер kafka работает на localhost:9092.

1 Ответ

0 голосов
/ 12 марта 2020

Фрейм данных должен иметь столбец значений для записи в kafka. Добавьте столбец с именем «value» и запишите его в kafka.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...