ThreadSynchronizationContext.cs 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading;
  4. namespace ET
  5. {
  6. public class ThreadSynchronizationContext : SynchronizationContext
  7. {
  8. public static ThreadSynchronizationContext Instance { get; } =
  9. new ThreadSynchronizationContext(Thread.CurrentThread.ManagedThreadId);
  10. private readonly int threadId;
  11. // 线程同步队列,发送接收socket回调都放到该队列,由poll线程统一执行
  12. private readonly ConcurrentQueue<Action> queue = new ConcurrentQueue<Action>();
  13. private Action a;
  14. public ThreadSynchronizationContext(int threadId)
  15. {
  16. this.threadId = threadId;
  17. }
  18. public void Update()
  19. {
  20. while (true)
  21. {
  22. if (!this.queue.TryDequeue(out a))
  23. {
  24. return;
  25. }
  26. try
  27. {
  28. a();
  29. }
  30. catch (Exception e)
  31. {
  32. Log.Error(e);
  33. }
  34. }
  35. }
  36. public override void Post(SendOrPostCallback callback, object state)
  37. {
  38. this.Post(() => callback(state));
  39. }
  40. public void Post(Action action)
  41. {
  42. if (Thread.CurrentThread.ManagedThreadId == this.threadId)
  43. {
  44. try
  45. {
  46. action();
  47. }
  48. catch (Exception e)
  49. {
  50. Log.Error(e);
  51. }
  52. return;
  53. }
  54. this.queue.Enqueue(action);
  55. }
  56. public void PostNext(Action action)
  57. {
  58. this.queue.Enqueue(action);
  59. }
  60. }
  61. }