GCS файл перемещается перед обработкой с и без асин c вызовов - PullRequest
0 голосов
/ 05 февраля 2020

Я написал следующий код, который вызывается в API, работающем в Cloud Run. Этот API вызывается один раз для каждого файла, который создается в корзине GCS через уведомление pubsub. Сегодня я добавил код для перемещения файла в обработанные или неудачные сегменты в зависимости от того, были ли мои файлы обработаны полностью или нет. Однако, глядя на мои журналы StackDriver, кажется, что во многих случаях файл удаляется из входящей папки и перемещается в обработанную до того, как остальная часть кода сможет его прочитать. Я увижу строку, начинающую читать файл xyz, но затем сразу после него выдается ошибка, потому что файл был перемещен. Мой код изначально не использовал asyn c и ожидал, и даже тогда я видел такое же поведение. Есть идеи, почему это происходит? Я могу прикрепить журналы StackDriver, если это поможет.

        private async Task ReadMyFile(MyFile file)
        {
            FileStream downloadStream= null;
            StorageClient storageClient = StorageClient.Create();
            try
            {
                var gcsURI = @"gs://" + file.bucket + @"/" + file.fileName;
                Console.WriteLine("Beginning to Read My File: " + gcsURI); 
                using (downloadStream= System.IO.File.Create(file.fileName))
                {
                    storageClient.DownloadObject(file.bucket, file.fileName, downloadStream);
                    downloadStream.Position = 0;
                }
                using (var f = System.IO.File.OpenRead(file.fileName))
                using (var zip = new ZipArchive(f, ZipArchiveMode.Read))
                using (var stream = zip.Entries[0].Open())
                {
                    Console.WriteLine("Successfully downloaded My File: " + gcsURI);
                    var fna = GetFileAttributesFromFileName(file.fileName);

                    int type1Counter = 0;
                    int type2Counter = 0;
                    int type3Counter = 0;
                    StringBuilder type1s = new StringBuilder();
                    StringBuilder type2s = new StringBuilder();
                    StringBuilder type3Points = new StringBuilder();

                    using (StreamReader sr = new StreamReader(stream))
                    {
                        while ((line = sr.ReadLine()) != null)
                        {
                            var tokens = line.Split(',');
                            int dataType = Convert.ToInt32(tokens[1]);
                            int oid = Convert.ToInt32(tokens[0]);
                            DateTime dateTimeUTC = Convert.ToDateTime(tokens[2]);
                            switch (dataType)
                            {
                                case 0:
                                    var type1Line = taName + "|" + bt + "|" + fna.vId + "|" + fna.dd + "|" + dateTimeUTC.ToString("yyyy-MM-dd") + "|" + dateTimeUTC.ToString("HH") + "|" + line.Replace(",", "|");
                                    type1s.AppendLine(type1Line);
                                    type1Counter++;
                                    break;
                                case 1:
                                    var type2Line = taName + "|" + bt + "|" + fna.vId + "|" + fna.dd + "|" + dateTimeUTC.ToString("yyyy-MM-dd") + "|" + dateTimeUTC.ToString("HH") + "|" + line.Replace(",", "|");
                                    type2s.AppendLine(type2Line);
                                    type2Counter++;
                                    break;
                                case 2:
                                    var type3_Line = taName + "|" + bt + "|" + fna.vId + "|" + fna.dd + "|" + dateTimeUTC.ToString("yyyy-MM-dd") + "|" + dateTimeUTC.ToString("HH") + "|" + line.Replace(",", "|");
                                    type3Points.AppendLine(type3_Line);
                                    type3Counter++;
                                    break;
                                detype1:
                                    break;
                            }
                        }
                    }

                    if (type1Counter > 0)
                    {
                        using (var memoryStream = new MemoryStream())
                        using (var writer = new StreamWriter(memoryStream))
                        {
                            writer.Write(type1s);
                            writer.Flush();
                            memoryStream.Position = 0;

                            await storageClient.UploadObjectAsync
                                (file.readyforingestionbucket,
                                "f_" + file.fileName,
                                "text/plain",
                                memoryStream);
                        }
                    }

                    if (type2Counter > 0)
                    {
                        using (var memoryStream = new MemoryStream())
                        using (var writer = new StreamWriter(memoryStream))
                        {
                            writer.Write(type2s);
                            writer.Flush();
                            memoryStream.Position = 0;

                            await storageClient.UploadObjectAsync
                                (file.readyforingestionbucket,
                                "n_" + file.fileName,
                                "text/plain",
                                memoryStream);
                        }
                    }

                    if (type3Counter > 0)
                    {
                        using (var memoryStream = new MemoryStream())
                        using (var writer = new StreamWriter(memoryStream))
                        {
                            writer.Write(type3Points);
                            writer.Flush();
                            memoryStream.Position = 0;

                            await storageClient.UploadObjectAsync
                                (file.readyforingestionbucket,
                                "a_" + file.fileName,
                                "text/plain",
                                memoryStream);
                        }
                    }

                    // File completed processing, move to processed folder
                    Console.WriteLine("Copying {0} to processed folder {1}", file.fileName, file.processedBucket);
                    await storageClient.CopyObjectAsync(file.bucket, file.fileName, file.processedBucket, file.fileName);
                    Console.WriteLine("Copied {0} to processed folder {1}", file.fileName, file.processedBucket);
                    Console.WriteLine("Deleting {0}", file.fileName);
                    await storageClient.DeleteObjectAsync(file.bucket, file.fileName);
                    Console.WriteLine("Deleted {0}", file.fileName);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error reading file: " + file.fileName + " - " + ex.ToString());
                Console.WriteLine(ex.InnerException.ToString());

                // File failed processing, move to failed folder
                Console.WriteLine("Copying {0} to failed folder {1}", file.fileName, file.failedbucket);
                var copied = storageClient.CopyObjectAsync(file.bucket, file.fileName, file.failedbucket, file.fileName);
                copied.Wait();
                Console.WriteLine("Copied {0} to processed folder {1}", file.fileName, file.failedbucket);
                Console.WriteLine("Deleting {0}", file.fileName);
                storageClient.DeleteObject(file.bucket, file.fileName);
                Console.WriteLine("Deleted {0}", file.fileName);
            }
            finally
            {
                try
                {
                    downloadStream.Dispose();
                    // Delete the file to not run out of disk space on the image.
                    Console.Write("Deleting local file from image: " + file.fileName);
                    System.IO.File.Delete(file.fileName);
                    Console.Write("Deleted local file from image: " + file.fileName);
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Error deleting file: " + file.fileName);
                    Console.WriteLine(ex.InnerException.ToString());
                }
            }
        }

...