Как ограничить количество одновременных операций асинхронного ввода-вывода?

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});

Здесь проблема, он запускает более 1000 одновременных веб-запросов. Есть ли простой способ ограничить количество одновременных http-запросов? Таким образом, в любой момент времени загружается не более 20 веб-страниц. Как это сделать максимально эффективно?

 svick29 мая 2012 г., 23:40
Как это отличается от твой предыдущий вопрос?
 Chris Disley29 мая 2012 г., 23:46
/ Stackoverflow.com вопросы / 9290498 / ... С параметром ParallelOptions.
 spender29 мая 2012 г., 23:48
@ ChrisDisley, это только распараллелит запуск запросов.
 eglasius30 янв. 2014 г., 09:23
@ svick прав, чем он отличается? кстати, я люблю ответ там Stackoverflow.com / а / 10802883/66372
 Shimmy19 авг. 2015 г., 08:21
КромеHttpClient являетсяIDisposable, и вы должны избавиться от него, особенно когда вы собираетесь использовать более 1000 из них.HttpClient может использоваться в качестве одиночного для нескольких запросов.

Ответы на вопрос(12)

процессором. Здесь речь идет об операциях, связанных с вводом / выводом. Ваша реализация должна быть чисто асинхронный, если вы не перегружаете одноядерный процессор на вашем многоядерном процессоре.

РЕДАКТИРОВАТ Мне нравится предложение usr использовать здесь "асинхронный семафор".

 Sean U29 мая 2012 г., 23:48
араллельные расширения @ также можно использовать как способ мультиплексирования операций ввода-вывода без необходимости вручную реализовывать чисто асинхронное решение. Я согласен, что это может показаться небрежным, но пока вы строго ограничиваете количество одновременных операций, это, вероятно, не будет слишком сильно напрягать пул потоков.
 usr29 мая 2012 г., 23:50
Я не думаю, что этот ответ дает ответ. Одной лишь асинхронности здесь недостаточно: мы действительно хотим регулировать физические операции ввода-вывода неблокирующим образом.
 Grief Coder29 мая 2012 г., 23:39
Хорошая точка зрения! Хотя каждая задача здесь будет содержать асинхронный и синхронизирующий код (страница загружается асинхронно, а затем обрабатывается синхронно). Я пытаюсь распределить синхронизирующую часть кода по центральным процессорам и в то же время ограничить количество одновременных операций асинхронного ввода-вывода.
 spender29 мая 2012 г., 23:44
Зачем? Поскольку одновременный запуск более 1000 http-запросов может не подходить для работы сети пользователя.
 spender30 мая 2012 г., 17:15
Хмм ... не уверен, что я согласен ... при работе над большим проектом, если один слишком много разработчиков примет это мнение, вы получите голод, даже если вклад каждого разработчика в отдельности не достаточен, чтобы опрокинуть вещи. Учитывая, что есть толькооди ThreadPool, даже если вы относитесь к этому с уважением ... если все остальные делают то же самое, могут возникнуть проблемы. Как таковой явсегд советую не запускать длинные вещи в ThreadPool.

в .NET Framework отсутствуют наиболее важные комбинаторы для организации параллельных асинхронных задач. Нет такого встроенного.

Посмотри на AsyncSemaphore класс, построенный самым респектабельным Стивеном Таубом. То, что вы хотите, называется семафором, и вам нужна его асинхронная версия.

 jdasilva15 июн. 2013 г., 22:52
Stephen добавил комментарий в ответ на вопрос в своем посте в блоге, подтверждающий, что использование SemaphoreSlim для .NET 4.5, как правило, будет правильным решением.
 Todd Menier18 апр. 2013 г., 19:53
Должен ли SemaphoreSlim (с его новыми асинхронными методами) быть предпочтительнее, чем AsyncSemphore, или реализация Toub все еще имеет некоторое преимущество?
 Theo Yaung30 мая 2012 г., 08:03
Обратите внимание: «К сожалению, в .NET Framework отсутствуют наиболее важные комбинаторы для организации параллельных асинхронных задач. Такой встроенной функции не существует». больше не является верным с .NET 4.5 Beta. SemaphoreSlim теперь предлагает функцию WaitAsync (...):)
 usr18 апр. 2013 г., 20:02
По моему мнению, встроенный тип должен быть предпочтительным, потому что он, вероятно, будет хорошо протестирован и хорошо спроектирован.

и вы хотите выполнить операцию, связанную с вводом-выводом, с каждым из них (т. Е. Сделать асинхронный http-запрос) одновременно И, при желании, вы также хотите установить максимальное количество параллельные запросы ввода / вывода в режиме реального времени, вот как вы можете это сделать. Таким образом, вы не используете пул потоков и т. Д., Метод использует семафорлис для управления максимальными параллельными запросами ввода-вывода, подобными шаблону скользящего окна, который выполняется одним запросом, оставляет семафор, а следующий получает.

usage: ожидайте ForEachAsync (urlStrings, YourAsyncFunc, необязательный MaxDegreeOfConcurrency);

public static Task ForEachAsync<TIn>(
        IEnumerable<TIn> inputEnumerable,
        Func<TIn, Task> asyncProcessor,
        int? maxDegreeOfParallelism = null)
    {
        int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
        SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

        IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
        {
            await throttler.WaitAsync().ConfigureAwait(false);
            try
            {
                await asyncProcessor(input).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });

        return Task.WhenAll(tasks);
    }
 AgentFire21 окт. 2017 г., 14:13
 Dogu Arslan21 окт. 2017 г., 21:53
no вам не нужно явно располагать SemaphoreSlim в этой реализации и использовании, так как он используется внутри метода, а метод не имеет доступа к свойству AvailableWaitHandle, и в этом случае нам бы пришлось либо располагать, либо заключать его в блок использования.
 AgentFire21 окт. 2017 г., 22:43
Просто думая о лучших практиках и уроках, которые мы преподаем другим людям. Ausing было бы здорово

но есть вариант без списка ожидающих задач.

 class SomeChecker
 {
    private const int ThreadCount=20;
    private CountdownEvent _countdownEvent;
    private SemaphoreSlim _throttler;

    public Task Check(IList<string> urls)
    {
        _countdownEvent = new CountdownEvent(urls.Count);
        _throttler = new SemaphoreSlim(ThreadCount); 

        return Task.Run( // prevent UI thread lock
            async  () =>{
                foreach (var url in urls)
                {
                    // do an async wait until we can schedule again
                    await _throttler.WaitAsync();
                    ProccessUrl(url); // NOT await
                }
                //instead of await Task.WhenAll(allTasks);
                _countdownEvent.Wait();
            });
    }

    private async Task ProccessUrl(string url)
    {
        try
        {
            var page = await new WebClient()
                       .DownloadStringTaskAsync(new Uri(url)); 
            ProccessResult(page);
        }
        finally
        {
            _throttler.Release();
            _countdownEvent.Signal();
        }
    }

    private void ProccessResult(string page){/*....*/}
}
 Theo Yaung14 мая 2016 г., 03:13
Обратите внимание, есть одна опасность использования этого подхода - любые исключения, которые происходят вProccessUrl или его подфункции будут фактически игнорироваться. Они будут включены в Задачи, но не перколированы обратно к исходному вызывающемуCheck(...). Лично вот почему я все еще использую Задачи и их функции комбинатора, такие какWhenAll а такжеWhenAny - для лучшего распространения ошибок. :)

который я создал.

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxActionsToRunInParallel">Optional, max numbers of the actions to run in parallel,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxActionsToRunInParallel = null)
    {
        if (maxActionsToRunInParallel.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxActionsToRunInParallel.Value, maxActionsToRunInParallel.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all of the provided tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Образец использования:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

новый ответ. У @vitidev был блок кода, который был повторно использован почти без изменений в проекте, который я просмотрел. После обсуждения с несколькими коллегами один из них спросил: «Почему бы вам просто не использовать встроенные методы TPL?» ActionBlock выглядит как победитель там.https: //msdn.microsoft.com/en-us/library/hh194773 (v = vs.110) .aspx. Вероятно, в конечном итоге не изменится какой-либо существующий код, но определенно постарается принять этот нюгет и повторно использовать лучшие практики мистера Софти для регулирования параллелизм

Просто более лаконичная версияhttps: //stackoverflow.com/a/10810730/118616:

static async Task WhenAll(IEnumerable<Task> tasks, int maxThreadCount) {
    using (var guard = new SemaphoreSlim(initialCount: maxThreadCount)) {
        await Task.WhenAll(tasks.Select(async task => {
            await guard.WaitAsync();

            return task.ContinueWith(t => guard.Release());
        }));
    }
}

и прямое использование семафора может быть сложным в случаях ошибок, поэтому я бы предложил использоватьAsyncEnumerator Пакет NuGet вместо того, чтобы заново изобретать колесо:

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParallelism: 20);

библиотека Parallel Tasks может обрабатывать только параллельные задачи, равные количеству ядер процессора в машине. Это означает, что если у вас есть четырехъядерный компьютер, в данный момент времени будут выполняться только 4 задачи (если вы не понизите MaxDegreeOfParallelism).

 Grief Coder29 мая 2012 г., 23:48
@ svick: да. Знаете ли вы, как эффективно контролировать максимальное количество одновременных задач TPL (не потоков)?
 GregC29 мая 2012 г., 23:38
И еще один, правильно?
 svick29 мая 2012 г., 23:42
Библиотека, безусловно, может обрабатывать больше задач (с помощьюRunning статус) одновременно чем количество ядер. Это особенно актуально для задач, связанных с вводом / выводом.
 scottm29 мая 2012 г., 23:37
Не виделawait ключевое слово там. Удаление, которое должно решить проблему, исправить?
 Grief Coder29 мая 2012 г., 23:36
Да, но это не относится к асинхронным операциям ввода-вывода. Приведенный выше код запустит более 1000 одновременных загрузок, даже если он работает в одном потоке.
Решение Вопроса

безусловно, можете сделать это в последних версиях async для .NET, используя .NET 4.5 Beta. Предыдущий пост от usr указывает на хорошую статью, написанную Стивеном Таубом, но менее анонсированная новость состоит в том, что асинхронный семафор фактически попал в бета-версию .NET 4.5

Если ты смотришь на нашу любимуюSemaphoreSlim class (который вы должны использовать, так как он более производительный, чем оригинальныйSemaphore), теперь он может похвастатьWaitAsync(...) ряд перегрузок со всеми ожидаемыми аргументами - интервалы времени ожидания, токены отмены, все ваши обычные друзья планирования:)

Стивен также написал более свежую запись в блоге о новых вкусностях .NET 4.5, которые вышли с бета-версие Что нового в параллелизме в .NET 4.5 Beta.

Последний пример кода о том, как использовать SemaphoreSlim для регулирования асинхронного метода:

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    var urls = { "http://google.com", "http://yahoo.com", ... };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}

Последнее, но, вероятно, стоит упомянуть решение, использующее планирование на основе TPL. можете создавать связанные с делегатом задачи в TPL, которые еще не были запущены, и разрешить настраиваемому планировщику задач ограничивать параллелизм. На самом деле, здесь есть образец MSDN:

Смотрите такжеПланировщик задач.

 Shimmy19 авг. 2015 г., 08:52
Почему бы тебе не избавитьсяHttpClient
 GreyCloud19 мар. 2013 г., 12:59
разве параллель не является более подходящим подходом с ограниченной степенью параллелизма? Msdn.microsoft.com / EN-US / библиотека / ...
 GameScripting06 нояб. 2013 г., 16:28
Обратите внимание, чтоWaitAsync неявно увеличит внутренний счетчик. Я столкнулся с проблемой, когда не начинаю задание для каждого, но некоторые элементы в исходной коллекции. Убедитесь, что вы звоните толькоWaitAsync когда вы планируете задачу.
 Josh Noe06 июн. 2016 г., 04:46
@ GreyCloud:Parallel.ForEach работает с синхронным кодом. Это позволяет вам вызывать асинхронный код.
 Rupert Rawnsley04 окт. 2017 г., 16:20
Учитывая популярность этого ответа, стоит отметить, что HttpClient может и должен быть одним общим экземпляром, а не экземпляром для запроса.

ИспользуйтеMaxDegreeOfParallelism, который можно указать вParallel.ForEach():

var options = new ParallelOptions { MaxDegreeOfParallelism = 20 };

Parallel.ForEach(urls, options,
    url =>
        {
            var client = new HttpClient();
            var html = client.GetStringAsync(url);
            // do stuff with html
        });
 Theo Yaung14 мая 2016 г., 03:20
@ NealEhardt правильно.Parallel.ForEach(...) предназначен для запуска блоков Синхронное код параллельно (например, в разных потоках).
 Neal Ehardt06 апр. 2015 г., 22:08
Я не думаю, что это работает.GetStringAsync(url) предназначен для вызова сawait. Если вы проверяете типvar html, этоTask<string>, а не результатstring.

вы захотите создать действие или задачу для каждого URL-адреса, по которому хотите попасть, поместить их в список, а затем обработать этот список, ограничив число, которое можно обрабатывать параллельно.

Мой пост в блоге показывает, как сделать это как с Задачами, так и с Действиями, и предоставляет пример проекта, который можно загрузить и запустить, чтобы увидеть оба в действии.

С действиями

Если вы используете Actions, вы можете использовать встроенную функцию .Net Parallel.Invoke. Здесь мы ограничиваем его одновременной работой не более 20 потоков.

var listOfActions = new List<Action>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(() => CallUrl(localUrl)));
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 20};
Parallel.Invoke(options, listOfActions.ToArray());
С заданиями

С Задачами нет встроенной функции. Тем не менее, вы можете использовать тот, который я предоставляю в своем блоге.

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            await Task.WhenAll(postTaskTasks.ToArray());
        }
    }

А затем, создав список задач и вызвав функцию для их запуска, скажем, не более 20 одновременно, вы можете сделать это:

var listOfTasks = new List<Task>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(async () => await CallUrl(localUrl)));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 20);
 Jay Shah09 мая 2018 г., 15:17
Я думаю, вы просто указываете initialCount для SemaphoreSlim, и вам нужно указать второй параметр, то есть maxCount, в конструкторе SemaphoreSlim.

Ваш ответ на вопрос