Параллельный запуск с запросами к хранилищу таблиц Azure - PullRequest
0 голосов
/ 28 июня 2018

У меня есть следующий метод, который выполняет запрос startsWith для RowKey в таблице в хранилище таблиц Azure. Теперь я хочу запускать параллельные запросы, используя startsWith на RowKey.

Можно ли создать параллельный метод, который просто вызывает мой существующий метод, или мне нужно будет создать параллельную версию моего существующего метода?

Вот мой текущий startWith метод:

public async Task<IEnumerable<T>> RowKeyStartsWith<T>
                            (string searchString,
                            string tableName,
                            string partitionKey,
                            string columnName = "RowKey") where T : ITableEntity, new()
{
    // Make sure we have a search string
    if (string.IsNullOrEmpty(searchString)) return null;

    // Get CloudTable
    var table = GetTable(tableName);

    char lastChar = searchString[searchString.Length - 1];
    char nextLastChar = (char)((int)lastChar + 1);
    string nextSearchStr = searchString.Substring(0, searchString.Length - 1) + nextLastChar;

    // Define query segment(s)
    string prefixCondition = TableQuery.CombineFilters(
         TableQuery.GenerateFilterCondition(columnName, QueryComparisons.GreaterThanOrEqual, searchString),
              TableOperators.And,
              TableQuery.GenerateFilterCondition(columnName, QueryComparisons.LessThan, nextSearchStr)
         );

    string filterString = TableQuery.CombineFilters(
            TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, partitionKey),
               TableOperators.And,
               prefixCondition
        );

    // Create final query
    var query = new TableQuery<T>().Where(filterString);

    // Declare result variable
    var result = new List<T>();

    // Execute query asynchronously
    TableContinuationToken continuationToken = null;
    do
    {
        Task<TableQuerySegment<T>> querySegment = table.ExecuteQuerySegmentedAsync(query, continuationToken);
        TableQuerySegment<T> segment = await querySegment;
        result.AddRange(segment.ToList());
        continuationToken = segment.ContinuationToken;
    } while (continuationToken != null);

    return result;
}

1 Ответ

0 голосов
/ 06 июля 2018

Можно ли создать параллельный метод, который просто вызывает мой существующий метод, или мне придется создавать параллельную версию моего существующего метода?

Насколько я понимаю, вы можете повторно использовать существующий метод и выполнять запросы с несколькими задачами следующим образом:

//for storing the query results
ConcurrentDictionary<string, object> resultDics = new ConcurrentDictionary<string, object>();

//simulate your seaching parameters
List<RowKeyStartsWithParamModel> rowKeySearchs = Enumerable.Range(1, 10)
    .Select(i => new RowKeyStartsWithParamModel()
    {
        SearchString = i.ToString(),
        TableName = "tablename",
        ColumnName = "Rowkey",
        ParationKey = "partionKey"
    }).ToList();

//create multiple tasks to execute your jobs
var tasks = rowKeySearchs.Select(item => Task.Run(async () =>
{   
    //invoke your existing RowKeyStartsWith
    var results=await RowKeyStartsWith<string>(item.SearchString, item.TableName, item.ParationKey, item.ColumnName);
    //add retrieved results
    resultDics.TryAdd(item.SearchString, results);
}));

//synchronously wait all tasks to be executed completely.
Task.WaitAll(tasks.ToArray());

//print all retrieved results
foreach (var item in resultDics)
{
    Console.WriteLine($"{item.Key},{JsonConvert.SerializeObject(item.Value)}");
}

Более того, вы можете использовать Parallel следующим образом:

Parallel.ForEach(rowKeySearchs, async(item) =>
{
    var results = await RowKeyStartsWith<string>(item.SearchString, item.TableName, item.ParationKey, item.ColumnName);
    resultDics.TryAdd(item.SearchString, results);
});

Примечание: Поскольку вы используете await в делегате для каждой итерации, вы не можете получить результаты запроса синхронно после Parallel.ForEach.

Чтобы синхронно получить результаты, используя приведенный выше фрагмент кода, вы можете использовать следующие подходы:

1) Синхронно получать результаты при вызове RowKeyStartsWith при каждой итерации Parallel.ForEach следующим образом:

var results = RowKeyStartsWith<string>(item.SearchString, item.TableName, item.ParationKey, item.ColumnName).Result;

2) Вы можете использовать WaitHandle для синхронного ожидания результатов запроса до завершения всех WaitHandles.

var waitHandles = rowKeySearchs.Select(d => new EventWaitHandle(false, EventResetMode.ManualReset)).ToArray();
Parallel.ForEach(rowKeySearchs, async (item,loopState,index) =>
{
    var results = await RowKeyStartsWith<string>(item.SearchString, item.TableName, item.ParationKey, item.ColumnName);
    resultDics.TryAdd(item.SearchString, results);
    waitHandles[index].Set(); //release
});
WaitHandle.WaitAll(waitHandles); //block the current thread until all EventWaitHandles released
...