Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions Algorithm/QCAlgorithm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public partial class QCAlgorithm : MarshalByRefObject, IAlgorithm
private IStatisticsService _statisticsService;
private IBrokerageModel _brokerageModel;

private Action _brokerageReconnectAction;

private bool _sentBroadcastCommandsDisabled;
private readonly HashSet<string> _oneTimeCommandErrors = new();
private readonly Dictionary<string, Func<CallbackCommand, bool?>> _registeredCommands = new(StringComparer.InvariantCultureIgnoreCase);
Expand Down Expand Up @@ -1424,6 +1426,33 @@ public void SetBrokerageMessageHandler(IBrokerageMessageHandler handler)
BrokerageMessageHandler = handler ?? throw new ArgumentNullException(nameof(handler));
}

/// <summary>
/// Sets the action used to reconnect the brokerage. Intended for internal use by the Lean engine only.
/// </summary>
/// <param name="reconnectAction">The action that performs the disconnect/connect cycle</param>
[DocumentationAttribute(LiveTrading)]
public void SetBrokerageReconnectAction(Action reconnectAction)
{
_brokerageReconnectAction = reconnectAction ?? throw new ArgumentNullException(nameof(reconnectAction));
}

/// <summary>
/// Triggers a brokerage disconnect/reconnect cycle. Use this when the algorithm detects
/// a data quality issue (e.g. a frozen market data feed) and needs to self-heal without
/// human intervention. Only meaningful in live trading; no-op in backtesting.
/// </summary>
[DocumentationAttribute(LiveTrading)]
public void ReconnectBrokerage()
{
if (_brokerageReconnectAction == null)
{
Debug("ReconnectBrokerage(): no reconnect action registered, ignoring.");
return;
}
Log("ReconnectBrokerage(): triggering brokerage reconnect.");
_brokerageReconnectAction();
}

/// <summary>
/// Sets the risk free interest rate model to be used in the algorithm
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions AlgorithmFactory/Python/Wrappers/AlgorithmPythonWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,12 @@ public void OnWarmupFinished()
/// <param name="handler">The message handler to use</param>
public void SetBrokerageMessageHandler(IBrokerageMessageHandler handler) => _baseAlgorithm.SetBrokerageMessageHandler(handler);

/// <inheritdoc />
public void SetBrokerageReconnectAction(Action reconnectAction) => _baseAlgorithm.SetBrokerageReconnectAction(reconnectAction);

/// <inheritdoc />
public void ReconnectBrokerage() => _baseAlgorithm.ReconnectBrokerage();

/// <summary>
/// Sets the brokerage model used to resolve transaction models, settlement models,
/// and brokerage specified ordering behaviors.
Expand Down
13 changes: 13 additions & 0 deletions Common/Interfaces/IAlgorithm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,19 @@ Security AddSecurity(Symbol symbol, Resolution? resolution = null, bool? fillFor
/// <param name="handler">The message handler to use</param>
void SetBrokerageMessageHandler(IBrokerageMessageHandler handler);

/// <summary>
/// Sets the action used to reconnect the brokerage. Intended for internal use by the Lean engine only.
/// </summary>
/// <param name="reconnectAction">The action that performs the disconnect/connect cycle</param>
void SetBrokerageReconnectAction(Action reconnectAction);

/// <summary>
/// Triggers a brokerage disconnect/reconnect cycle. Use this when the algorithm detects
/// a data quality issue (e.g. a frozen market data feed) and needs to self-heal without
/// human intervention. Only meaningful in live trading; no-op in backtesting.
/// </summary>
void ReconnectBrokerage();

/// <summary>
/// Set the historical data provider
/// </summary>
Expand Down
7 changes: 7 additions & 0 deletions Engine/Engine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,13 @@ public void Run(AlgorithmNodePacket job, AlgorithmManager manager, string assemb
// initialize the default brokerage message handler
algorithm.BrokerageMessageHandler = factory.CreateBrokerageMessageHandler(algorithm, job, SystemHandlers.Api);

// allow the algorithm to trigger a brokerage reconnect (e.g. when it detects a frozen data feed)
algorithm.SetBrokerageReconnectAction(() =>
{
brokerage.Disconnect();
brokerage.Connect();
});

var brokerageDataQueueHandlers = Composer.Instance.GetParts<IDataQueueHandler>().OfType<IBrokerage>()
// In backtesting, brokerages can be used as data downloaders (BrokerageDataDownloader)
// and are added to the composer as IBrokerage
Expand Down
75 changes: 75 additions & 0 deletions Tests/Algorithm/AlgorithmReconnectBrokerageTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

using System;
using NUnit.Framework;
using QuantConnect.Algorithm;
using QuantConnect.Tests.Engine.DataFeeds;

namespace QuantConnect.Tests.Algorithm
{
[TestFixture]
public class AlgorithmReconnectBrokerageTests
{
private QCAlgorithm _algo;

[SetUp]
public void Setup()
{
_algo = new QCAlgorithm();
_algo.SubscriptionManager.SetDataManager(new DataManagerStub(_algo));
}

[Test]
public void ReconnectBrokerage_InvokesRegisteredAction()
{
var reconnectCalled = false;
_algo.SetBrokerageReconnectAction(() => reconnectCalled = true);

_algo.ReconnectBrokerage();

Assert.IsTrue(reconnectCalled);
}

[Test]
public void ReconnectBrokerage_NoOp_WhenNoActionRegistered()
{
// should not throw when no action has been set
Assert.DoesNotThrow(() => _algo.ReconnectBrokerage());
}

[Test]
public void SetBrokerageReconnectAction_ThrowsOnNull()
{
Assert.Throws<ArgumentNullException>(() => _algo.SetBrokerageReconnectAction(null));
}

[Test]
public void ReconnectBrokerage_InvokesDisconnectThenConnect()
{
var callOrder = new System.Collections.Generic.List<string>();
_algo.SetBrokerageReconnectAction(() =>
{
callOrder.Add("disconnect");
callOrder.Add("connect");
});

_algo.ReconnectBrokerage();

Assert.AreEqual(new[] { "disconnect", "connect" }, callOrder);
}
}
}