@ r3plica,
Это старая проблема, которая была исправлена в версии 1.1.2 или более поздней версии BulkExecutor.
Я попытался воспроизвести вашу проблему, используя Github Repo , чтобы запустить метод BulkImportAsync BulkExecutor, сработал для меня отлично.
Я использовал метод ниже для CreatePartitionCollection-
static internal async Task<DocumentCollection> CreatePartitionedCollectionAsync(DocumentClient client, string databaseName,
string collectionName, int collectionThroughput)
{
PartitionKeyDefinition partitionKey = new PartitionKeyDefinition
{
Paths = new Collection<string> { ConfigurationManager.AppSettings["CollectionPartitionKey"] }
};
DocumentCollection collection = new DocumentCollection { Id = collectionName, PartitionKey = partitionKey };
try
{
collection = await client.CreateDocumentCollectionAsync(
UriFactory.CreateDatabaseUri(databaseName),
collection,
new RequestOptions { OfferThroughput = collectionThroughput });
}
catch (Exception e)
{
throw e;
}
return collection;
}
А затем в методе Main используется BulkImport, как показано ниже: -
// Подготовка к массовому импорту.
// Создание документов с простым ключом раздела.
строка partitionKeyProperty = dataCollection.PartitionKey.Paths [0] .Replace ("/", "");
long numberOfDocumentsToGenerate = long.Parse(ConfigurationManager.AppSettings["NumberOfDocumentsToImport"]);
int numberOfBatches = int.Parse(ConfigurationManager.AppSettings["NumberOfBatches"]);
long numberOfDocumentsPerBatch = (long)Math.Floor(((double)numberOfDocumentsToGenerate) / numberOfBatches);
// Set retry options high for initialization (default values).
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
await bulkExecutor.InitializeAsync();
// Set retries to 0 to pass control to bulk executor.
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
BulkImportResponse bulkImportResponse = null;
long totalNumberOfDocumentsInserted = 0;
double totalRequestUnitsConsumed = 0;
double totalTimeTakenSec = 0;
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
var tasks = new List ();
tasks.Add(Task.Run(async () =>
{
Trace.TraceInformation(String.Format("Executing bulk import for batch {0}", i));
do
{
try
{
bulkImportResponse = await bulkExecutor.BulkImportAsync(
documents: documentsToImportInBatch,
enableUpsert: true,
disableAutomaticIdGeneration: true,
maxConcurrencyPerPartitionKeyRange: null,
maxInMemorySortingBatchSize: null,
cancellationToken: token);
}
catch (DocumentClientException de)
{
Trace.TraceError("Document client exception: {0}", de);
break;
}
catch (Exception e)
{
Trace.TraceError("Exception: {0}", e);
break;
}
} while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count);
totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
},
token));
Дайте мне знать, если вам нужно больше деталей.