TPoller.cs 962 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. namespace TNet
  5. {
  6. internal class TPoller : IPoller
  7. {
  8. // 线程同步队列,发送接收socket回调都放到该队列,由poll线程统一执行
  9. private readonly BlockingCollection<Action> blockingCollection = new BlockingCollection<Action>();
  10. public void Add(Action action)
  11. {
  12. this.blockingCollection.Add(action);
  13. }
  14. public void Dispose()
  15. {
  16. }
  17. public void Run(int timeout)
  18. {
  19. // 处理读写线程的回调
  20. Action action;
  21. if (this.blockingCollection.TryTake(out action, timeout))
  22. {
  23. var queue = new Queue<Action>();
  24. queue.Enqueue(action);
  25. while (true)
  26. {
  27. if (!this.blockingCollection.TryTake(out action, 0))
  28. {
  29. break;
  30. }
  31. queue.Enqueue(action);
  32. }
  33. while (queue.Count > 0)
  34. {
  35. Action a = queue.Dequeue();
  36. a();
  37. }
  38. }
  39. }
  40. }
  41. }