Здесь - это мой logstash.conf
файл. (Приносим извинения за то, что не вставили здесь код напрямую; StackOverflow не разрешает публикации, превышающие определенное соотношение код / текст.)
Моя удаленная виртуальная машина, на которой также размещены мои серверы ElasticSearch и LogStash, прослушивает порт 8080.
На моем локальном компьютере я периодически отправляю заархивированные папки (содержащие документы JSON) по TCP на мой удаленный сервер, который получает данные в поток памяти, распаковывает папки и отправляет содержимое в LogStash. LogStash, в свою очередь, передает данные в ElasticSearch.
В настоящее время я тестирую рабочий процесс с некоторыми фиктивными данными.
На моем удаленном сервере, вот способ получения данных по TCP:
private static void ReceiveAndUnzipElasticSearchDocumentFolder(int numBytesExpectedToReceive)
{
int numBytesLeftToReceive = numBytesExpectedToReceive;
using (MemoryStream zippedFolderStream = new MemoryStream(new byte[numBytesExpectedToReceive]))
{
while (numBytesLeftToReceive > 0)
{
// Receive data in small packets
}
zippedFolderStream.Unzip(afterReadingEachDocument: LogStashDataSender.Send);
}
}
Вот код для распаковки полученной папки:
public static class StreamExtensions
{
public static void Unzip(this Stream zippedElasticSearchDocumentFolderStream, Action<ElasticSearchJsonDocument> afterReadingEachDocument)
{
JsonSerializer jsonSerializer = new JsonSerializer();
foreach (ZipArchiveEntry entry in new ZipArchive(zippedElasticSearchDocumentFolderStream).Entries)
{
using (JsonTextReader jsonReader = new JsonTextReader(new StreamReader(entry.Open())))
{
dynamic jsonObject = jsonSerializer.Deserialize<ExpandoObject>(jsonReader);
string jsonIndexId = jsonObject.IndexId;
string jsonDocumentId = jsonObject.DocumentId;
afterReadingEachDocument(new ElasticSearchJsonDocument(jsonObject, jsonIndexId, jsonDocumentId));
}
}
}
}
А вот способ отправки данных в LogStash:
public static async void Send(ElasticSearchJsonDocument document)
{
HttpResponseMessage response =
await httpClient.PutAsJsonAsync(
IsNullOrWhiteSpace(document.DocumentId)
? $"{document.IndexId}"
: $"{document.IndexId}/{document.DocumentId}",
document.JsonObject);
try
{
response.EnsureSuccessStatusCode();
}
catch (Exception exception)
{
Console.WriteLine(exception.Message);
}
Console.WriteLine($"{response.Content}");
}
httpClient
, указанный в методе public static async void Send(ElasticSearchJsonDocument document)
, был создан с использованием следующего кода:
private const string LogStashHostAddress = "http://127.0.0.1";
private const int LogStashPort = 31311;
httpClient = new HttpClient { BaseAddress = new Uri($"{LogStashHostAddress}:{LogStashPort}/") };
httpClient.DefaultRequestHeaders.Accept.Clear();
httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
Когда я захожу в новый экземпляр отладки, программа работает без сбоев, но умирает сразу после выполнения await httpClient.PutAsJsonAsync
для каждого из документов, содержащихся в заархивированной папке - response.EnsureSuccessStatusCode();
никогда не срабатывает; ни Console.WriteLine(exception.Message);
, ни Console.WriteLine($"{response.Content}");
.
Вот пример ElasticSearchJsonDocument
, который передается методу public static async void Send(ElasticSearchJsonDocument document)
:
Когда я выполнял тот же запрос PUT
с использованием cURL, индекс Book
был успешно создан, и я мог тогда запросить GET
для получения данных из ElasticSearch.
Мои вопросы:
- Почему программа сразу же умерла (без видимых сообщений об исключениях) после выполнения
await httpClient.PutAsJsonAsync(...)
для каждого из документов JSON внутри полученной заархивированной папки?
- Какие изменения я должен внести, чтобы обеспечить возможность успешного выполнения
PUT
запросов в LogStash с использованием экземпляра HttpClient
?