Внедрение данных потока Kafka из библиотеки C # в друид - PullRequest
1 голос
/ 02 июля 2019

В течение последних нескольких дней я пытался ввести некоторые данные из Кафки в Друид, и до сих пор с треском проваливаюсь.

Я создаю некоторые базовые данные с помощью библиотеки C # в Confluent:

using (var p = new ProducerBuilder<string, string>(config).Build())
        {
            try
            {
                var dr = await p.ProduceAsync("Test", new Message<string, string> { Key = "Random", Value = i.ToString(), Timestamp = new Timestamp(DateTime.UtcNow, TimestampType.CreateTime) });
                Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
                p.Flush(TimeSpan.FromSeconds(10));

            }

И это добавляет данные успешно. Но когда я пытаюсь передать данные в Друид, после создания Задачи ничего не происходит, и это говорит мне, что данные не разбираются.

Это моя спецификация приема пищи:

    {
  "type": "kafka",
  "dataSchema": {
    "dataSource": "Test",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "time",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            {
              "name": "Random",
              "type": "string"
            }
          ]
        }
      }
    },
    "metricsSpec": [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": {
        "type": "none"
      },
      "rollup": true,
      "intervals": null
    },
    "transformSpec": {
      "filter": null,
      "transforms": []
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsInMemory": 1000000,
    "maxBytesInMemory": 0,
    "maxRowsPerSegment": 5000000,
    "maxTotalRows": null,
    "intermediatePersistPeriod": "PT10M",
    "maxPendingPersists": 0,
    "indexSpec": {
      "bitmap": {
        "type": "concise"
      },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4",
      "longEncoding": "longs"
    },
    "buildV9Directly": true,
    "reportParseExceptions": false,
    "handoffConditionTimeout": 0,
    "resetOffsetAutomatically": false,
    "segmentWriteOutMediumFactory": null,
    "workerThreads": null,
    "chatThreads": null,
    "chatRetries": 8,
    "httpTimeout": "PT10S",
    "shutdownTimeout": "PT80S",
    "offsetFetchPeriod": "PT30S",
    "intermediateHandoffPeriod": "P2147483647D",
    "logParseExceptions": false,
    "maxParseExceptions": 2147483647,
    "maxSavedParseExceptions": 0,
    "skipSequenceNumberAvailabilityCheck": false
  },
  "ioConfig": {
    "topic": "Test",
    "replicas": 1,
    "taskCount": 1,
    "taskDuration": "PT600S",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "pollTimeout": 100,
    "startDelay": "PT5S",
    "period": "PT30S",
    "useEarliestOffset": false,
    "completionTimeout": "PT1200S",
    "lateMessageRejectionPeriod": null,
    "earlyMessageRejectionPeriod": null,
    "stream": "Test",
    "useEarliestSequenceNumber": false
  },
  "context": null,
  "suspended": false
}

Надеюсь, кто-то может заметить ошибку там.

Заранее спасибо!

EDIT:

Журнал, созданный заданием загрузки

{
  "0": {
    "index_kafka_Test_4b0ecd3ab842a29_bebmfiod": {
      "movingAverages": {
        "buildSegments": {
          "5m": {
            "processed": 0,
            "unparseable": 1,
            "thrownAway": 0,
            "processedWithError": 0
          },
          "15m": {
            "processed": 0,
            "unparseable": 1,
            "thrownAway": 0,
            "processedWithError": 0
          },
          "1m": {
            "processed": 0,
            "unparseable": 1,
            "thrownAway": 0,
            "processedWithError": 0
          }
        }
      },
      "totals": {
        "buildSegments": {
          "processed": 0,
          "processedWithError": 0,
          "thrownAway": 0,
          "unparseable": 8
        }
      }
    }
  }
}
...