LimitedConcurrencyLevelTaskScheduler.cs 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. //--------------------------------------------------------------------------
  2. //
  3. // Copyright (c) Microsoft Corporation. All rights reserved.
  4. //
  5. // File: LimitedConcurrencyTaskScheduler.cs
  6. //
  7. //--------------------------------------------------------------------------
  8. using System.Collections.Generic;
  9. using System.Linq;
  10. namespace System.Threading.Tasks.Schedulers
  11. {
  12. /// <summary>
  13. /// Provides a task scheduler that ensures a maximum concurrency level while
  14. /// running on top of the ThreadPool.
  15. /// </summary>
  16. public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
  17. {
  18. /// <summary>Whether the current thread is processing work items.</summary>
  19. [ThreadStatic]
  20. private static bool currentThreadIsProcessingItems;
  21. /// <summary>The list of tasks to be executed.</summary>
  22. private readonly LinkedList<Task> tasks = new LinkedList<Task>(); // protected by lock(tasks)
  23. /// <summary>The maximum concurrency level allowed by this scheduler.</summary>
  24. private readonly int maxDegreeOfParallelism;
  25. /// <summary>Whether the scheduler is currently processing work items.</summary>
  26. private int delegatesQueuedOrRunning = 0; // protected by lock(tasks)
  27. /// <summary>
  28. /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
  29. /// specified degree of parallelism.
  30. /// </summary>
  31. /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
  32. public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
  33. {
  34. if (maxDegreeOfParallelism < 1)
  35. {
  36. throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
  37. }
  38. this.maxDegreeOfParallelism = maxDegreeOfParallelism;
  39. }
  40. /// <summary>Queues a task to the scheduler.</summary>
  41. /// <param name="task">The task to be queued.</param>
  42. protected sealed override void QueueTask(Task task)
  43. {
  44. // Add the task to the list of tasks to be processed. If there aren't enough
  45. // delegates currently queued or running to process tasks, schedule another.
  46. lock (tasks)
  47. {
  48. tasks.AddLast(task);
  49. if (delegatesQueuedOrRunning < maxDegreeOfParallelism)
  50. {
  51. ++delegatesQueuedOrRunning;
  52. NotifyThreadPoolOfPendingWork();
  53. }
  54. }
  55. }
  56. /// <summary>
  57. /// Informs the ThreadPool that there's work to be executed for this scheduler.
  58. /// </summary>
  59. private void NotifyThreadPoolOfPendingWork()
  60. {
  61. ThreadPool.UnsafeQueueUserWorkItem(_ =>
  62. {
  63. // Note that the current thread is now processing work items.
  64. // This is necessary to enable inlining of tasks into this thread.
  65. currentThreadIsProcessingItems = true;
  66. try
  67. {
  68. // Process all available items in the queue.
  69. while (true)
  70. {
  71. Task item;
  72. lock (tasks)
  73. {
  74. // When there are no more items to be processed,
  75. // note that we're done processing, and get out.
  76. if (tasks.Count == 0)
  77. {
  78. --delegatesQueuedOrRunning;
  79. break;
  80. }
  81. // Get the next item from the queue
  82. item = tasks.First.Value;
  83. tasks.RemoveFirst();
  84. }
  85. // Execute the task we pulled out of the queue
  86. base.TryExecuteTask(item);
  87. }
  88. }
  89. // We're done processing items on the current thread
  90. finally
  91. {
  92. currentThreadIsProcessingItems = false;
  93. }
  94. }, null);
  95. }
  96. /// <summary>Attempts to execute the specified task on the current thread.</summary>
  97. /// <param name="task">The task to be executed.</param>
  98. /// <param name="taskWasPreviouslyQueued"></param>
  99. /// <returns>Whether the task could be executed on the current thread.</returns>
  100. protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
  101. {
  102. // If this thread isn't already processing a task, we don't support inlining
  103. if (!currentThreadIsProcessingItems)
  104. {
  105. return false;
  106. }
  107. // If the task was previously queued, remove it from the queue
  108. if (taskWasPreviouslyQueued)
  109. {
  110. TryDequeue(task);
  111. }
  112. // Try to run the task.
  113. return base.TryExecuteTask(task);
  114. }
  115. /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
  116. /// <param name="task">The task to be removed.</param>
  117. /// <returns>Whether the task could be found and removed.</returns>
  118. protected sealed override bool TryDequeue(Task task)
  119. {
  120. lock (tasks)
  121. {
  122. return tasks.Remove(task);
  123. }
  124. }
  125. /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
  126. public sealed override int MaximumConcurrencyLevel
  127. {
  128. get
  129. {
  130. return maxDegreeOfParallelism;
  131. }
  132. }
  133. /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
  134. /// <returns>An enumerable of the tasks currently scheduled.</returns>
  135. protected sealed override IEnumerable<Task> GetScheduledTasks()
  136. {
  137. bool lockTaken = false;
  138. try
  139. {
  140. Monitor.TryEnter(tasks, ref lockTaken);
  141. if (lockTaken)
  142. {
  143. return tasks.ToArray();
  144. }
  145. else
  146. {
  147. throw new NotSupportedException();
  148. }
  149. }
  150. finally
  151. {
  152. if (lockTaken)
  153. {
  154. Monitor.Exit(tasks);
  155. }
  156. }
  157. }
  158. }
  159. }