HubConnection.cs 67 KB


  1. #if !BESTHTTP_DISABLE_SIGNALR_CORE
  2. using System.Threading;
  3. #if CSHARP_7_OR_LATER
  4. using System.Threading.Tasks;
  5. #endif
  6. using BestHTTP.Futures;
  7. using BestHTTP.SignalRCore.Authentication;
  8. using BestHTTP.SignalRCore.Messages;
  9. using System;
  10. using System.Collections.Generic;
  11. using BestHTTP.Logger;
  12. using System.Collections.Concurrent;
  13. using BestHTTP.PlatformSupport.Threading;
  14. namespace BestHTTP.SignalRCore
  15. {
  16. public sealed class HubConnection : BestHTTP.Extensions.IHeartbeat
  17. {
  18. public static readonly object[] EmptyArgs = new object[0];
  19. /// <summary>
  20. /// Uri of the Hub endpoint
  21. /// </summary>
  22. public Uri Uri { get; private set; }
  /// <summary>
  /// Gets the connection's current state. The value lives in the <see cref="_state"/> int field;
  /// the private setter writes it through an interlocked exchange.
  /// </summary>
  public ConnectionStates State
  {
      get
      {
          int raw = this._state;
          return (ConnectionStates)raw;
      }
      private set
      {
          int raw = (int)value;
          Interlocked.Exchange(ref this._state, raw);
      }
  }

  /// <summary>
  /// Backing storage of <see cref="State"/>, holding the enum value as an int.
  /// </summary>
  private volatile int _state;
  33. /// <summary>
  34. /// Current, active ITransport instance.
  35. /// </summary>
  36. public ITransport Transport { get; private set; }
  37. /// <summary>
  38. /// The IProtocol implementation that will parse, encode and decode messages.
  39. /// </summary>
  40. public IProtocol Protocol { get; private set; }
  41. /// <summary>
  42. /// This event is called when the connection is redirected to a new uri.
  43. /// </summary>
  44. public event Action<HubConnection, Uri, Uri> OnRedirected;
  45. /// <summary>
  46. /// This event is called when successfully connected to the hub.
  47. /// </summary>
  48. public event Action<HubConnection> OnConnected;
  49. /// <summary>
  /// This event is called when an unexpected error happens and the connection is closed.
  51. /// </summary>
  52. public event Action<HubConnection, string> OnError;
  53. /// <summary>
  54. /// This event is called when the connection is gracefully terminated.
  55. /// </summary>
  56. public event Action<HubConnection> OnClosed;
  57. /// <summary>
  58. /// This event is called for every server-sent message. When returns false, no further processing of the message is done by the plugin.
  59. /// </summary>
  60. public event Func<HubConnection, Message, bool> OnMessage;
  61. /// <summary>
  /// Called when the HubConnection starts its reconnection process after losing its underlying connection.
  63. /// </summary>
  64. public event Action<HubConnection, string> OnReconnecting;
  65. /// <summary>
  66. /// Called after a successful reconnection.
  67. /// </summary>
  68. public event Action<HubConnection> OnReconnected;
  69. /// <summary>
  70. /// Called for transport related events.
  71. /// </summary>
  72. public event Action<HubConnection, ITransport, TransportEvents> OnTransportEvent;
  73. /// <summary>
  74. /// An IAuthenticationProvider implementation that will be used to authenticate the connection.
  75. /// </summary>
  76. public IAuthenticationProvider AuthenticationProvider { get; set; }
  77. /// <summary>
  78. /// Negotiation response sent by the server.
  79. /// </summary>
  80. public NegotiationResult NegotiationResult { get; private set; }
  81. /// <summary>
  82. /// Options that has been used to create the HubConnection.
  83. /// </summary>
  84. public HubOptions Options { get; private set; }
  85. /// <summary>
  86. /// How many times this connection is redirected.
  87. /// </summary>
  88. public int RedirectCount { get; private set; }
  89. /// <summary>
  90. /// The reconnect policy that will be used when the underlying connection is lost. Its default value is null.
  91. /// </summary>
  92. public IRetryPolicy ReconnectPolicy { get; set; }
  93. /// <summary>
  94. /// Logging context of this HubConnection instance.
  95. /// </summary>
  96. public LoggingContext Context { get; private set; }
  97. /// <summary>
  /// This is incremented to give a unique id to every message the plugin sends.
  99. /// </summary>
  100. private long lastInvocationId = 1;
  101. /// <summary>
  102. /// Id of the last streaming parameter.
  103. /// </summary>
  104. private int lastStreamId = 1;
  105. /// <summary>
  106. /// Store the callback for all sent message that expect a return value from the server. All sent message has
  107. /// a unique invocationId that will be sent back from the server.
  108. /// </summary>
  109. private ConcurrentDictionary<long, InvocationDefinition> invocations = new ConcurrentDictionary<long, InvocationDefinition>();
  110. /// <summary>
  111. /// This is where we store the methodname => callback mapping.
  112. /// </summary>
  113. private ConcurrentDictionary<string, Subscription> subscriptions = new ConcurrentDictionary<string, Subscription>(StringComparer.OrdinalIgnoreCase);
  114. /// <summary>
  115. /// When we sent out the last message to the server.
  116. /// </summary>
  117. private DateTime lastMessageSentAt;
  118. private DateTime lastMessageReceivedAt;
  119. private DateTime connectionStartedAt;
  120. private RetryContext currentContext;
  121. private DateTime reconnectStartTime = DateTime.MinValue;
  122. private DateTime reconnectAt;
  123. private List<TransportTypes> triedoutTransports = new List<TransportTypes>();
  124. private ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
  125. private bool pausedInLastFrame;
  /// <summary>
  /// Creates a connection for the given hub endpoint and protocol with a freshly created <see cref="HubOptions"/>.
  /// </summary>
  /// <param name="hubUri">Uri of the hub endpoint.</param>
  /// <param name="protocol">Protocol implementation used by this connection.</param>
  public HubConnection(Uri hubUri, IProtocol protocol)
      : this(hubUri, protocol, new HubOptions())
  {
  }

  /// <summary>
  /// Creates a connection in the Initial state, attaches itself to the protocol and installs a
  /// <see cref="DefaultAccessTokenAuthenticator"/> as the authentication provider.
  /// </summary>
  /// <param name="hubUri">Uri of the hub endpoint.</param>
  /// <param name="protocol">Protocol implementation; its Connection property is set to this instance.</param>
  /// <param name="options">Options stored in <see cref="Options"/>.</param>
  public HubConnection(Uri hubUri, IProtocol protocol, HubOptions options)
  {
      this.Context = new LoggingContext(this);

      this.Options = options;
      this.Uri = hubUri;
      this.State = ConnectionStates.Initial;

      this.Protocol = protocol;
      protocol.Connection = this;

      this.AuthenticationProvider = new DefaultAccessTokenAuthenticator(this);
  }
  /// <summary>
  /// Starts connecting to the hub. Only proceeds from the Initial, Redirected or Reconnecting states;
  /// in any other state a warning is logged and nothing else happens.
  /// When the authentication provider requires pre-authentication it is started first,
  /// otherwise negotiation begins right away.
  /// </summary>
  public void StartConnect()
  {
      bool canStart = this.State == ConnectionStates.Initial
                   || this.State == ConnectionStates.Redirected
                   || this.State == ConnectionStates.Reconnecting;
      if (!canStart)
      {
          HTTPManager.Logger.Warning("HubConnection", "StartConnect - Expected Initial or Redirected state, got " + this.State.ToString(), this.Context);
          return;
      }

      // Only the very first attempt records the start time and subscribes to the heartbeats.
      if (this.State == ConnectionStates.Initial)
      {
          this.connectionStartedAt = DateTime.Now;
          HTTPManager.Heartbeats.Subscribe(this);
      }

      string startedAt = this.connectionStartedAt.ToString(System.Globalization.CultureInfo.InvariantCulture);
      HTTPManager.Logger.Verbose("HubConnection", $"StartConnect State: {this.State}, connectionStartedAt: {startedAt}", this.Context);

      bool needsPreAuth = this.AuthenticationProvider != null && this.AuthenticationProvider.IsPreAuthRequired;
      if (!needsPreAuth)
      {
          StartNegotiation();
          return;
      }

      HTTPManager.Logger.Information("HubConnection", "StartConnect - Authenticating", this.Context);
      SetState(ConnectionStates.Authenticating, null, this.defaultReconnect);

      this.AuthenticationProvider.OnAuthenticationSucceded += OnAuthenticationSucceded;
      this.AuthenticationProvider.OnAuthenticationFailed += OnAuthenticationFailed;

      // Kick off the authentication; one of the two handlers above continues the process.
      this.AuthenticationProvider.StartAuthentication();
  }
  167. #if CSHARP_7_OR_LATER
  168. TaskCompletionSource<HubConnection> connectAsyncTaskCompletionSource;
  /// <summary>
  /// Task-based wrapper around <see cref="StartConnect"/>.
  /// </summary>
  /// <returns>
  /// A task that completes with this instance when <see cref="OnConnected"/> fires,
  /// or faults with an exception carrying the error text when <see cref="OnError"/> fires.
  /// </returns>
  /// <exception cref="Exception">
  /// Thrown when the connection isn't in the Initial, Redirected or Reconnecting state,
  /// or when a previous ConnectAsync call is still pending.
  /// </exception>
  public Task<HubConnection> ConnectAsync()
  {
      if (this.State != ConnectionStates.Initial && this.State != ConnectionStates.Redirected && this.State != ConnectionStates.Reconnecting)
          throw new Exception("HubConnection - ConnectAsync - Expected Initial or Redirected state, got " + this.State.ToString());

      if (this.connectAsyncTaskCompletionSource != null)
          throw new Exception("Connect process already started!");

      this.connectAsyncTaskCompletionSource = new TaskCompletionSource<HubConnection>();

      // Cache the task before StartConnect: if connecting fails (or succeeds) synchronously,
      // the callbacks null out connectAsyncTaskCompletionSource before we could read it,
      // which would otherwise end in a NullReferenceException here.
      var task = this.connectAsyncTaskCompletionSource.Task;

      this.OnConnected += OnAsyncConnectedCallback;
      this.OnError += OnAsyncConnectFailedCallback;

      this.StartConnect();

      return task;
  }
  /// <summary>
  /// OnConnected handler installed by ConnectAsync: detaches both ConnectAsync handlers,
  /// completes the pending task with this instance and clears the completion source.
  /// </summary>
  private void OnAsyncConnectedCallback(HubConnection hub)
  {
      this.OnError -= OnAsyncConnectFailedCallback;
      this.OnConnected -= OnAsyncConnectedCallback;

      var pending = this.connectAsyncTaskCompletionSource;
      pending.TrySetResult(this);
      this.connectAsyncTaskCompletionSource = null;
  }

  /// <summary>
  /// OnError handler installed by ConnectAsync: detaches both ConnectAsync handlers,
  /// faults the pending task with an exception built from <paramref name="error"/>
  /// and clears the completion source.
  /// </summary>
  private void OnAsyncConnectFailedCallback(HubConnection hub, string error)
  {
      this.OnError -= OnAsyncConnectFailedCallback;
      this.OnConnected -= OnAsyncConnectedCallback;

      var pending = this.connectAsyncTaskCompletionSource;
      pending.TrySetException(new Exception(error));
      this.connectAsyncTaskCompletionSource = null;
  }
  195. #endif
  /// <summary>
  /// Called by the authentication provider on success: unsubscribes both authentication
  /// handlers and continues with the negotiation.
  /// </summary>
  private void OnAuthenticationSucceded(IAuthenticationProvider provider)
  {
      HTTPManager.Logger.Verbose("HubConnection", "OnAuthenticationSucceded", this.Context);

      this.AuthenticationProvider.OnAuthenticationFailed -= OnAuthenticationFailed;
      this.AuthenticationProvider.OnAuthenticationSucceded -= OnAuthenticationSucceded;

      StartNegotiation();
  }
  /// <summary>
  /// Called by the authentication provider on failure: logs the reason, unsubscribes both
  /// authentication handlers and moves the connection to the Closed state with that reason.
  /// </summary>
  private void OnAuthenticationFailed(IAuthenticationProvider provider, string reason)
  {
      HTTPManager.Logger.Error("HubConnection", "OnAuthenticationFailed: " + reason, this.Context);

      this.AuthenticationProvider.OnAuthenticationFailed -= OnAuthenticationFailed;
      this.AuthenticationProvider.OnAuthenticationSucceded -= OnAuthenticationSucceded;

      SetState(ConnectionStates.Closed, reason, this.defaultReconnect);
  }
  /// <summary>
  /// Sends the negotiation request to the hub's "negotiate" endpoint, or connects directly
  /// when negotiation is skipped and the preferred transport is WebSocket.
  /// If a close was initiated in the meantime, the connection goes straight to Closed.
  /// </summary>
  private void StartNegotiation()
  {
      HTTPManager.Logger.Verbose("HubConnection", "StartNegotiation", this.Context);

      if (this.State == ConnectionStates.CloseInitiated)
      {
          SetState(ConnectionStates.Closed, null, this.defaultReconnect);
          return;
      }

  #if !BESTHTTP_DISABLE_WEBSOCKET
      bool connectDirectly = this.Options.SkipNegotiation && this.Options.PreferedTransport == TransportTypes.WebSocket;
      if (connectDirectly)
      {
          HTTPManager.Logger.Verbose("HubConnection", "Skipping negotiation", this.Context);
          ConnectImpl(this.Options.PreferedTransport);
          return;
      }
  #endif

      SetState(ConnectionStates.Negotiating, null, this.defaultReconnect);

      // https://github.com/dotnet/aspnetcore/blob/master/src/SignalR/docs/specs/TransportProtocols.md#post-endpoint-basenegotiate-request
      // The negotiation could be skipped when going straight for the websocket transport,
      // but its response might carry additional, useful information.
      var negotiateUri = new UriBuilder(this.Uri);
      negotiateUri.Path += negotiateUri.Path.EndsWith("/") ? "negotiate" : "/negotiate";

      // A non-empty Query has its first character (the leading '?') cut off before the version parameter is appended.
      string existingQuery = negotiateUri.Query;
      negotiateUri.Query = string.IsNullOrEmpty(existingQuery)
          ? "negotiateVersion=1"
          : existingQuery.Substring(1) + "&negotiateVersion=1";

      var negotiateRequest = new HTTPRequest(negotiateUri.Uri, HTTPMethods.Post, OnNegotiationRequestFinished);
      negotiateRequest.Context.Add("Hub", this.Context);

      if (this.AuthenticationProvider != null)
          this.AuthenticationProvider.PrepareRequest(negotiateRequest);

      negotiateRequest.Send();
  }
  /// <summary>
  /// Creates the transport for the requested type, raises the SelectedToConnect transport event
  /// and starts connecting with it. If the server's negotiation result doesn't list the transport,
  /// or the transport type is unknown, the connection is closed with an error instead.
  /// </summary>
  /// <param name="transport">Transport type to connect with.</param>
  private void ConnectImpl(TransportTypes transport)
  {
      HTTPManager.Logger.Verbose("HubConnection", "ConnectImpl - " + transport, this.Context);

      switch (transport)
      {
  #if !BESTHTTP_DISABLE_WEBSOCKET
          case TransportTypes.WebSocket:
              if (this.NegotiationResult != null && !IsTransportSupported("WebSockets"))
              {
                  SetState(ConnectionStates.Closed, "Couldn't use preferred transport, as the 'WebSockets' transport isn't supported by the server!", this.defaultReconnect);
                  return;
              }

              this.Transport = new Transports.WebSocketTransport(this);
              this.Transport.OnStateChanged += Transport_OnStateChanged;
              break;
  #endif

          case TransportTypes.LongPolling:
              if (this.NegotiationResult != null && !IsTransportSupported("LongPolling"))
              {
                  SetState(ConnectionStates.Closed, "Couldn't use preferred transport, as the 'LongPolling' transport isn't supported by the server!", this.defaultReconnect);
                  return;
              }

              this.Transport = new Transports.LongPollingTransport(this);
              this.Transport.OnStateChanged += Transport_OnStateChanged;
              break;

          default:
              SetState(ConnectionStates.Closed, "Unsupported transport: " + transport, this.defaultReconnect);
              // No transport has been created for this attempt: stop here instead of falling
              // through and calling StartConnect on a null (or a previous, stale) transport.
              return;
      }

      try
      {
          if (this.OnTransportEvent != null)
              this.OnTransportEvent(this, this.Transport, TransportEvents.SelectedToConnect);
      }
      catch (Exception ex)
      {
          // Exceptions from user code must not prevent the transport from connecting.
          HTTPManager.Logger.Exception("HubConnection", "ConnectImpl - OnTransportEvent exception in user code!", ex, this.Context);
      }

      this.Transport.StartConnect();
  }
  /// <summary>
  /// Checks whether the negotiation result lists a transport with the given name (case-insensitive).
  /// Returns true when the negotiation result has no supported-transport list at all.
  /// </summary>
  /// <param name="transportName">Transport name to look for.</param>
  private bool IsTransportSupported(string transportName)
  {
      // https://github.com/dotnet/aspnetcore/blob/master/src/SignalR/docs/specs/TransportProtocols.md#post-endpoint-basenegotiate-request
      // A negotiation response that contains only the url and accessToken has no 'availableTransports' list.
      if (this.NegotiationResult.SupportedTransports == null)
          return true;

      int idx = 0;
      while (idx < this.NegotiationResult.SupportedTransports.Count)
      {
          var candidate = this.NegotiationResult.SupportedTransports[idx];
          if (candidate.Name.Equals(transportName, StringComparison.OrdinalIgnoreCase))
              return true;

          ++idx;
      }

      return false;
  }
  /// <summary>
  /// Callback of the negotiation request. On success it either follows the redirect returned by the
  /// server or connects with the preferred transport. On any failure it asks the reconnect policy
  /// (if there is one) for the next retry delay and switches to Reconnecting, or closes the
  /// connection with the error when no retry is scheduled.
  /// </summary>
  /// <param name="req">The finished negotiation request.</param>
  /// <param name="resp">The response received for the request.</param>
  private void OnNegotiationRequestFinished(HTTPRequest req, HTTPResponse resp)
  {
      if (this.State == ConnectionStates.Closed)
          return;

      if (this.State == ConnectionStates.CloseInitiated)
      {
          SetState(ConnectionStates.Closed, null, this.defaultReconnect);
          return;
      }

      string errorReason = null;

      switch (req.State)
      {
          // The request finished without any problem.
          case HTTPRequestStates.Finished:
              if (!resp.IsSuccess)
              {
                  // Internal server error?
                  errorReason = string.Format("Negotiation Request Finished Successfully, but the server sent an error. Status Code: {0}-{1} Message: {2}",
                      resp.StatusCode,
                      resp.Message,
                      resp.DataAsText);
                  break;
              }

              HTTPManager.Logger.Information("HubConnection", "Negotiation Request Finished Successfully! Response: " + resp.DataAsText, this.Context);

              // Parse negotiation
              this.NegotiationResult = NegotiationResult.Parse(resp, out errorReason, this);

              // Room for improvement: check validity of the negotiation result:
              //  If url and accessToken is present, the other two must be null.
              //  https://github.com/dotnet/aspnetcore/blob/master/src/SignalR/docs/specs/TransportProtocols.md#post-endpoint-basenegotiate-request
              if (!string.IsNullOrEmpty(errorReason))
                  break;

              if (this.NegotiationResult.Url == null)
              {
                  // No redirect requested, connect with the preferred transport.
                  ConnectImpl(this.Options.PreferedTransport);
                  break;
              }

              this.SetState(ConnectionStates.Redirected, null, this.defaultReconnect);

              if (++this.RedirectCount >= this.Options.MaxRedirects)
              {
                  errorReason = string.Format("MaxRedirects ({0:N0}) reached!", this.Options.MaxRedirects);
                  break;
              }

              var oldUri = this.Uri;
              this.Uri = this.NegotiationResult.Url;

              if (this.OnRedirected != null)
              {
                  try
                  {
                      this.OnRedirected(this, oldUri, Uri);
                  }
                  catch (Exception ex)
                  {
                      HTTPManager.Logger.Exception("HubConnection", "OnNegotiationRequestFinished - OnRedirected", ex, this.Context);
                  }
              }

              // Start over with the new uri.
              StartConnect();
              break;

          // The request finished with an unexpected error. The request's Exception property may contain more info about the error.
          case HTTPRequestStates.Error:
              errorReason = "Negotiation Request Finished with Error! " + (req.Exception != null ? (req.Exception.Message + "\n" + req.Exception.StackTrace) : "No Exception");
              break;

          // The request aborted, initiated by the user.
          case HTTPRequestStates.Aborted:
              errorReason = "Negotiation Request Aborted!";
              break;

          // Connecting to the server is timed out.
          case HTTPRequestStates.ConnectionTimedOut:
              errorReason = "Negotiation Request - Connection Timed Out!";
              break;

          // The request didn't finished in the given time.
          case HTTPRequestStates.TimedOut:
              errorReason = "Negotiation Request - Processing the request Timed Out!";
              break;
      }

      if (errorReason == null)
          return;

      if (this.ReconnectPolicy != null)
      {
          RetryContext retryContext = new RetryContext
          {
              ElapsedTime = DateTime.Now - this.connectionStartedAt,
              PreviousRetryCount = this.currentContext.PreviousRetryCount,
              RetryReason = errorReason
          };

          TimeSpan? retryDelay = null;
          try
          {
              retryDelay = this.ReconnectPolicy.GetNextRetryDelay(retryContext);
          }
          catch (Exception ex)
          {
              // A throwing policy is treated like one that returned no delay.
              HTTPManager.Logger.Exception("HubConnection", "ReconnectPolicy.GetNextRetryDelay", ex, this.Context);
          }

          if (retryDelay != null)
          {
              HTTPManager.Logger.Information("HubConnection", "Next reconnect attempt after " + retryDelay.Value.ToString(), this.Context);

              this.currentContext = retryContext;
              this.currentContext.PreviousRetryCount += 1;

              this.reconnectAt = DateTime.Now + retryDelay.Value;

              this.SetState(ConnectionStates.Reconnecting, null, this.defaultReconnect);
              return;
          }
      }

      // No policy, or the policy scheduled no further attempt: close with the error and
      // keep the raw response available through a fresh NegotiationResult.
      this.NegotiationResult = new NegotiationResult();
      this.NegotiationResult.NegotiationResponse = resp;

      SetState(ConnectionStates.Closed, errorReason, this.defaultReconnect);
  }
  /// <summary>
  /// Starts closing the connection and turns off the default reconnect flag.
  /// Depending on the current state the connection is closed immediately (Initial, Authenticating,
  /// Reconnecting, or when the application is quitting) or a close is initiated through the transport.
  /// Does nothing more when a close is already initiated or finished.
  /// </summary>
  public void StartClose()
  {
      HTTPManager.Logger.Verbose("HubConnection", "StartClose", this.Context);
      this.defaultReconnect = false;

      switch (this.State)
      {
          case ConnectionStates.CloseInitiated:
          case ConnectionStates.Closed:
              // Already initiated/closed
              break;

          case ConnectionStates.Initial:
          case ConnectionStates.Reconnecting:
              SetState(ConnectionStates.Closed, null, this.defaultReconnect);
              break;

          case ConnectionStates.Authenticating:
              // Detach from the provider and cancel the running authentication before closing.
              this.AuthenticationProvider.OnAuthenticationSucceded -= OnAuthenticationSucceded;
              this.AuthenticationProvider.OnAuthenticationFailed -= OnAuthenticationFailed;
              this.AuthenticationProvider.Cancel();

              SetState(ConnectionStates.Closed, null, this.defaultReconnect);
              break;

          default:
              if (!HTTPManager.IsQuitting)
              {
                  SetState(ConnectionStates.CloseInitiated, null, this.defaultReconnect);

                  if (this.Transport != null)
                      this.Transport.StartClose();
              }
              else
              {
                  SetState(ConnectionStates.Closed, null, this.defaultReconnect);
              }
              break;
      }
  }
  450. #if CSHARP_7_OR_LATER
  451. TaskCompletionSource<HubConnection> closeAsyncTaskCompletionSource;
  /// <summary>
  /// Task-based wrapper around <see cref="StartClose"/>.
  /// </summary>
  /// <returns>
  /// A task that completes with this instance when <see cref="OnClosed"/> fires, or faults with an
  /// exception carrying the error text when <see cref="OnError"/> fires. When the connection is
  /// already in the Closed state, an already completed task is returned.
  /// </returns>
  /// <exception cref="Exception">Thrown when a previous CloseAsync call is still pending.</exception>
  public Task<HubConnection> CloseAsync()
  {
      if (this.closeAsyncTaskCompletionSource != null)
          throw new Exception("CloseAsync already called!");

      // StartClose does nothing for an already closed connection, so neither OnClosed nor OnError
      // would ever fire: the returned task would hang forever and every later call would throw
      // "CloseAsync already called!". Complete immediately instead.
      if (this.State == ConnectionStates.Closed)
      {
          // Still go through StartClose to keep its logging and reconnect-disabling side effects.
          this.StartClose();
          return Task.FromResult(this);
      }

      this.closeAsyncTaskCompletionSource = new TaskCompletionSource<HubConnection>();

      this.OnClosed += OnClosedAsyncCallback;
      this.OnError += OnClosedAsyncErrorCallback;

      // Avoid race condition by caching task prior to StartClose,
      // which asynchronously calls OnClosedAsyncCallback, which nulls
      // this.closeAsyncTaskCompletionSource immediately before we have
      // a chance to read from it.
      var task = this.closeAsyncTaskCompletionSource.Task;

      this.StartClose();

      return task;
  }
  /// <summary>
  /// OnClosed handler installed by CloseAsync: detaches both CloseAsync handlers,
  /// completes the pending task with this instance and clears the completion source.
  /// </summary>
  void OnClosedAsyncCallback(HubConnection hub)
  {
      this.OnError -= OnClosedAsyncErrorCallback;
      this.OnClosed -= OnClosedAsyncCallback;

      var pending = this.closeAsyncTaskCompletionSource;
      pending.TrySetResult(this);
      this.closeAsyncTaskCompletionSource = null;
  }

  /// <summary>
  /// OnError handler installed by CloseAsync: detaches both CloseAsync handlers,
  /// faults the pending task with an exception built from <paramref name="error"/>
  /// and clears the completion source.
  /// </summary>
  void OnClosedAsyncErrorCallback(HubConnection hub, string error)
  {
      this.OnError -= OnClosedAsyncErrorCallback;
      this.OnClosed -= OnClosedAsyncCallback;

      var pending = this.closeAsyncTaskCompletionSource;
      pending.TrySetException(new Exception(error));
      this.closeAsyncTaskCompletionSource = null;
  }
  481. #endif
  /// <summary>
  /// Invokes <paramref name="target"/> on the server and returns a future for its result.
  /// The future fails when the connection isn't in the Connected state, when sending throws,
  /// or when the server's answer carries an error.
  /// </summary>
  /// <typeparam name="TResult">Type the server's result is converted to through the protocol.</typeparam>
  /// <param name="target">Name of the server method.</param>
  /// <param name="args">Arguments of the invocation.</param>
  public IFuture<TResult> Invoke<TResult>(string target, params object[] args)
  {
      var pendingResult = new Future<TResult>();

      Action<Message> onCompletion = completion =>
      {
          if (!string.IsNullOrEmpty(completion.error))
          {
              pendingResult.Fail(new Exception(completion.error));
              return;
          }

          pendingResult.Assign((TResult)this.Protocol.ConvertTo(typeof(TResult), completion.result));
      };

      try
      {
          long invocationId = InvokeImp(target, args, onCompletion, typeof(TResult));

          // A negative id means InvokeImp refused to send the message.
          if (invocationId < 0)
              pendingResult.Fail(new Exception("Not in Connected state! Current state: " + this.State));
      }
      catch (Exception ex)
      {
          pendingResult.Fail(ex);
      }

      return pendingResult;
  }
  507. #if CSHARP_7_OR_LATER
  /// <summary>
  /// Invokes <paramref name="target"/> on the server without a cancellation token.
  /// </summary>
  /// <typeparam name="TResult">Type the server's result is converted to through the protocol.</typeparam>
  /// <param name="target">Name of the server method.</param>
  /// <param name="args">Arguments of the invocation.</param>
  /// <returns>A task for the converted result of the invocation.</returns>
  public Task<TResult> InvokeAsync<TResult>(string target, params object[] args)
  {
      return InvokeAsync<TResult>(target, default(CancellationToken), args);
  }

  /// <summary>
  /// Invokes <paramref name="target"/> on the server. The returned task completes with the converted
  /// result, faults when the connection isn't in the Connected state, when sending throws or when
  /// the server's answer carries an error, and is canceled when <paramref name="cancellationToken"/> is signaled.
  /// </summary>
  /// <typeparam name="TResult">Type the server's result is converted to through the protocol.</typeparam>
  /// <param name="target">Name of the server method.</param>
  /// <param name="cancellationToken">Token that cancels the returned task.</param>
  /// <param name="args">Arguments of the invocation.</param>
  /// <returns>A task for the converted result of the invocation.</returns>
  public Task<TResult> InvokeAsync<TResult>(string target, CancellationToken cancellationToken = default, params object[] args)
  {
      TaskCompletionSource<TResult> tcs = new TaskCompletionSource<TResult>();

      try
      {
          long id = InvokeImp(target,
              args,
              (message) =>
              {
                  if (cancellationToken.IsCancellationRequested)
                  {
                      tcs.TrySetCanceled(cancellationToken);
                      return;
                  }

                  bool isSuccess = string.IsNullOrEmpty(message.error);
                  if (isSuccess)
                      tcs.TrySetResult((TResult)this.Protocol.ConvertTo(typeof(TResult), message.result));
                  else
                      tcs.TrySetException(new Exception(message.error));
              },
              typeof(TResult));

          if (id < 0)
              tcs.TrySetException(new Exception("Not in Connected state! Current state: " + this.State));
          else
              cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));
      }
      catch (Exception ex)
      {
          // TrySetException (as in SendAsync) instead of SetException: the latter throws
          // an InvalidOperationException if the task has already been completed.
          tcs.TrySetException(ex);
      }

      return tcs.Task;
  }
  544. #endif
  /// <summary>
  /// Invokes <paramref name="target"/> on the server and returns a future that is assigned the
  /// answer's item. The future fails when the connection isn't in the Connected state,
  /// when sending throws, or when the server's answer carries an error.
  /// </summary>
  /// <param name="target">Name of the server method.</param>
  /// <param name="args">Arguments of the invocation.</param>
  public IFuture<object> Send(string target, params object[] args)
  {
      var pendingResult = new Future<object>();

      Action<Message> onCompletion = completion =>
      {
          if (!string.IsNullOrEmpty(completion.error))
          {
              pendingResult.Fail(new Exception(completion.error));
              return;
          }

          pendingResult.Assign(completion.item);
      };

      try
      {
          long invocationId = InvokeImp(target, args, onCompletion, typeof(object));

          // A negative id means InvokeImp refused to send the message.
          if (invocationId < 0)
              pendingResult.Fail(new Exception("Not in Connected state! Current state: " + this.State));
      }
      catch (Exception ex)
      {
          pendingResult.Fail(ex);
      }

      return pendingResult;
  }
  570. #if CSHARP_7_OR_LATER
  /// <summary>
  /// Invokes <paramref name="target"/> on the server without a cancellation token.
  /// </summary>
  /// <param name="target">Name of the server method.</param>
  /// <param name="args">Arguments of the invocation.</param>
  public Task<object> SendAsync(string target, params object[] args)
  {
      return SendAsync(target, default(CancellationToken), args);
  }

  /// <summary>
  /// Invokes <paramref name="target"/> on the server. The returned task completes with the answer's
  /// item, faults when the connection isn't in the Connected state, when sending throws or when the
  /// server's answer carries an error, and is canceled when <paramref name="cancellationToken"/> is signaled.
  /// </summary>
  /// <param name="target">Name of the server method.</param>
  /// <param name="cancellationToken">Token that cancels the returned task.</param>
  /// <param name="args">Arguments of the invocation.</param>
  public Task<object> SendAsync(string target, CancellationToken cancellationToken = default, params object[] args)
  {
      var completionSource = new TaskCompletionSource<object>();

      Action<Message> onCompletion = completion =>
      {
          if (cancellationToken.IsCancellationRequested)
          {
              completionSource.TrySetCanceled(cancellationToken);
              return;
          }

          if (!string.IsNullOrEmpty(completion.error))
          {
              completionSource.TrySetException(new Exception(completion.error));
              return;
          }

          completionSource.TrySetResult(completion.item);
      };

      try
      {
          long invocationId = InvokeImp(target, args, onCompletion, typeof(object));

          if (invocationId >= 0)
              cancellationToken.Register(() => completionSource.TrySetCanceled(cancellationToken));
          else
              completionSource.TrySetException(new Exception("Not in Connected state! Current state: " + this.State));
      }
      catch (Exception ex)
      {
          completionSource.TrySetException(ex);
      }

      return completionSource.Task;
  }
  607. #endif
  /// <summary>
  /// Builds and sends an (optionally streaming) invocation message and registers
  /// <paramref name="callback"/> for the server's answer.
  /// </summary>
  /// <param name="target">Name of the server method.</param>
  /// <param name="args">Arguments of the invocation.</param>
  /// <param name="callback">
  /// Called with the server's answer. When null, the message is sent as a non-blocking invocation
  /// without an invocation id and nothing is registered.
  /// </param>
  /// <param name="itemType">Return type stored with the registered callback.</param>
  /// <param name="isStreamingInvocation">True to send a StreamInvocation instead of an Invocation message.</param>
  /// <returns>
  /// -1 when the connection isn't in the Connected state, 0 for a non-blocking invocation,
  /// otherwise the id generated for the invocation.
  /// </returns>
  private long InvokeImp(string target, object[] args, Action<Message> callback, Type itemType, bool isStreamingInvocation = false)
  {
      if (this.State != ConnectionStates.Connected)
          return -1;

      bool expectsResponse = callback != null;
      long invocationId = expectsResponse ? System.Threading.Interlocked.Increment(ref this.lastInvocationId) : 0;

      var message = new Message
      {
          type = isStreamingInvocation ? MessageTypes.StreamInvocation : MessageTypes.Invocation,
          invocationId = expectsResponse ? invocationId.ToString() : null,
          target = target,
          arguments = args,
          nonblocking = !expectsResponse,
      };

      // Register the callback BEFORE sending: registering it afterwards leaves a window where the
      // server's answer can arrive while no callback is known for the invocation id.
      bool registered = false;
      if (expectsResponse)
      {
          registered = this.invocations.TryAdd(invocationId, new InvocationDefinition { callback = callback, returnType = itemType });
          if (!registered)
              HTTPManager.Logger.Warning("HubConnection", "InvokeImp - invocations already contains id: " + invocationId, this.Context);
      }

      try
      {
          SendMessage(message);
      }
      catch
      {
          // The message never left, so no answer will come: drop our registration and let the caller see the error.
          if (registered)
          {
              InvocationDefinition removed;
              this.invocations.TryRemove(invocationId, out removed);
          }

          throw;
      }

      return invocationId;
  }
  /// <summary>
  /// Encodes <paramref name="message"/> with the protocol and sends it through the current transport
  /// while holding the write lock. Does nothing when the connection is closed or when the encoded
  /// data is null. Exceptions are logged and rethrown.
  /// </summary>
  /// <param name="message">Message to encode and send.</param>
  internal void SendMessage(Message message)
  {
      // https://github.com/Benedicht/BestHTTP-Issues/issues/146
      if (this.State == ConnectionStates.Closed)
          return;

      // Avoid building the message's string form unless everything is logged.
      if (HTTPManager.Logger.Level == Logger.Loglevels.All)
          HTTPManager.Logger.Verbose("HubConnection", "SendMessage: " + message.ToString(), this.Context);

      try
      {
          using (new WriteLock(this.rwLock))
          {
              var payload = this.Protocol.EncodeMessage(message);
              if (payload.Data == null)
                  return;

              this.lastMessageSentAt = DateTime.Now;
              this.Transport.Send(payload);
          }
      }
      catch (Exception ex)
      {
          HTTPManager.Logger.Exception("HubConnection", "SendMessage", ex, this.Context);
          throw;
      }
  }
  /// <summary>
  /// Sends a StreamInvocation message for <paramref name="target"/> and returns a controller for the
  /// items streamed back by the server. StreamItem messages are converted and assigned to the
  /// controller's future, a Completion message finishes or fails it.
  /// If sending the invocation throws, the future is failed with that exception.
  /// </summary>
  /// <typeparam name="TDown">Type the streamed items are converted to through the protocol.</typeparam>
  /// <param name="target">Name of the server method.</param>
  /// <param name="args">Arguments of the invocation.</param>
  public DownStreamItemController<TDown> GetDownStreamController<TDown>(string target, params object[] args)
  {
      long invocationId = System.Threading.Interlocked.Increment(ref this.lastInvocationId);

      var future = new Future<TDown>();
      future.BeginProcess();

      var controller = new DownStreamItemController<TDown>(this, invocationId, future);

      Action<Message> callback = (Message msg) =>
      {
          switch (msg.type)
          {
              // StreamItem message contains only one item.
              case MessageTypes.StreamItem:
                  {
                      if (controller.IsCanceled)
                          break;

                      TDown item = (TDown)this.Protocol.ConvertTo(typeof(TDown), msg.item);

                      future.AssignItem(item);
                      break;
                  }

              case MessageTypes.Completion:
                  {
                      bool isSuccess = string.IsNullOrEmpty(msg.error);
                      if (isSuccess)
                      {
                          // While completion message must not contain any result, this should be future-proof
                          if (!controller.IsCanceled && msg.result != null)
                          {
                              TDown result = (TDown)this.Protocol.ConvertTo(typeof(TDown), msg.result);

                              future.AssignItem(result);
                          }

                          future.Finish();
                      }
                      else
                          future.Fail(new Exception(msg.error));

                      break;
                  }
          }
      };

      var message = new Message
      {
          type = MessageTypes.StreamInvocation,
          invocationId = invocationId.ToString(),
          target = target,
          arguments = args,
          nonblocking = false,
      };

      // Register the callback BEFORE sending: registering it afterwards leaves a window where the
      // first stream items can arrive while no callback is known for the invocation id.
      bool registered = this.invocations.TryAdd(invocationId, new InvocationDefinition { callback = callback, returnType = typeof(TDown) });
      if (!registered)
          HTTPManager.Logger.Warning("HubConnection", "GetDownStreamController - invocations already contains id: " + invocationId, this.Context);

      try
      {
          SendMessage(message);
      }
      catch (Exception ex)
      {
          // Nothing was sent, so nothing will be streamed back: drop the dead registration.
          if (registered)
          {
              InvocationDefinition removed;
              this.invocations.TryRemove(invocationId, out removed);
          }

          future.Fail(ex);
      }

      return controller;
  }
  /// <summary>
  /// Sends an invocation that uploads <paramref name="paramCount"/> streams to the server and returns a
  /// controller for it. One stream id is generated for every streaming parameter. StreamItem messages
  /// sent back are converted and assigned to the controller's future, a Completion message finishes
  /// or fails it. If sending the invocation throws, the future is failed with that exception.
  /// </summary>
  /// <typeparam name="TResult">Type the server's items/result are converted to through the protocol.</typeparam>
  /// <param name="target">Name of the server method.</param>
  /// <param name="paramCount">Number of streaming parameters, i.e. how many stream ids are generated.</param>
  /// <param name="downStream">True to send a StreamInvocation message, false for a plain Invocation.</param>
  /// <param name="args">Non-streaming arguments of the invocation.</param>
  public UpStreamItemController<TResult> GetUpStreamController<TResult>(string target, int paramCount, bool downStream, object[] args)
  {
      Future<TResult> future = new Future<TResult>();
      future.BeginProcess();

      long invocationId = System.Threading.Interlocked.Increment(ref this.lastInvocationId);

      string[] streamIds = new string[paramCount];
      for (int i = 0; i < paramCount; i++)
          streamIds[i] = System.Threading.Interlocked.Increment(ref this.lastStreamId).ToString();

      var controller = new UpStreamItemController<TResult>(this, invocationId, streamIds, future);

      Action<Message> callback = (Message msg) =>
      {
          switch (msg.type)
          {
              // StreamItem message contains only one item.
              case MessageTypes.StreamItem:
                  {
                      if (controller.IsCanceled)
                          break;

                      TResult item = (TResult)this.Protocol.ConvertTo(typeof(TResult), msg.item);

                      future.AssignItem(item);
                      break;
                  }

              case MessageTypes.Completion:
                  {
                      bool isSuccess = string.IsNullOrEmpty(msg.error);
                      if (isSuccess)
                      {
                          // While completion message must not contain any result, this should be future-proof
                          if (!controller.IsCanceled && msg.result != null)
                          {
                              TResult result = (TResult)this.Protocol.ConvertTo(typeof(TResult), msg.result);

                              future.AssignItem(result);
                          }

                          future.Finish();
                      }
                      else
                      {
                          var ex = new Exception(msg.error);
                          future.Fail(ex);
                      }

                      break;
                  }
          }
      };

      var messageToSend = new Message
      {
          type = downStream ? MessageTypes.StreamInvocation : MessageTypes.Invocation,
          invocationId = invocationId.ToString(),
          target = target,
          arguments = args,
          streamIds = streamIds,
          nonblocking = false,
      };

      // Register the callback BEFORE sending: registering it afterwards leaves a window where the
      // server's answer can arrive while no callback is known for the invocation id.
      bool registered = this.invocations.TryAdd(invocationId, new InvocationDefinition { callback = callback, returnType = typeof(TResult) });
      if (!registered)
          HTTPManager.Logger.Warning("HubConnection", "GetUpStreamController - invocations already contains id: " + invocationId, this.Context);

      try
      {
          SendMessage(messageToSend);
      }
      catch (Exception ex)
      {
          // Nothing was sent, so no answer will come: drop the dead registration.
          if (registered)
          {
              InvocationDefinition removed;
              this.invocations.TryRemove(invocationId, out removed);
          }

          future.Fail(ex);
      }

      return controller;
  }
  776. public void On(string methodName, Action callback)
  777. {
  778. On(methodName, null, (args) => callback());
  779. }
  780. public void On<T1>(string methodName, Action<T1> callback)
  781. {
  782. On(methodName, new Type[] { typeof(T1) }, (args) => callback((T1)args[0]));
  783. }
  784. public void On<T1, T2>(string methodName, Action<T1, T2> callback)
  785. {
  786. On(methodName,
  787. new Type[] { typeof(T1), typeof(T2) },
  788. (args) => callback((T1)args[0], (T2)args[1]));
  789. }
  790. public void On<T1, T2, T3>(string methodName, Action<T1, T2, T3> callback)
  791. {
  792. On(methodName,
  793. new Type[] { typeof(T1), typeof(T2), typeof(T3) },
  794. (args) => callback((T1)args[0], (T2)args[1], (T3)args[2]));
  795. }
  /// <summary>
  /// Subscribes a four-argument <paramref name="callback"/> under <paramref name="methodName"/>.
  /// </summary>
  /// <param name="methodName">Name the subscription is stored under.</param>
  /// <param name="callback">Action to run; arguments are cast positionally to the generic types.</param>
  public void On<T1, T2, T3, T4>(string methodName, Action<T1, T2, T3, T4> callback)
  {
      Type[] parameterTypes = { typeof(T1), typeof(T2), typeof(T3), typeof(T4) };
      On(methodName, parameterTypes, arguments => callback((T1)arguments[0], (T2)arguments[1], (T3)arguments[2], (T4)arguments[3]));
  }
  /// <summary>
  /// Shared implementation of the Action based On overloads: fetches (or creates) the
  /// subscription stored under <paramref name="methodName"/> and adds the callback to it.
  /// </summary>
  /// <param name="methodName">Key of the subscription.</param>
  /// <param name="paramTypes">Parameter types of the callback; null for parameterless callbacks.</param>
  /// <param name="callback">Callback receiving the arguments as an object array.</param>
  private void On(string methodName, Type[] paramTypes, Action<object[]> callback)
  {
      var subscription = this.subscriptions.GetOrAdd(methodName, _ => new Subscription());
      subscription.Add(paramTypes, callback);
  }
  /// <summary>
  /// Subscribes a parameterless function under <paramref name="methodName"/> that produces a <typeparamref name="Result"/>.
  /// </summary>
  /// <param name="methodName">Name the subscription is stored under.</param>
  /// <param name="callback">Function to run; any received arguments are ignored.</param>
  public void On<Result>(string methodName, Func<Result> callback)
  {
      // No parameter types are registered for a parameterless function.
      Func<object[], object> invoker = _ => callback();
      OnFunc<Result>(methodName, null, invoker);
  }
  /// <summary>
  /// Subscribes a one-argument function under <paramref name="methodName"/> that produces a <typeparamref name="Result"/>.
  /// </summary>
  /// <param name="methodName">Name the subscription is stored under.</param>
  /// <param name="callback">Function to run; the first argument is cast to <typeparamref name="T1"/>.</param>
  public void On<T1, Result>(string methodName, Func<T1, Result> callback)
  {
      Type[] parameterTypes = { typeof(T1) };
      OnFunc<Result>(methodName, parameterTypes, arguments => callback((T1)arguments[0]));
  }
  /// <summary>
  /// Subscribes a two-argument function under <paramref name="methodName"/> that produces a <typeparamref name="Result"/>.
  /// </summary>
  /// <param name="methodName">Name the subscription is stored under.</param>
  /// <param name="callback">Function to run; arguments are cast positionally to the generic types.</param>
  public void On<T1, T2, Result>(string methodName, Func<T1, T2, Result> callback)
  {
      Type[] parameterTypes = { typeof(T1), typeof(T2) };
      OnFunc<Result>(methodName, parameterTypes, arguments => callback((T1)arguments[0], (T2)arguments[1]));
  }
  /// <summary>
  /// Subscribes a three-argument function under <paramref name="methodName"/> that produces a <typeparamref name="Result"/>.
  /// </summary>
  /// <param name="methodName">Name the subscription is stored under.</param>
  /// <param name="callback">Function to run; arguments are cast positionally to the generic types.</param>
  public void On<T1, T2, T3, Result>(string methodName, Func<T1, T2, T3, Result> callback)
  {
      Type[] parameterTypes = { typeof(T1), typeof(T2), typeof(T3) };
      OnFunc<Result>(methodName, parameterTypes, arguments => callback((T1)arguments[0], (T2)arguments[1], (T3)arguments[2]));
  }
  /// <summary>
  /// Subscribes a four-argument function under <paramref name="methodName"/> that produces a <typeparamref name="Result"/>.
  /// </summary>
  /// <param name="methodName">Name the subscription is stored under.</param>
  /// <param name="callback">Function to run; arguments are cast positionally to the generic types.</param>
  public void On<T1, T2, T3, T4, Result>(string methodName, Func<T1, T2, T3, T4, Result> callback)
  {
      Type[] parameterTypes = { typeof(T1), typeof(T2), typeof(T3), typeof(T4) };
      OnFunc<Result>(methodName, parameterTypes, arguments => callback((T1)arguments[0], (T2)arguments[1], (T3)arguments[2], (T4)arguments[3]));
  }
  // https://github.com/dotnet/aspnetcore/issues/5280
  /// <summary>
  /// Shared implementation of the Func based On overloads: fetches (or creates) the
  /// subscription stored under <paramref name="methodName"/> and adds the function callback to it,
  /// together with its declared result type.
  /// </summary>
  /// <param name="methodName">Key of the subscription.</param>
  /// <param name="paramTypes">Parameter types of the callback; null for parameterless callbacks.</param>
  /// <param name="callback">Callback receiving the arguments as an object array and returning the result boxed.</param>
  private void OnFunc<Result>(string methodName, Type[] paramTypes, Func<object[], object> callback)
  {
      var subscription = this.subscriptions.GetOrAdd(methodName, _ => new Subscription());
      subscription.AddFunc(typeof(Result), paramTypes, callback);
  }
  /// <summary>
  /// Drops the whole subscription stored under <paramref name="methodName"/>, removing every handler
  /// that was added for it through an On call. Does nothing when no such subscription exists.
  /// </summary>
  public void Remove(string methodName)
  {
      // The removed subscription itself is of no interest to the caller.
      this.subscriptions.TryRemove(methodName, out Subscription removed);
  }
  /// <summary>
  /// Looks up the subscription stored under <paramref name="methodName"/>.
  /// </summary>
  /// <returns>The subscription, or null when nothing is registered under that name.</returns>
  internal Subscription GetSubscription(string methodName)
  {
      return this.subscriptions.TryGetValue(methodName, out var found) ? found : null;
  }
  /// <summary>
  /// Returns the result type recorded for the pending invocation identified by <paramref name="invocationId"/>.
  /// </summary>
  /// <param name="invocationId">Id of the invocation to look up.</param>
  /// <returns>The stored return type, or null when no invocation with this id is registered.</returns>
  internal Type GetItemType(long invocationId)
  {
      InvocationDefinition def;

      // Do not touch 'def' when the lookup fails: TryGetValue leaves it at the type's default value.
      if (!this.invocations.TryGetValue(invocationId, out def))
          return null;

      return def.returnType;
  }
  // Messages set aside by OnMessages while pausedInLastFrame was set; the heartbeat update replays them.
  List<Message> delayedMessages;

  /// <summary>
  /// Processes a batch of received messages.
  /// </summary>
  /// <remarks>
  /// When <c>pausedInLastFrame</c> is set, the messages are moved to <c>delayedMessages</c> and
  /// <paramref name="messages"/> is cleared, so nothing is dispatched by this call. Otherwise every message is
  /// first offered to <c>OnMessage</c> (a false return value skips the message) and is then dispatched by its type.
  /// Processing stops at the first Close message.
  /// </remarks>
  /// <param name="messages">Messages to process. The list is cleared when the messages get delayed.</param>
  internal void OnMessages(List<Message> messages)
  {
      this.lastMessageReceivedAt = DateTime.Now;

      if (pausedInLastFrame)
      {
          if (this.delayedMessages == null)
              this.delayedMessages = new List<Message>(messages.Count);

          foreach (var msg in messages)
              delayedMessages.Add(msg);

          // Emptying the list turns the dispatch loop below into a no-op.
          messages.Clear();
      }

      for (int messageIdx = 0; messageIdx < messages.Count; ++messageIdx)
      {
          var message = messages[messageIdx];

          if (this.OnMessage != null)
          {
              try
              {
                  // User code can veto the processing of a message by returning false.
                  if (!this.OnMessage(this, message))
                      continue;
              }
              catch (Exception ex)
              {
                  // A throwing filter doesn't prevent the message from being processed.
                  HTTPManager.Logger.Exception("HubConnection", "Exception in OnMessage user code!", ex, this.Context);
              }
          }

          switch (message.type)
          {
              case MessageTypes.Handshake:
                  break;

              case MessageTypes.Invocation:
              {
                  Subscription subscribtion = null;
                  if (this.subscriptions.TryGetValue(message.target, out subscribtion))
                  {
                      // A null list counts as "no callbacks" too. Comparing 'list?.Count == 0' would be false for a
                      // null list and would suppress the warning.
                      bool hasCallbacks = subscribtion.callbacks != null && subscribtion.callbacks.Count > 0;
                      bool hasFunctionCallbacks = subscribtion.functionCallbacks != null && subscribtion.functionCallbacks.Count > 0;

                      if (!hasCallbacks && !hasFunctionCallbacks)
                          HTTPManager.Logger.Warning("HubConnection", $"No callback for invocation '{message.ToString()}'", this.Context);

                      if (subscribtion.callbacks != null)
                      {
                          for (int i = 0; i < subscribtion.callbacks.Count; ++i)
                          {
                              var callbackDesc = subscribtion.callbacks[i];

                              object[] realArgs = null;
                              try
                              {
                                  realArgs = this.Protocol.GetRealArguments(callbackDesc.ParamTypes, message.arguments);
                              }
                              catch (Exception ex)
                              {
                                  HTTPManager.Logger.Exception("HubConnection", "OnMessages - Invocation - GetRealArguments", ex, this.Context);
                              }

                              // The callback is invoked even if the argument conversion failed (realArgs stays null);
                              // handlers that don't read their arguments still run.
                              try
                              {
                                  callbackDesc.Callback.Invoke(realArgs);
                              }
                              catch (Exception ex)
                              {
                                  HTTPManager.Logger.Exception("HubConnection", "OnMessages - Invocation - Invoke", ex, this.Context);
                              }
                          }
                      }

                      if (subscribtion.functionCallbacks != null)
                      {
                          for (int i = 0; i < subscribtion.functionCallbacks.Count; ++i)
                          {
                              var callbackDesc = subscribtion.functionCallbacks[i];

                              object[] realArgs = null;
                              try
                              {
                                  realArgs = this.Protocol.GetRealArguments(callbackDesc.ParamTypes, message.arguments);
                              }
                              catch (Exception ex)
                              {
                                  HTTPManager.Logger.Exception("HubConnection", "OnMessages - Function Invocation - GetRealArguments", ex, this.Context);
                              }

                              try
                              {
                                  var result = callbackDesc.Callback(realArgs);

                                  if (result is Task task && task.GetType() is Type taskType && taskType.IsGenericType)
                                  {
                                      // The function returned a generic task: answer only when it completes.
                                      task.ContinueWith((t) =>
                                      {
                                          Exception error = null;
                                          try
                                          {
                                              if (t.IsCanceled || t.IsFaulted)
                                              {
                                                  // Task.Exception is null for a canceled task, hence the null-conditional access.
                                                  error = t.Exception?.InnerException ?? new TaskCanceledException();
                                              }
                                              else
                                              {
                                                  // The concrete Task<T> type is only known at runtime, so read Result through reflection.
                                                  var prop = taskType.GetProperty("Result");
                                                  var taskResult = prop.GetValue(t);

                                                  SendMessage(new Message { type = MessageTypes.Completion, invocationId = message.invocationId, result = taskResult });
                                              }
                                          }
                                          catch (Exception ex)
                                          {
                                              error = ex;
                                          }

                                          if (error != null)
                                          {
                                              try
                                              {
                                                  SendMessage(new Message { type = MessageTypes.Completion, invocationId = message.invocationId, error = error.Message });
                                              }
                                              catch (Exception sendEx)
                                              {
                                                  // Nobody observes exceptions thrown from this continuation, so log them here.
                                                  HTTPManager.Logger.Exception("HubConnection", "OnMessages - Function Invocation - sending error completion", sendEx, this.Context);
                                              }
                                          }
                                      });
                                  }
                                  else
                                      SendMessage(new Message { type = MessageTypes.Completion, invocationId = message.invocationId, result = result });
                              }
                              catch (Exception ex)
                              {
                                  HTTPManager.Logger.Exception("HubConnection", "OnMessages - Function Invocation - Invoke", ex, this.Context);

                                  SendMessage(new Message { type = MessageTypes.Completion, invocationId = message.invocationId, error = ex.Message });
                              }
                          }
                      }
                  }
                  else
                      HTTPManager.Logger.Warning("HubConnection", $"No subscription could be found for invocation '{message.ToString()}'", this.Context);

                  break;
              }

              case MessageTypes.StreamItem:
              {
                  long invocationId;
                  if (long.TryParse(message.invocationId, out invocationId))
                  {
                      // A stream item leaves the invocation registered; more items or a completion may follow.
                      InvocationDefinition def;
                      if (this.invocations.TryGetValue(invocationId, out def) && def.callback != null)
                      {
                          try
                          {
                              def.callback(message);
                          }
                          catch (Exception ex)
                          {
                              HTTPManager.Logger.Exception("HubConnection", "OnMessages - StreamItem - callback", ex, this.Context);
                          }
                      }
                  }
                  break;
              }

              case MessageTypes.Completion:
              {
                  long invocationId;
                  if (long.TryParse(message.invocationId, out invocationId))
                  {
                      // A completion finishes the invocation, so it is removed from the pending ones.
                      InvocationDefinition def;
                      if (this.invocations.TryRemove(invocationId, out def) && def.callback != null)
                      {
                          try
                          {
                              def.callback(message);
                          }
                          catch (Exception ex)
                          {
                              HTTPManager.Logger.Exception("HubConnection", "OnMessages - Completion - callback", ex, this.Context);
                          }
                      }
                  }
                  break;
              }

              case MessageTypes.Ping:
                  // Send back an answer
                  SendMessage(new Message() { type = MessageTypes.Ping });
                  break;

              case MessageTypes.Close:
                  SetState(ConnectionStates.Closed, message.error, message.allowReconnect);
                  if (this.Transport != null)
                      this.Transport.StartClose();

                  // Messages after a Close are not processed.
                  return;
          }
      }
  }
  /// <summary>
  /// Reacts to state changes of the transport: notifies <c>OnTransportEvent</c> and maps the
  /// transport's new state onto the connection's own state. Ignored when the connection is already closed.
  /// </summary>
  /// <param name="oldState">Previous state of the transport (only logged).</param>
  /// <param name="newState">New state of the transport.</param>
  private void Transport_OnStateChanged(TransportStates oldState, TransportStates newState)
  {
      HTTPManager.Logger.Verbose("HubConnection", string.Format("Transport_OnStateChanged - oldState: {0} newState: {1}", oldState.ToString(), newState.ToString()), this.Context);

      if (this.State == ConnectionStates.Closed)
      {
          HTTPManager.Logger.Verbose("HubConnection", "Transport_OnStateChanged - already closed!", this.Context);
          return;
      }

      if (newState == TransportStates.Connected)
      {
          NotifyTransportEvent(TransportEvents.Connected);

          SetState(ConnectionStates.Connected, null, this.defaultReconnect);
      }
      else if (newState == TransportStates.Failed)
      {
          bool failedWhileNegotiating = this.State == ConnectionStates.Negotiating && !HTTPManager.IsQuitting;

          if (failedWhileNegotiating)
          {
              NotifyTransportEvent(TransportEvents.FailedToConnect);

              // Remember the failed transport and fall back to the next untried one, if any.
              this.triedoutTransports.Add(this.Transport.TransportType);

              var fallback = GetNextTransportToTry();
              if (fallback != null)
              {
                  ConnectImpl(fallback.Value);
              }
              else
              {
                  var failureReason = this.Transport.ErrorReason;
                  this.Transport = null;

                  SetState(ConnectionStates.Closed, failureReason, this.defaultReconnect);
              }
          }
          else
          {
              NotifyTransportEvent(TransportEvents.ClosedWithError);

              var failureReason = this.Transport.ErrorReason;
              this.Transport = null;

              SetState(ConnectionStates.Closed, HTTPManager.IsQuitting ? null : failureReason, this.defaultReconnect);
          }
      }
      else if (newState == TransportStates.Closed)
      {
          NotifyTransportEvent(TransportEvents.Closed);

          // Check whether there is a Close message among the delayed ones. If there is, SetState(Closed) is left
          // to the processing of that message.
          bool closeMessageDelayed = this.delayedMessages != null
              && this.delayedMessages.FindLast(dm => dm.type == MessageTypes.Close).type == MessageTypes.Close;

          if (!closeMessageDelayed)
              SetState(ConnectionStates.Closed, null, this.defaultReconnect);
      }

      // Calls the OnTransportEvent user callback, logging instead of propagating its exceptions.
      void NotifyTransportEvent(TransportEvents transportEvent)
      {
          try
          {
              if (this.OnTransportEvent != null)
                  this.OnTransportEvent(this, this.Transport, transportEvent);
          }
          catch (Exception ex)
          {
              HTTPManager.Logger.Exception("HubConnection", "Exception in OnTransportEvent user code!", ex, this.Context);
          }
      }
  }
  /// <summary>
  /// Walks the <c>TransportTypes</c> values in their enumeration order and picks the first one that
  /// hasn't been tried yet and passes <c>IsTransportSupported</c>.
  /// </summary>
  /// <returns>The transport type to try next, or null when none is left.</returns>
  private TransportTypes? GetNextTransportToTry()
  {
      var allTransports = Enum.GetValues(typeof(TransportTypes));

      foreach (TransportTypes candidate in allTransports)
      {
          if (this.triedoutTransports.Contains(candidate))
              continue;

          if (IsTransportSupported(candidate.ToString()))
              return candidate;
      }

      return null;
  }
  // Value passed as SetState's allowReconnect argument by callers that have no explicit hint.
  bool defaultReconnect = true;

  /// <summary>
  /// Moves the connection to <paramref name="state"/> and runs the side effects of the transition:
  /// connected/reconnected notifications, cancellation of pending invocations on close, reconnect scheduling
  /// and the closed/error notifications. Does nothing when the connection is already in that state.
  /// </summary>
  /// <param name="state">The new state.</param>
  /// <param name="errorReason">Reason of a closure; null is treated as an expected closure.</param>
  /// <param name="allowReconnect">Whether a reconnect attempt may be scheduled when the connection gets closed.</param>
  private void SetState(ConnectionStates state, string errorReason, bool allowReconnect)
  {
      HTTPManager.Logger.Information("HubConnection", string.Format("SetState - from State: '{0}' to State: '{1}', errorReason: '{2}', allowReconnect: {3}, isQuitting: {4}", this.State, state, errorReason, allowReconnect, HTTPManager.IsQuitting), this.Context);

      if (this.State == state)
          return;

      var previousState = this.State;

      this.State = state;

      switch (state)
      {
          case ConnectionStates.Initial:
          case ConnectionStates.Authenticating:
          case ConnectionStates.Negotiating:
          case ConnectionStates.CloseInitiated:
              break;

          case ConnectionStates.Reconnecting:
              break;

          case ConnectionStates.Connected:
              // If reconnectStartTime isn't its default value we reconnected
              if (this.reconnectStartTime != DateTime.MinValue)
              {
                  try
                  {
                      if (this.OnReconnected != null)
                          this.OnReconnected(this);
                  }
                  catch (Exception ex)
                  {
                      HTTPManager.Logger.Exception("HubConnection", "OnReconnected", ex, this.Context);
                  }
              }
              else
              {
                  try
                  {
                      if (this.OnConnected != null)
                          this.OnConnected(this);
                  }
                  catch (Exception ex)
                  {
                      HTTPManager.Logger.Exception("HubConnection", "Exception in OnConnected user code!", ex, this.Context);
                  }
              }

              this.lastMessageSentAt = DateTime.Now;
              this.lastMessageReceivedAt = DateTime.Now;

              // Clean up reconnect related fields
              this.currentContext = new RetryContext();
              this.reconnectStartTime = DateTime.MinValue;
              this.reconnectAt = DateTime.MinValue;

              // Unsubscribe first so the handler is never registered twice.
              HTTPUpdateDelegator.OnApplicationForegroundStateChanged -= this.OnApplicationForegroundStateChanged;
              HTTPUpdateDelegator.OnApplicationForegroundStateChanged += this.OnApplicationForegroundStateChanged;

              break;

          case ConnectionStates.Closed:
              // Go through all invocations and cancel them.
              var error = new Message();
              error.type = MessageTypes.Close;
              error.error = errorReason;

              foreach (var kvp in this.invocations)
              {
                  var invocationCallback = kvp.Value.callback;
                  if (invocationCallback == null)
                      continue;

                  try
                  {
                      invocationCallback(error);
                  }
                  catch (Exception ex)
                  {
                      // A failing callback must not prevent the cancellation of the others, but shouldn't go unnoticed either.
                      HTTPManager.Logger.Exception("HubConnection", "SetState - exception in invocation callback while closing!", ex, this.Context);
                  }
              }

              this.invocations.Clear();

              // No errorReason? It's an expected closure.
              if (errorReason == null && (!allowReconnect || HTTPManager.IsQuitting))
              {
                  if (this.OnClosed != null)
                  {
                      try
                      {
                          this.OnClosed(this);
                      }
                      catch (Exception ex)
                      {
                          HTTPManager.Logger.Exception("HubConnection", "Exception in OnClosed user code!", ex, this.Context);
                      }
                  }
              }
              else
              {
                  // If possible, try to reconnect
                  if (allowReconnect && this.ReconnectPolicy != null && (previousState == ConnectionStates.Connected || this.reconnectStartTime != DateTime.MinValue))
                  {
                      // It's the first attempt after a successful connection
                      if (this.reconnectStartTime == DateTime.MinValue)
                      {
                          this.connectionStartedAt = this.reconnectStartTime = DateTime.Now;

                          try
                          {
                              if (this.OnReconnecting != null)
                                  this.OnReconnecting(this, errorReason);
                          }
                          catch (Exception ex)
                          {
                              HTTPManager.Logger.Exception("HubConnection", "SetState - ConnectionStates.Reconnecting", ex, this.Context);
                          }
                      }

                      RetryContext context = new RetryContext
                      {
                          ElapsedTime = DateTime.Now - this.reconnectStartTime,
                          PreviousRetryCount = this.currentContext.PreviousRetryCount,
                          RetryReason = errorReason
                      };

                      // A throwing policy is handled like one that returned null: no further attempts.
                      TimeSpan? nextAttempt = null;
                      try
                      {
                          nextAttempt = this.ReconnectPolicy.GetNextRetryDelay(context);
                      }
                      catch (Exception ex)
                      {
                          HTTPManager.Logger.Exception("HubConnection", "ReconnectPolicy.GetNextRetryDelay", ex, this.Context);
                      }

                      // No more reconnect attempt, we are closing
                      if (nextAttempt == null)
                      {
                          HTTPManager.Logger.Warning("HubConnection", "No more reconnect attempt!", this.Context);

                          // Clean up everything
                          this.currentContext = new RetryContext();
                          this.reconnectStartTime = DateTime.MinValue;
                          this.reconnectAt = DateTime.MinValue;
                      }
                      else
                      {
                          HTTPManager.Logger.Information("HubConnection", "Next reconnect attempt after " + nextAttempt.Value.ToString(), this.Context);

                          this.currentContext = context;
                          this.currentContext.PreviousRetryCount += 1;

                          this.reconnectAt = DateTime.Now + nextAttempt.Value;

                          this.SetState(ConnectionStates.Reconnecting, null, this.defaultReconnect);

                          // A reconnect is scheduled, so OnError isn't raised for this closure.
                          return;
                      }
                  }

                  if (this.OnError != null)
                  {
                      try
                      {
                          this.OnError(this, errorReason);
                      }
                      catch (Exception ex)
                      {
                          HTTPManager.Logger.Exception("HubConnection", "Exception in OnError user code!", ex, this.Context);
                      }
                  }
              }
              break;
      }
  }
  /// <summary>
  /// Handler of <c>HTTPUpdateDelegator.OnApplicationForegroundStateChanged</c>. Stores the negated
  /// <paramref name="isPaused"/> value in <c>pausedInLastFrame</c> and logs both values.
  /// </summary>
  /// <param name="isPaused">The value received from the event.</param>
  private void OnApplicationForegroundStateChanged(bool isPaused)
  {
      // The flag is set when isPaused is false; while it's set, OnMessages delays the received messages.
      pausedInLastFrame = !isPaused;

      string logLine = $"OnApplicationForegroundStateChanged isPaused: {isPaused} pausedInLastFrame: {pausedInLastFrame}";
      HTTPManager.Logger.Information("HubConnection", logLine, this.Context);
  }
  /// <summary>
  /// Heartbeat callback that runs the time based logic of the connection's current state: the connect
  /// timeout, the replay of delayed messages, ping sending and ping timeout, the scheduled reconnect and
  /// the final cleanup.
  /// </summary>
  /// <param name="dif">Not used by this implementation.</param>
  void BestHTTP.Extensions.IHeartbeat.OnHeartbeatUpdate(TimeSpan dif)
  {
      switch (this.State)
      {
          case ConnectionStates.Negotiating:
          case ConnectionStates.Authenticating:
          case ConnectionStates.Redirected:
          {
              bool connectTimedOut = DateTime.Now >= this.connectionStartedAt + this.Options.ConnectTimeout;
              if (!connectTimedOut)
                  break;

              if (this.AuthenticationProvider != null)
              {
                  this.AuthenticationProvider.OnAuthenticationSucceded -= OnAuthenticationSucceded;
                  this.AuthenticationProvider.OnAuthenticationFailed -= OnAuthenticationFailed;

                  try
                  {
                      this.AuthenticationProvider.Cancel();
                  }
                  catch (Exception ex)
                  {
                      HTTPManager.Logger.Exception("HubConnection", "Exception in AuthenticationProvider.Cancel !", ex, this.Context);
                  }
              }

              if (this.Transport != null)
              {
                  // Unsubscribe before closing, so the transport's state changes no longer reach this connection.
                  this.Transport.OnStateChanged -= Transport_OnStateChanged;
                  this.Transport.StartClose();
              }

              string timeoutReason = string.Format("Couldn't connect in the given time({0})!", this.Options.ConnectTimeout);
              SetState(ConnectionStates.Closed, timeoutReason, this.defaultReconnect);
              break;
          }

          case ConnectionStates.Connected:
          {
              bool hasDelayedMessages = this.delayedMessages != null && this.delayedMessages.Count > 0;
              if (hasDelayedMessages)
              {
                  // Reset the flag first, otherwise OnMessages would just delay the messages again.
                  pausedInLastFrame = false;

                  try
                  {
                      // If there's a Close message, drop everything that precedes the last one.
                      int lastCloseIdx = this.delayedMessages.FindLastIndex(dm => dm.type == MessageTypes.Close);
                      if (lastCloseIdx > 0)
                          this.delayedMessages.RemoveRange(0, lastCloseIdx);

                      OnMessages(this.delayedMessages);
                  }
                  finally
                  {
                      this.delayedMessages.Clear();
                  }
              }

              // Ping handling is done only when still connected and pinging is turned on.
              if (this.State != ConnectionStates.Connected || this.Options.PingInterval == TimeSpan.Zero)
                  break;

              if (DateTime.Now - this.lastMessageReceivedAt >= this.Options.PingTimeoutInterval)
              {
                  // The transport itself can be in a failure state or in a completely valid one, so while we do not
                  // want to receive anything from it, we have to try to close it
                  if (this.Transport != null)
                  {
                      this.Transport.OnStateChanged -= Transport_OnStateChanged;
                      this.Transport.StartClose();
                  }

                  string pingTimeoutReason = string.Format("PingInterval set to '{0}' and no message is received since '{1}'. PingTimeoutInterval: '{2}'", this.Options.PingInterval, this.lastMessageReceivedAt, this.Options.PingTimeoutInterval);
                  SetState(ConnectionStates.Closed, pingTimeoutReason, this.defaultReconnect);
              }
              else if (DateTime.Now - this.lastMessageSentAt >= this.Options.PingInterval)
              {
                  SendMessage(new Message() { type = MessageTypes.Ping });
              }
              break;
          }

          case ConnectionStates.Reconnecting:
          {
              // Nothing to do until a reconnect is scheduled and its time arrived.
              if (this.reconnectAt == DateTime.MinValue || DateTime.Now < this.reconnectAt)
                  break;

              if (this.delayedMessages != null)
                  this.delayedMessages.Clear();

              this.connectionStartedAt = DateTime.Now;
              this.reconnectAt = DateTime.MinValue;
              this.triedoutTransports.Clear();
              this.StartConnect();
              break;
          }

          case ConnectionStates.Closed:
              CleanUp();
              break;
      }
  }
  /// <summary>
  /// Releases what the connection holds on to: clears the delayed messages, unsubscribes from the
  /// heartbeats and from the foreground state change event, and disposes the read-write lock.
  /// </summary>
  private void CleanUp()
  {
      HTTPManager.Logger.Information("HubConnection", "CleanUp", this.Context);

      if (this.delayedMessages != null)
          this.delayedMessages.Clear();

      HTTPManager.Heartbeats.Unsubscribe(this);
      HTTPUpdateDelegator.OnApplicationForegroundStateChanged -= this.OnApplicationForegroundStateChanged;

      // Dispose the lock (if there's still one) and drop the reference to it.
      var currentLock = this.rwLock;
      if (currentLock != null)
          currentLock.Dispose();
      this.rwLock = null;
  }
  1353. }
  1354. }
  1355. #endif