I have the DataFrame below, shown with its data and schema. I am simply trying to write this data to a Kafka topic named vehicleData.
Data
+---------------+------------+------+------+-------------+------+
|Registration_No|Plate_Number| Maker| Model|Vehicle_Color|Status|
+---------------+------------+------+------+-------------+------+
|   LER-15A-9681|     LER9681|Suzuki|Mehran|          RED|     0|
|    LEV-15-7044|     LEV7044| Honda| Civic|        GREEN|     0|
|    LEC-15-1946|     LEC1946| Honda| Civic|        WHITE|     0|
+---------------+------------+------+------+-------------+------+
Schema is:
root
|-- Registration_No: string (nullable = true)
|-- Plate_Number: string (nullable = true)
|-- Maker: string (nullable = true)
|-- Model: string (nullable = true)
|-- Vehicle_Color: string (nullable = true)
|-- Status: string (nullable = true)
Code is:
raw_data.selectExpr("CAST(Registration_No AS STRING)", "CAST(Plate_Number AS STRING)",
                    "CAST(Maker AS STRING)", "CAST(Model AS STRING)",
                    "CAST(Vehicle_Color AS STRING)", "CAST(Status AS STRING)").write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("topic","vehicleData") \
    .save()
The error I get is:
Traceback (most recent call last):
  File "/Users/saeed.butt/PycharmProjects/untitled4/Main.py", line 52, in <module>
    .option("topic","vehicleData") \
  File "/Users/saeed.butt/PycharmProjects/untitled4/venv/lib/python2.7/site-packages/pyspark/sql/readwriter.py", line 737, in save
    self._jwrite.save()
  File "/Users/saeed.butt/PycharmProjects/untitled4/venv/lib/python2.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/saeed.butt/PycharmProjects/untitled4/venv/lib/python2.7/site-packages/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;'
The Kafka server is running on localhost:9092.
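For reference, here is a minimal sketch of what the deployment section named in the error message appears to call for: putting the Spark/Kafka connector package on the classpath before writing. The helper names (`kafka_package`, `build_session`, `write_vehicle_data`) are hypothetical, and the Scala version 2.11 and Spark version 2.4.0 are assumptions that would need to match the local installation. The sketch also packs each row into a single JSON `value` column, since the Kafka sink expects a `value` column; using Registration_No as the message key is an illustrative choice only.

```python
def kafka_package(scala_version, spark_version):
    """Build the Maven coordinate of the Spark/Kafka connector (hypothetical helper)."""
    return "org.apache.spark:spark-sql-kafka-0-10_{0}:{1}".format(
        scala_version, spark_version)


def build_session(scala_version="2.11", spark_version="2.4.0"):
    """Create a SparkSession that pulls in the Kafka connector.

    The default versions are assumptions; replace them with the Scala and
    Spark versions of the local installation.
    """
    # Imported lazily so kafka_package() can be used without Spark installed.
    from pyspark.sql import SparkSession

    return (SparkSession.builder
            .appName("vehicle-data-to-kafka")
            .config("spark.jars.packages",
                    kafka_package(scala_version, spark_version))
            .getOrCreate())


def write_vehicle_data(raw_data):
    """Write every row of raw_data to the vehicleData topic as a JSON string."""
    from pyspark.sql.functions import col, struct, to_json

    (raw_data
     .select(col("Registration_No").alias("key"),                 # assumed key choice
             to_json(struct(*raw_data.columns)).alias("value"))   # whole row as JSON
     .write
     .format("kafka")
     .option("kafka.bootstrap.servers", "127.0.0.1:9092")
     .option("topic", "vehicleData")
     .save())
```

The `spark.jars.packages` setting only takes effect if it is set before the JVM starts, so the session has to be built this way from the beginning of the script. Passing the same coordinate to `spark-submit` with `--packages` should be an equivalent route.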