Как указать shema при создании фрейма данных из источника данных сокетов? - PullRequest
0 голосов
/ 24 октября 2019

Я новичок в искре. Я передаю файлы журнала в следующем формате на порт 9999 -

2019-09-15 23:45:37,370|10.1.198.43|splunk|headObject|splunk|splunk|160|0|55|246|461|1322|_introspection%2Fdma%2F27%2F99%2F103%7E955F0453-052A-4B49-BC09-8259A2B608E4%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_wildfire_report%2Freceipt.json|404|2d86e6fa-c7e4-1c5b-8d12-54ab3a911327|0|NoSuchKey|
2019-09-15 23:45:37,379|10.1.198.53|splunk|getObject|splunk|splunk|160|0|55|246|461|1567|_introspection%2Fdma%2F27%2F99%2F103%7E955F0453-052A-4B49-BC09-8259A2B608E4%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_wildfire_report%2Freceipt.json|404|2d86e6fc-c7e4-1c5b-8d12-54ab3a911327|0|NoSuchKey|
2019-09-15 23:45:37,430|10.1.198.53|splunk|headObject|splunk|splunk|160|0|55|246|461|1329|_introspection%2Fdma%2F27%2F99%2F103%7E955F0453-052A-4B49-BC09-8259A2B608E4%2F5C218CC9-77D3-4BBE-AD24-A5FB31CC56F2_DM_SplunkforPaloAltoNetworks_pan_wildfire_report%2Freceipt.json|404|2d86e6fe-c7e4-1c5b-8d12-54ab3a911327|0|NoSuchKey|
2019-09-15 23:45:38,545|10.29.2.5||unknown|||0|0|0|250|250|223||400|2d86e700-c7e4-1c5b-8d12-54ab3a911327|0|InvalidBucketName|
2019-09-15 23:45:38,614|10.29.2.6||unknown|||0|0|0|250|250|187||400|2d86e702-c7e4-1c5b-8d12-54ab3a911327|0|InvalidBucketName|

И мое потоковое кодирование, которое дало мне фрейм данных: -

val linesDF: DataFrame = spark
      .readStream
      .format("socket")
      .option("host", "127.0.0.1")
      .option("port", 9999)
      .option("delimiter", "|")
      .schema(schema1)
      .load()

Но я получаю ошибкучто я не могу указать схему с socket источником. Как читать эти данные?

Моя схема выглядит следующим образом -

val schemaString = "dttm|ip|bktownr|oper|bktnm|usr|" +
      "reqhdr|reqbd|reshdr|resbd|totsize|" +
      "duration|objnm|httpstts|s3reqid|etag|errcd|srcbkt"

    val schema1 = StructType(
      schemaString
        .split('|')
        .map(fieldName => StructField(fieldName, StringType, true))
    )

После удаления опции я знаю, что могу читать данные, но они читаются в одном столбце с именем value. Я провел эксперименты по загрузке данных в статический фрейм данных и предоставил там схему. Но это не работает в этом случае, когда я на самом деле хочу преобразовать процесс в поток, где данные будут передаваться через сокет 9999.

Есть ли другой способ, которого я пропускаю?

1 Ответ

0 голосов
/ 25 октября 2019

Если вы пытаетесь читать журналы в режиме структурированной потоковой передачи spark, вы можете напрямую указать каталог файлов журналов, и spark также будет читать его в потоке вместе с новыми файлами. Вот как я решил эту проблему -

val rawData = spark
      .readStream
      .format("csv")
      .option("delimiter", "|")
      .option("header", "true")
      .schema(customSchema)
      .load("/Users/username/logs/cloud-logs")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...