В C# продюсер методом blockingcollection потребителей, не блокируя поток-потребитель



У меня есть ситуация, когда мне нужно иметь большое количество (сотни) очередей, где элементы должны обрабатываться по порядку (нужен однопоточный потребитель). Моя первая реализация, основанная на примерах, я использовал одну длительную задачу на BlockingCollection для использования элементов очереди. Тем не менее, я закончил тем, что у меня были приложения с сотнями потоков, в основном сидящих без дела, ничего не делая, но потребляя память, так как очереди пусты большую часть времени.



Я так и думал. было бы лучше, чтобы потребительская задача выполнялась только в том случае, если есть что-то в очереди для обработки, однако я не смог найти образцы, которые обеспечивают то, что должно быть лучшими практиками.



Я пришел к решению, похожему на приведенное ниже. Но проблема в том, что каждый элемент приводит к новой задаче (может быть, это неэффективно? Пустая трата ресурсов?). Но если я не создам новую задачу для каждого элемента, я не могу гарантировать, что элемент не будет стоять в очереди необработанный.



    private object _processSyncObj = new object();
private volatile bool _isProcessing;
private BlockingCollection<string> _queue = new BlockingCollection<string>();

private void EnqueueItem(string item)
{
_queue.Add(item);
Task.Factory.StartNew(ProcessQueue);
}

private void ProcessQueue()
{
if (_isProcessing)
return;

lock (_processSyncObj)
{
string item;
while (_isProcessing = _queue.TryTake(out item))
{
// process item
}
}
}


Каковы наилучшие методы/лучшее решение для этой ситуации с гарантией того, что не существует ситуации, когда элемент находится в очереди, но ни один потребитель не работает?

631   3  

3 ответов:

Я думаю, что то, что вы сделали, разумно, потому что задача была сделана, чтобы хорошо масштабироваться также с миллионами задач, производя внутренние под-очереди против пула потоков, избегая слишком большого переключения контекста.

За кулисами задачи выстраиваются в очередь в ThreadPool, который был улучшен с помощью алгоритмов, определяющих и корректирующих количество потоков и обеспечивающих балансировку нагрузки для максимизации пропускной способности. Это делает задачи относительно легкими, и вы можете создавать многие из них. они позволяют создать мелкозернистый параллелизм.

Параллелизм Задач (Библиотека Параллельных Задач)

...но то, что вы сделали, закончится просто нормальным программированием задач, потому что для каждого запроса вы запускаете задачу, поэтому блокирующая коллекция совершенно не используется. Насколько я понял, ваша забота заключается в том, чтобы запустить задание и позволить TaskScheduler, запустить задания в порядке, как они прибыли.

Знаете ли вы, что вы также можете настроить TaskScheduler?

Как насчет просто использовать задачу шаблон программирования, плюс пользовательский TaskScheduler для управления потоком запланированной задачи?

Например, вы можете создать OrderedTaskScheduler, которые являются производными от LimitedConcurrencyLevelTaskScheduler, что бы так себя вести...

КлассLimitedConcurrencyLevelTaskScheduler предлагает планировщик задач, обеспечивающий максимальный уровень параллелизма при запуске поверх пула потоков. Необходимо установить максимальную степень параллельности, желаемую для этот планировщик.

КлассOrderedTaskScheduler предоставляет планировщик задач, который обеспечивает выполнение только одной задачи одновременно. Задачи выполняются в том порядке, в котором они были поставлены в очередь (FIFO). Это подкласс LimitedConcurrencyLevelTaskScheduler, который отправляет 1 в качестве параметра для своего конструктора базового класса.

Вы можете найти эти планировщики уже разработаны, они называются ParallelExtensionsExtras , и вы можете скачать его из здесь , и прочитать некоторые пишут об этом из этого блога и других.

Вы можете найти его также непосредственно на nuget и кодовое зеркало на github.

Наслаждайтесь! :)

Рассматривали ли вы параллельные дополнительные расширения ? Я считаю, что ваш сценарий может быть легко удовлетворенQueuedTaskScheduler или ThreadPerTaskScheduler .

Конечно, довольно переписать, но вы рассматривали возможность сделать это таким образом вместо этого?

public class WorkerQueue<T>
{
    public WorkerQueue(Action<T> workerMethod)
    {
        _workerMethod = workerMethod;
        Task.Factory.StartNew(WorkerAction);
    }

    private Action<T> _workerMethod;

    private void WorkerAction()
    {
        lock (_processSyncObj)
        {
            if (_workerMethod == null)
                return;

            while (true)
            {
                T item;
                if (_queue.TryTake(out item))
                {
                    var method = _workerMethod;
                    if (method != null)
                        method(item);

                }
            }
        }
    }

    private BlockingCollection<T> _queue = new BlockingCollection<T>();
    private object _processSyncObj = new object();
    private volatile bool _isProcessing;

    public void EnqueueItem(T item)
    {
        // thought you might want to swap BlockingCollection with a normal collection since you apparently only want your read threadlocked? You're already making that sure in "WorkerAction"
        _queue.Add(item);
    }
}


/// <summary>
/// Usage example
/// </summary>
public class Program
{
    public void Start()
    {
        var test = new WorkerQueue<string>(WorkerMethod);
    }

    private void WorkerMethod(string s)
    {
        Console.WriteLine(s);
    }
}

Comments

    Ничего не найдено.