Я новичок в искре. Я передаю файлы журнала в следующем формате на порт 9999
-
2019-09-15 23:45:37,370|10.1.198.43|splunk|headObject|splunk|splunk|160|0|55|246|461|1322|_introspection%2Fdma%2F27%2F99%2F103%7E955F0453-052A-4B49-BC09-8259A2B608E4%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_wildfire_report%2Freceipt.json|404|2d86e6fa-c7e4-1c5b-8d12-54ab3a911327|0|NoSuchKey|
2019-09-15 23:45:37,379|10.1.198.53|splunk|getObject|splunk|splunk|160|0|55|246|461|1567|_introspection%2Fdma%2F27%2F99%2F103%7E955F0453-052A-4B49-BC09-8259A2B608E4%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_wildfire_report%2Freceipt.json|404|2d86e6fc-c7e4-1c5b-8d12-54ab3a911327|0|NoSuchKey|
2019-09-15 23:45:37,430|10.1.198.53|splunk|headObject|splunk|splunk|160|0|55|246|461|1329|_introspection%2Fdma%2F27%2F99%2F103%7E955F0453-052A-4B49-BC09-8259A2B608E4%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_wildfire_report%2Freceipt.json|404|2d86e6fe-c7e4-1c5b-8d12-54ab3a911327|0|NoSuchKey|
2019-09-15 23:45:38,545|10.29.2.5||unknown|||0|0|0|250|250|223||400|2d86e700-c7e4-1c5b-8d12-54ab3a911327|0|InvalidBucketName|
2019-09-15 23:45:38,614|10.29.2.6||unknown|||0|0|0|250|250|187||400|2d86e702-c7e4-1c5b-8d12-54ab3a911327|0|InvalidBucketName|
И мое потоковое кодирование, которое дало мне фрейм данных: -
val linesDF: DataFrame = spark
.readStream
.format("socket")
.option("host", "127.0.0.1")
.option("port", 9999)
.option("delimiter", "|")
.schema(schema1)
.load()
Но я получаю ошибкучто я не могу указать схему с socket
источником. Как читать эти данные?
Моя схема выглядит следующим образом -
val schemaString = "dttm|ip|bktownr|oper|bktnm|usr|" +
"reqhdr|reqbd|reshdr|resbd|totsize|" +
"duration|objnm|httpstts|s3reqid|etag|errcd|srcbkt"
val schema1 = StructType(
schemaString
.split('|')
.map(fieldName => StructField(fieldName, StringType, true))
)
После удаления опции я знаю, что могу читать данные, но они читаются в одном столбце с именем value
. Я провел эксперименты по загрузке данных в статический фрейм данных и предоставил там схему. Но это не работает в этом случае, когда я на самом деле хочу преобразовать процесс в поток, где данные будут передаваться через сокет 9999
.
Есть ли другой способ, которого я пропускаю?