В течение последних нескольких дней я пытался ввести некоторые данные из Кафки в Друид, и до сих пор с треском проваливаюсь.
Я создаю некоторые базовые данные с помощью библиотеки C # в Confluent:
using (var p = new ProducerBuilder<string, string>(config).Build())
{
try
{
var dr = await p.ProduceAsync("Test", new Message<string, string> { Key = "Random", Value = i.ToString(), Timestamp = new Timestamp(DateTime.UtcNow, TimestampType.CreateTime) });
Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
p.Flush(TimeSpan.FromSeconds(10));
}
И это добавляет данные успешно. Но когда я пытаюсь передать данные в Друид, после создания Задачи ничего не происходит, и это говорит мне, что данные не разбираются.
Это моя спецификация приема пищи:
{
"type": "kafka",
"dataSchema": {
"dataSource": "Test",
"parser": {
"type": "string",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "time",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
{
"name": "Random",
"type": "string"
}
]
}
}
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": {
"type": "none"
},
"rollup": true,
"intervals": null
},
"transformSpec": {
"filter": null,
"transforms": []
}
},
"tuningConfig": {
"type": "kafka",
"maxRowsInMemory": 1000000,
"maxBytesInMemory": 0,
"maxRowsPerSegment": 5000000,
"maxTotalRows": null,
"intermediatePersistPeriod": "PT10M",
"maxPendingPersists": 0,
"indexSpec": {
"bitmap": {
"type": "concise"
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs"
},
"buildV9Directly": true,
"reportParseExceptions": false,
"handoffConditionTimeout": 0,
"resetOffsetAutomatically": false,
"segmentWriteOutMediumFactory": null,
"workerThreads": null,
"chatThreads": null,
"chatRetries": 8,
"httpTimeout": "PT10S",
"shutdownTimeout": "PT80S",
"offsetFetchPeriod": "PT30S",
"intermediateHandoffPeriod": "P2147483647D",
"logParseExceptions": false,
"maxParseExceptions": 2147483647,
"maxSavedParseExceptions": 0,
"skipSequenceNumberAvailabilityCheck": false
},
"ioConfig": {
"topic": "Test",
"replicas": 1,
"taskCount": 1,
"taskDuration": "PT600S",
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"pollTimeout": 100,
"startDelay": "PT5S",
"period": "PT30S",
"useEarliestOffset": false,
"completionTimeout": "PT1200S",
"lateMessageRejectionPeriod": null,
"earlyMessageRejectionPeriod": null,
"stream": "Test",
"useEarliestSequenceNumber": false
},
"context": null,
"suspended": false
}
Надеюсь, кто-то может заметить ошибку там.
Заранее спасибо!
EDIT:
Журнал, созданный заданием загрузки
{
"0": {
"index_kafka_Test_4b0ecd3ab842a29_bebmfiod": {
"movingAverages": {
"buildSegments": {
"5m": {
"processed": 0,
"unparseable": 1,
"thrownAway": 0,
"processedWithError": 0
},
"15m": {
"processed": 0,
"unparseable": 1,
"thrownAway": 0,
"processedWithError": 0
},
"1m": {
"processed": 0,
"unparseable": 1,
"thrownAway": 0,
"processedWithError": 0
}
}
},
"totals": {
"buildSegments": {
"processed": 0,
"processedWithError": 0,
"thrownAway": 0,
"unparseable": 8
}
}
}
}
}