TPoller.cs 917 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. namespace TNet
  5. {
  6. public class TPoller: IPoller
  7. {
  8. // 线程同步队列,发送接收socket回调都放到该队列,由poll线程统一执行
  9. private readonly BlockingCollection<Action> blockingCollection = new BlockingCollection<Action>();
  10. public void Add(Action action)
  11. {
  12. this.blockingCollection.Add(action);
  13. }
  14. public void Run(int timeout)
  15. {
  16. // 处理读写线程的回调
  17. Action action;
  18. if (!this.blockingCollection.TryTake(out action, timeout))
  19. {
  20. return;
  21. }
  22. var queue = new Queue<Action>();
  23. queue.Enqueue(action);
  24. while (true)
  25. {
  26. if (!this.blockingCollection.TryTake(out action, 0))
  27. {
  28. break;
  29. }
  30. queue.Enqueue(action);
  31. }
  32. while (queue.Count > 0)
  33. {
  34. Action a = queue.Dequeue();
  35. a();
  36. }
  37. }
  38. }
  39. }