В любом случае я могу преобразовать данные значения в фактические имена столбцов в StructuredStreaming от Kafka? - PullRequest
0 голосов
/ 20 апреля 2019

У меня есть CSV-файл, в котором есть столбцы, и для целей тестирования я помещаю его вручную в Kafka, а оттуда я читаю его в Spark и применяю некоторый анализ, и я делаю вывод на консоль для целей тестирования. Теперь я понимаю, что данные CSV передаются в виде значения в структурированном потоке, для которого я преобразовал их в строку. Мое требование, если я могу преобразовать данные значения в фактические столбцы. В CSV-файле есть сотни столбцов, но я смотрю только на два конкретных столбца "SERVICE_NAME8" и "_raw"

Я использую spark.sql для извлечения этих столбцов, когда я читаю CSV-файл из пути, но теперь я использую структурированную потоковую передачу. Я не уверен, смогу ли я извлечь эти конкретные столбцы в качестве нового кадра данных и применить мой анализ после этого

val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.160.172.45:9092, 10.160.172.46:9092, 10.160.172.100:9092")
      .option("subscribe", "TOPIC_WITH_COMP_P2_R2, TOPIC_WITH_COMP_P2_R2.DIT, TOPIC_WITHOUT_COMP_P2_R2.DIT")
      .load()

    val dfs = df.selectExpr("CAST(value AS STRING)").toDF()

    val data =dfs.withColumn("splitted", split($"value", "/"))
      .select($"splitted".getItem(4).alias("region"),$"splitted".getItem(5).alias("service"),col("value"))
      .withColumn("service_type", regexp_extract($"service", """.*(Inbound|Outbound|Outound).*""",1))
      .withColumn("region_type", concat(
        when(col("region").isNotNull,col("region")).otherwise(lit("null")), lit(" "),
        when(col("service").isNotNull,col("service_type")).otherwise(lit("null"))))

    val extractedDF = data.filter(
      col("region").isNotNull &&
        col("service").isNotNull &&
        col("value").isNotNull &&
        col("service_type").isNotNull &&
        col("region_type").isNotNull)
      .filter("region != ''")
      .filter("service != ''")
      .filter("value != ''")
      .filter("service_type != ''")
      .filter("region_type != ''")

val query = extractedDF
.writeStream
.format("console")
.outputMode("append")
.trigger(ProcessingTime("20 seconds"))
.start()

После val dfs = df.selectExpr ("CAST (value AS STRING)"). ToDF () мне как-то нужно извлечь только два столбца "SERVICE_NAME8" & "_raw", а синтаксический анализ должен сделать все остальное и произвести вывод

1 Ответ

0 голосов
/ 20 апреля 2019

В Spark структурированная потоковая передача быстрый пример, который вы могли видеть, что
df.as[String].map(_.split("/")) должен преобразовать поток в тот же data, что и в коде spark.sql. Далее вы можете извлечь только нужные столбцы и обработать его. Например
data.map(line=>(line[SERVICE_NAME_COLUMN_INDEX], line[RAW_COLUMN_INDEX]))
получит Tuple из двух столбцов для каждой строки.
Обратите внимание, это всего лишь пример. Я не запускаю это. Также я полагаю, что Tuple не лучшее решение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...