Я написал следующий код, который вызывается в API, работающем в Cloud Run. Этот API вызывается один раз для каждого файла, который создается в корзине GCS через уведомление pubsub. Сегодня я добавил код для перемещения файла в обработанные или неудачные сегменты в зависимости от того, были ли мои файлы обработаны полностью или нет. Однако, глядя на мои журналы StackDriver, кажется, что во многих случаях файл удаляется из входящей папки и перемещается в обработанную до того, как остальная часть кода сможет его прочитать. Я увижу строку, начинающую читать файл xyz, но затем сразу после него выдается ошибка, потому что файл был перемещен. Мой код изначально не использовал asyn c и ожидал, и даже тогда я видел такое же поведение. Есть идеи, почему это происходит? Я могу прикрепить журналы StackDriver, если это поможет.
private async Task ReadMyFile(MyFile file)
{
FileStream downloadStream= null;
StorageClient storageClient = StorageClient.Create();
try
{
var gcsURI = @"gs://" + file.bucket + @"/" + file.fileName;
Console.WriteLine("Beginning to Read My File: " + gcsURI);
using (downloadStream= System.IO.File.Create(file.fileName))
{
storageClient.DownloadObject(file.bucket, file.fileName, downloadStream);
downloadStream.Position = 0;
}
using (var f = System.IO.File.OpenRead(file.fileName))
using (var zip = new ZipArchive(f, ZipArchiveMode.Read))
using (var stream = zip.Entries[0].Open())
{
Console.WriteLine("Successfully downloaded My File: " + gcsURI);
var fna = GetFileAttributesFromFileName(file.fileName);
int type1Counter = 0;
int type2Counter = 0;
int type3Counter = 0;
StringBuilder type1s = new StringBuilder();
StringBuilder type2s = new StringBuilder();
StringBuilder type3Points = new StringBuilder();
using (StreamReader sr = new StreamReader(stream))
{
while ((line = sr.ReadLine()) != null)
{
var tokens = line.Split(',');
int dataType = Convert.ToInt32(tokens[1]);
int oid = Convert.ToInt32(tokens[0]);
DateTime dateTimeUTC = Convert.ToDateTime(tokens[2]);
switch (dataType)
{
case 0:
var type1Line = taName + "|" + bt + "|" + fna.vId + "|" + fna.dd + "|" + dateTimeUTC.ToString("yyyy-MM-dd") + "|" + dateTimeUTC.ToString("HH") + "|" + line.Replace(",", "|");
type1s.AppendLine(type1Line);
type1Counter++;
break;
case 1:
var type2Line = taName + "|" + bt + "|" + fna.vId + "|" + fna.dd + "|" + dateTimeUTC.ToString("yyyy-MM-dd") + "|" + dateTimeUTC.ToString("HH") + "|" + line.Replace(",", "|");
type2s.AppendLine(type2Line);
type2Counter++;
break;
case 2:
var type3_Line = taName + "|" + bt + "|" + fna.vId + "|" + fna.dd + "|" + dateTimeUTC.ToString("yyyy-MM-dd") + "|" + dateTimeUTC.ToString("HH") + "|" + line.Replace(",", "|");
type3Points.AppendLine(type3_Line);
type3Counter++;
break;
detype1:
break;
}
}
}
if (type1Counter > 0)
{
using (var memoryStream = new MemoryStream())
using (var writer = new StreamWriter(memoryStream))
{
writer.Write(type1s);
writer.Flush();
memoryStream.Position = 0;
await storageClient.UploadObjectAsync
(file.readyforingestionbucket,
"f_" + file.fileName,
"text/plain",
memoryStream);
}
}
if (type2Counter > 0)
{
using (var memoryStream = new MemoryStream())
using (var writer = new StreamWriter(memoryStream))
{
writer.Write(type2s);
writer.Flush();
memoryStream.Position = 0;
await storageClient.UploadObjectAsync
(file.readyforingestionbucket,
"n_" + file.fileName,
"text/plain",
memoryStream);
}
}
if (type3Counter > 0)
{
using (var memoryStream = new MemoryStream())
using (var writer = new StreamWriter(memoryStream))
{
writer.Write(type3Points);
writer.Flush();
memoryStream.Position = 0;
await storageClient.UploadObjectAsync
(file.readyforingestionbucket,
"a_" + file.fileName,
"text/plain",
memoryStream);
}
}
// File completed processing, move to processed folder
Console.WriteLine("Copying {0} to processed folder {1}", file.fileName, file.processedBucket);
await storageClient.CopyObjectAsync(file.bucket, file.fileName, file.processedBucket, file.fileName);
Console.WriteLine("Copied {0} to processed folder {1}", file.fileName, file.processedBucket);
Console.WriteLine("Deleting {0}", file.fileName);
await storageClient.DeleteObjectAsync(file.bucket, file.fileName);
Console.WriteLine("Deleted {0}", file.fileName);
}
}
catch (Exception ex)
{
Console.WriteLine("Error reading file: " + file.fileName + " - " + ex.ToString());
Console.WriteLine(ex.InnerException.ToString());
// File failed processing, move to failed folder
Console.WriteLine("Copying {0} to failed folder {1}", file.fileName, file.failedbucket);
var copied = storageClient.CopyObjectAsync(file.bucket, file.fileName, file.failedbucket, file.fileName);
copied.Wait();
Console.WriteLine("Copied {0} to processed folder {1}", file.fileName, file.failedbucket);
Console.WriteLine("Deleting {0}", file.fileName);
storageClient.DeleteObject(file.bucket, file.fileName);
Console.WriteLine("Deleted {0}", file.fileName);
}
finally
{
try
{
downloadStream.Dispose();
// Delete the file to not run out of disk space on the image.
Console.Write("Deleting local file from image: " + file.fileName);
System.IO.File.Delete(file.fileName);
Console.Write("Deleted local file from image: " + file.fileName);
}
catch (Exception ex)
{
Console.WriteLine("Error deleting file: " + file.fileName);
Console.WriteLine(ex.InnerException.ToString());
}
}
}