Секреты TSLab | Торговые роботы | События
20 Мар

Торговые роботы и автоматное программирование на StockSharp

Торговые роботы и автоматное программирование на примере системы обеспечения бесперебойной связи с сервером брокера.

Автор статьи. ra81 ( Родион – преподаватель курсов RusAlgo )

Оглавление

  1. введение 
  2. постановка задачи 
  3. описание логики системы 
  4. реализация системы 
  5. доработка системы с помощью EventDispatcher 
  6. приложение А

1. Введение

Если вы столкнулись с проблемой увеличения сложности кода роботов, которая приводит к снижению читабельности, увеличению числа логических ошибок (которые ловить сложнее всего) и прочим радостям, вы можете облегчить программирование логики роботов, используя готовые разработки и методику автоматного программирования. В данной статье мы рассмотрим эту методику на примере системы обеспечения бесперебойной связи с сервером брокера.
В каждом торговом роботе, написанном на stocksharp, есть как минимум 1 объект реализующий интерфейс ITrader. У меня это AlfaTrader, у вас возможно PlazaTrader или что-то еще. Это один из самых главных объектов всего робота, который вкратце зовется коннектором. Через него торговый робот общается с терминалом, отдает приказы и получает данные. Любой сбой в передаче приказов или получении данных может привести к очень серьезным последствиям для вашего счета. Естественно, что необходимо организовать максимальное надежное подключение терминала к серверу брокера, а коннектора к терминалу. Бывает, что сервер брокера отрубает терминал от себя по каким-то своим странным причинам, и если нас нет рядом с терминалом, наши позиции будут существовать отдельно от наших роботов. Я намеренно опускаю все аппаратные проблемы (проблемы с интернетом, компьютером, светом в доме и т.д.), которые могут возникнуть во время работы (по этой теме можно писать отдельную большую статью), и рассматриваю только проблемы программные.
Весь код базируется на двух основных библиотеках, а именно StockSharp и Stateless. Соответственно примеры будут рассматриваться в рамках написания компонентов робота под StockSharp.

2. Постановка задачи

Необходимо бесперебойное подключение терминала к серверу брокера, а так же поступление данных от терминала в коннектор и далее в стратегии. Основные требования к нашей системе, обеспечивающей это:

  • Соединение терминала с сервером поддерживается круглые сутки, как в рабочее время биржи, так и в нерабочее.
  • Экспорт данных от терминала в коннектор проверяется только в течение рабочего времени биржи. Клиринг, выходные и ночные часы исключаются из проверок.
  • Процессы, происходящие в системе должны логгироваться.
  • Сбой соединения или экспорта данных должны генерировать СМС уведомление с заданными условиями. Первое СМС отправляется незамедлительно, а последующие через определенный промежуток. СМС шлются до тех пока, пока проблема не исчезнет.
  • Вся система должна быть интегрирована в мой класс TraderBuilder (отвечает за создание объекта Trader и подключение к терминалу, подключение логгеров, проведение необходимых подготовительных работ) и должна начинать работать автоматически при создании объекта TraderBuilder без дополнительных манипуляций.
  • В каждом стандартном коннекторе stocksharp есть в наличии ReConnectionManager, который отвечает за наличие соединения с сервером, а так же за проверку получения данных от терминала. В связи с особенностями работы терминала AlfaDirect, некоторые из функций ReConnectionManager’а не работают или не позволяют реализовать некоторые из пунктов, описанных в требованиях к системе бесперебойной связи. Основные описаны ниже.
  • Переподключение производится только в рабочее время. Следовательно, если сейчас биржа не работает, коннектор не будет переподключен. Время работы биржи задается в настройках, но влияет так же на проверку экспорта данных и я не мог выставить, чтобы переподключение происходило круглосуточно, потому что тогда будет и экспорт проверяться круглые сутки, что приведет к постоянным фиктивным ошибкам. Суть в том, что AlfaDirect каждую ночь в 24:00 отрубает от сервера, а так как время это НЕ рабочее, переподключение не произойдет. Утром приходится перезапускать робота вручную. В то же время терминал Quick не позволяет реализовать круглосуточное подключение, потому что отключает всех в 24:00 и не дает подключаться до утра. Естественно, что трудно в одном коде учесть все возможные виды терминалов и их особенности, и stocksharp ориентирован больше на Quick чем на AlfaDirect в вопросах переподключения.
  • В AlfaTrader, когда соединение с сервером пропадает, генерируется стандартное событие Disconnected. Что сразу же ставит крест на ReConnectionManager, ибо он реагирует только на событие ConnectionError. В итоге не работает на AlfaTrader. И ничего тут сделать нельзя, поскольку это проблема терминала, а не коннектора.
  • Была необходима сигнализация о проблемах подключения по СМС + логгирование процессов переподключения. В базовом ReConnectionManager этого нет. Сигнализацию по СМС настроить можно , а вот логгирование процессов происходящих в ReConnectionManager включить уже нельзя.

В итоге было решено писать свой ReConnectManager, который будет удовлетворять моим требованиям.

3. описание логики системы

Каждый алгоритм, который делает определенные действия в зависимости от того, что происходило ранее, или происходит сейчас, будет иметь несколько внутренних состояний. В зависимости от текущего состояния и приходящих событий, он производит заранее определенные операции. Все алгоритмы можно представить в виде черных ящиков со входом/входами и выходом/выходами и внутренним состоянием. Входов и выходов может быть несколько, тогда подобный алгоритм может быть многопоточным. Если алгоритм однопоточный, все входы/выходы можно представить в виде одного входа/выхода, путем выстраивания данных в одну очередь. Сразу будем рассматривать на нашем примере возможные варианты. Для начала определим, что же будет для нас входом в систему:

  •  Событие получения данных от терминала Trader.NewDataExported. 
  •  Событие подключения терминала Trader.Connected
  •  Событие отключения терминала Trader.Disconnected

Вот собственно и все. Те, кто знаком с библиотекой StockSharp знают, что эти события могут приходить из разных потоков, следовательно, логика нашей системы должна учитывать многопоточность. В крайнем случае, мы используем Lock-и как средство синхронизации потоков. На самом деле мы обязаны синхронизировать потоки, потому что описанная ниже библиотека Stateless совершенно не предназначена для работы в многопоточной среде. Последние два входа в реальной системе я не стал использовать исходя из своего опыта. Я не стал привязывать логику системы к событиям подключения и отключения терминала, а просто по таймеру проверяю состояние подключения/экспорта, и на основе этих данных система решает, что делать дальше. Более понятно будет дальше при чтении комментариев к коду, зачем так было сделано. Выходами нашей системы будут:

  • Переподключение терминала. Trader.Reconnect()
  • Генерация сообщения об ошибке подключения/экспорта.
  • Отладочные сообщения в лог

По факту мы получаем 2 выхода, так как сообщение об ошибке и лог записи есть суть одно и то же. Уже на данном этапе легко понять, что новые данные от терминала могут приходить в момент проверки подключения терминала или в момент проверки экспорта, так как таймеры могут сработать одновременно. Проверка подключения может происходить в момент проверки экспорта данных и так далее. Логика изначально многопоточная и единственный способ все это согласовать, это использовать очереди или lock-и. Я остановился на втором. Итак, определим, вкратце, логику переходов нашей системы и ее состояния.

Торговые роботы и автоматное программирование. Схема1

Торговые роботы и автоматное программирование. Схема1

Стрелки – это переходы от одного состояния к другому. Кольцевые стрелки означают переход состояния в себя само. То есть выполняется некая операция, и состояние не изменяется. В нашем случае операцией является сообщение об ошибке. На диаграмме не хватает одной важной вещи – внешних событий. Выше во время определения входов мы определили для себя что оставляем для себя только событие NewDataExported, а состояние подключения будем проверять самостоятельно по таймеру. Видно, что базовое состояние системы, это ожидание событий внешних, или внутренних. Pause – это устойчивое состояние системы. Остальные же являются транзитными, то есть в них система не может оставаться, а переходит обязательно в какое-то другое состояние. CheckingConnection переходит либо в Reconnecting, либо снова в Pause. Приход новых данных от терминала может произойти в любой момент, следовательно, картинка получится у нас следующая.

торговые роботы и автоматное программирование

Торговые роботы и автоматное программирование. Схема2

Все стало менее понятным и удобочитаемым. Отмечу сразу, что за счет синхронизации потоков и использования таймера, обработка события NewDataExport осуществляется только в состоянии Pause, то есть останется только одна стрелочка. Нахождение системы в транзитных состояниях очень краткосрочно и в итоге система возвращается в состояние Pause, после чего происходит обработка события NewDataExport. Плавно переходим к реализации нашей системы.

4. реализация системы

Старый способ, которым я делал подобные алгоритмы, весьма прост. Я определял список (enum), в котором описывал состояния системы, и далее в коде делал проверки:

if (State == States.CheckingConnection)
{
// делаем что то там
}

 

Данный метод плох тем, что приходилось везде писать разные проверки, следить за тем, чтобы соблюдалась логика переходов из одного состояния в другое, обеспечивать логгирование переходов из одного состояние в другое и проводить кучу разной лишней работы. Значит, если жизнь и облегчалась, то лишь частично при этом добавлялись другие проблемы. В действительности же подобные костыли являются не чем иным как интуитивной попыткой использовать методику автоматного программирования. Самое печальное, что мы снова изобретаем велосипед. Выше уже было рассмотрено, что наша система имеет несколько состояний, и в зависимости от внешних/внутренних воздействий переходит от одного в другое. по факту, это и есть обычный автомат, который мало чем отличается от автомата наливающего газировку за 5 копеек в стакан. Есть целая область науки, занимающаяся автоматным программированием. Давно разработаны методики, а самое главное ПО, реализующее эти методики. Я использую в работе библиотеку Stateless. Это очень простая и в то же время эффективная библиотека. Ее легко освоить и начать использовать. Один маленький минус – Not ThreadSafe, так что приходится заботиться о синхронизации потоков, взаимодействующих с этой библиотекой. Библиотека в свою очередь будет заботиться обо всем остальном, включая проверку согласованности состояний и обеспечение переходов из одного в другое. В дальнейшем это будет видно.

У нашей системы будет несколько входных параметров:

  • Период проверки состояния подключения.
  • Период проверки экспорта данных.
  • Таймаут отсутствия подключения, после которого генерируется сообщение об ошибке.
  • Таймаут после первого сообщения об ошибке, который выдерживается перед генерацией повторного и последующих сообщений.
  • Рабочее время, в течение которого будет проверяться экспорт данных.
  • Все эти настройки реализуем в виде отдельного класса, который будет определять базовые значения параметров.
using System;
using StockSharp.BusinessEntities;

namespace ra81.Trading.TraderBuilders
{
internal class ReConnectionSettings
{
/// <summary>
/// Интервал проверки подключения. С каким интервалом проверять состояние подключения к серверу.
/// После старта менеджера, изменение свойства не оказывает никакого влияния.
/// </summary>
public TimeSpan ConnectionCheckInterval { get; set; }

/// <summary>
/// Время отсутствия подключения, после которого генерируется сообщение об ошибке.
/// </summary>
public TimeSpan ConnectionTimeout { get; set; }

/// <summary>
/// Интервал проверки получения новых данных. Если в течение него не было новых данных, генерируется сообщение об Error.
/// После старта менеджера изменение свойства не оказывает никакого влияния.
/// </summary>
public TimeSpan DataExportCheckInterval { get; set; }

/// <summary>
/// Интервал с которым слать повторные сообщения. Если было первое сообщение, то второе будет не раньше чем через сей интервал.
/// </summary>
public TimeSpan MessageResendInterval { get; set; }

/// <summary>
/// Рабочее время биржи. Проверка экспорта данных будет производиться только в это время. 
/// </summary>
public WorkingTime WorkingTime { get; set; }

public ReConnectionSettings()
{
ConnectionCheckInterval = TimeSpan.FromSeconds(10);
ConnectionTimeout = TimeSpan.FromSeconds(60);
DataExportCheckInterval = TimeSpan.FromSeconds(20);
MessageResendInterval = TimeSpan.FromMinutes(5);
WorkingTime = new WorkingTime();
}
}
}

 

Создаем болванку нашего класса ReConnectionManager

 

using System;
using System.Threading;
using Ecng.Collections;
using Stateless;
using StockSharp.Algo;
using StockSharp.Algo.Logging;
using StockSharp.BusinessEntities;
using Ecng.Common;
using Ecng.ComponentModel;

namespace ra81.Trading.TraderBuilders
{
/// <summary>
/// Менеджер переподключения, следит за тем, чтобы трейдер всегда был подключен к бирже. Если терминал отключается
/// менеджер производит автоматическое переподключение терминала. Если данные от терминала не приходят, менеджер сигнализирует об этом
/// генерируя ошибку. Ошибку можно послать через СМС. Так же если нет коннекта заданное время, будет сгенерирована ошибка.
/// Если негативная ситуация сохраняется, менеджер сгенерирует ошибку повторно через определенный интервал и так до момента,
/// когда негативная ситуация исправится. 
/// При проверка экспорта данных учитывается время работы биржи, заданное в настройках. При проверке коннекта - не учитывается. 
/// Подразумевается, что терминал должен быть подключен к бирже всегда.
/// 
/// Работать менеджер начинает сразу после первого подключения трейдера к бирже. Потом он не останавливается.
/// </summary>
internal class ReConnectionManager : ra81.Common.Disposable, ILogReceiver
{
#region Private fields

private readonly ITrader _trader;

#endregion

#region Public properties

public ReConnectionSettings Settings { get; private set; }

#endregion

public ReConnectionManager(ITrader trader)
{
Settings = new ReConnectionSettings();

_trader = trader;
}

#region ILogReceiver

private readonly Guid _id = Guid.NewGuid();

Guid ILogSource.Id
{
get { return _id; }
}

string ILogSource.Name
{
get { return GetType().Name; }
}

ILogSource ILogSource.Parent
{
get { return null; }
}

INotifyList<ILogSource> ILogSource.Childs
{
get { return null; }
}

private Action<LogMessage> _log;

event Action<LogMessage> ILogSource.Log
{
add { _log += value; }
remove { _log -= value; }
}

void ILogReceiver.AddLog(LogMessage message)
{
// Подменяем сообщение, чтобы время в нем было временем трейдера а не локальным.
var newMsg = new LogMessage(message.Source, _trader.MarketTime, message.Type, message.Message);
_log.SafeInvoke(newMsg);
}

#endregion

 

Я сразу же определил, что класс наследует интерфейс ILogReceiver, чтобы была возможность использовать стандартные возможности логгирования в stocksharp.
Далее определяем состояния системы в виде множества и создаем свойство, содержащее это состояние.

enum States
{
Init,
Starting,
Pause,
CheckingConnection,
Reconnecting,
CheckingDataExport,
}

private States _state;

#region Private Properties

private States State
{
get { return _state; }
set
{
if (_state == value)
return;

this.AddInfoLog(_state + " -> " + value);
_state = value;
}
}

#endregion

 

Тут сразу можно обратить внимание, что при переходе из состояние в состояние мы будем получать лог запись. Это полезно. Далее необходимо настроить наш автомат на базе Stateless. Состояния системы уже определены, осталось определить триггеры, которые будут переводить систему из одного состояния в другое. Stateless использует триггеры для обеспечения переходов, так что описываем их.

enum Triggers
{
Start,
Wait,
CheckConnection,
ConnectionError,
CheckDataExport,
NewDataExported,

MessageRequest,
}

 

Теперь пришло время определить логику всей системы в терминах Stateless. Это у меня сделано в виде простой процедуры, код которой ниже.

private StateMachine<States, Triggers> _stateMachine;

private void InitStateMachine()
{
// Создаем объект. Передаем две лямбды. Одна на получение состояния, другая на изменение. Это позволяет добавить свою логику при чтении и модификации состояний. Мы добавили логгирование при модификации.
_stateMachine = new StateMachine<States, Triggers>(() => State, (state) => State = state);

// Определяем базовое состояние системы. В этом состоянии допустим только переход в состояние запуска. Для этого используется триггер Start. Permit – говорит о том что подобный переход разрешен для этого состояния.
_stateMachine.Configure(States.Init)
.Permit(Triggers.Start, States.Starting);

// После инициализации сразу переходим в ожидание событий от таймеров. OnEntry – говорит какой метод запустить в момент перехода в это состояние. То есть когда мы из Init будем переходить в Starting сработает метод StartingOnEntry
_stateMachine.Configure(States.Starting)
.OnEntry(StartingOnEntry)
.Permit(Triggers.Wait, States.Pause);

// Ждем срабатывания таймера. PermitReentry – говорит о том что триггер NewDataExported будет переводить систему в то же самое состояние в котором она была до вызова триггера. OnEntryFrom – позволяет связать конкретный триггер и метод. Если мы переходим в Pause от триггера NewDataExported, будет вызван метод OnEntryFromNewDataExported
_stateMachine.Configure(States.Pause)
.Permit(Triggers.CheckConnection, States.CheckingConnection)
.Permit(Triggers.CheckDataExport, States.CheckingDataExport)
.PermitReentry(Triggers.NewDataExported).OnEntryFrom(Triggers.NewDataExported,
OnEntryFromNewDataExported);

// Проверка состояния подключения к серверу. 
_stateMachine.Configure(States.CheckingConnection)
.OnEntryFrom(Triggers.CheckConnection, OnEntryFromCheckConnection)
.Permit(Triggers.Wait, States.Pause)
.Permit(Triggers.ConnectionError, States.Reconnecting)
.PermitReentry(Triggers.NewDataExported).OnEntryFrom(Triggers.NewDataExported,
OnEntryFromNewDataExported);
// Переподключение терминала.
_stateMachine.Configure(States.Reconnecting)
.OnEntryFrom(Triggers.ConnectionError, ReconnectingOnEntryFromConnectionError)
.Permit(Triggers.Wait, States.Pause)
.PermitReentry(Triggers.NewDataExported).OnEntryFrom(Triggers.NewDataExported,
OnEntryFromNewDataExported)
.PermitReentry(Triggers.MessageRequest).OnEntryFrom(Triggers.MessageRequest,
ReconnectingOnEntryFromMessageRequest);

// Проверка экспорта данных.
_stateMachine.Configure(States.CheckingDataExport)
.OnEntryFrom(Triggers.CheckDataExport, OnEntryFromCheckDataExport)
.Permit(Triggers.Wait, States.Pause)
.PermitReentry(Triggers.NewDataExported).OnEntryFrom(Triggers.NewDataExported,
OnEntryFromNewDataExported)
.PermitReentry(Triggers.MessageRequest).OnEntryFrom(Triggers.MessageRequest,
CheckingDataExportOnEntryFromMessageRequest);
}

Итак, дополним класс этим методом, а конструктор изменим.
public ReConnectionManager(ITrader trader)
{
Settings = new ReConnectionSettings();

_trader = trader;
InitStateMachine();
}

 

Теперь уже считай все готово. Логика определена, переходы описаны. Нужно реализовать методы в переходах, и указать, как же будет запускаться вся система . Ниже собственно весь код. Комментарии смотрим по тексту, они будут пояснять нюансы решений и логику.

using System;
using System.Threading;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Ecng.Collections;
using Stateless;
using StockSharp.Algo;
using StockSharp.Algo.Logging;
using StockSharp.BusinessEntities;
using ra81.Common;
using Ecng.Common;

namespace ra81.Trading.TraderBuilders
{
/// <summary>
/// Менеджер переподключения, следит за тем, чтобы трейдер всегда был подключен к бирже. Если терминал отключается
/// менеджер производит автоматическое переподключение терминала. Если данные от терминала не приходят, менеджер сигнализирует об этом
/// генерируя ошибку. Ошибку можно послать через СМС. Так же если нет коннекта заданное время, будет сгенерирована ошибка.
/// Если негативная ситуация сохраняется, менеджер сгенерирует ошибку повторно через определенный интервал и так до момента,
/// когда негативная ситуация исправится. 
/// При проверка экспорта данных учитывается время работы биржи, заданное в настройках. При проверке коннекта - не учитывается. 
/// Подразумевается, что терминал должен быть подключен к бирже всегда.
/// 
/// Работать менеджер начинает сразу после первого подключения трейдера к бирже. Потом он не останавливается.
/// </summary>
internal class ReConnectionManager : ra81.Common.Disposable, ILogReceiver
{

enum States
{
Init,
Starting,
Pause,
CheckingConnection,
Reconnecting,
CheckingDataExport,
}

enum Triggers
{
Start,
Wait,
CheckConnection,
ConnectionError,
CheckDataExport,
NewDataExported,

MessageRequest,
}

#region Private fields

private readonly ITrader _trader;
private States _state;
private StateMachine<States, Triggers> _stateMachine;

/// <summary>
/// Таймер проверки подключения. 
/// </summary>
private Timer _connectionCheckTimer;

/// <summary>
/// Таймер проверки экспорта данных от терминала.
/// </summary>
private Timer _dataExportCheckTimer;

/// <summary>
/// Таймер проверки получения новых данных от терминала.
/// </summary>
private Timer _newDataCheckTimer;

/// <summary>
/// Время когда терминал был последний раз подключен к серверу. 
/// Используется для замера времени без коннекта, чтобы отослать СМС.
/// </summary>
private DateTime _lastConnected;

/// <summary>
/// Дата последнего сообщения об ошибке подключения.
/// </summary>
private DateTime _lastConnectionMessage;

/// <summary>
/// Время получения последних данных. Используется для замера времени, чтобы отсылать СМС если данных долго нет.
/// </summary>
private DateTime _lastDataExported;

/// <summary>
/// Время последнего сообщения об ошибке экспорта.
/// </summary>
private DateTime _lastExportMessage;

/// <summary>
/// Флаг получения новых данных. Когда данные приходят, ставится в 1. После обработки скидывается на 0.
/// </summary>
private int _dataReceivedFlag;

private readonly object _syncObj = new object();

#endregion

#region Private Properties

private States State
{
get { return _state; }
set
{
if (_state == value)
return;

this.AddInfoLog(_state + " -> " + value);
_state = value;
}
}

#endregion

#region Public properties

public ReConnectionSettings Settings { get; private set; }

#endregion

public ReConnectionManager(ITrader trader)
{
Settings = new ReConnectionSettings();

_trader = trader;
InitStateMachine();

// При первом подключении терминала к серверу, заводим автомат.
_trader.Connected += StartStateMachine;

}

private void InitStateMachine()
{
_stateMachine = new StateMachine<States, Triggers>(() => State, (state) => State = state);

_stateMachine.Configure(States.Init)
.Permit(Triggers.Start, States.Starting);

// После инициализации сразу переходим в ожидание событий от таймеров.
_stateMachine.Configure(States.Starting)
.OnEntry(StartingOnEntry)
.Permit(Triggers.Wait, States.Pause);

// Ждем срабатывания таймера
_stateMachine.Configure(States.Pause)
.Permit(Triggers.CheckConnection, States.CheckingConnection)
.Permit(Triggers.CheckDataExport, States.CheckingDataExport)
.PermitReentry(Triggers.NewDataExported).OnEntryFrom(Triggers.NewDataExported,
OnEntryFromNewDataExported);

// Проверка состояния подключения к серверу. 
_stateMachine.Configure(States.CheckingConnection)
.OnEntryFrom(Triggers.CheckConnection, OnEntryFromCheckConnection)
.Permit(Triggers.Wait, States.Pause)
.Permit(Triggers.ConnectionError, States.Reconnecting)
.PermitReentry(Triggers.NewDataExported).OnEntryFrom(Triggers.NewDataExported,
OnEntryFromNewDataExported);
// Переподключение терминала.
_stateMachine.Configure(States.Reconnecting)
.OnEntryFrom(Triggers.ConnectionError, ReconnectingOnEntryFromConnectionError)
.Permit(Triggers.Wait, States.Pause)
.PermitReentry(Triggers.NewDataExported).OnEntryFrom(Triggers.NewDataExported,
OnEntryFromNewDataExported)
.PermitReentry(Triggers.MessageRequest).OnEntryFrom(Triggers.MessageRequest,
ReconnectingOnEntryFromMessageRequest);

// Проверка экспорта данных.
_stateMachine.Configure(States.CheckingDataExport)
.OnEntryFrom(Triggers.CheckDataExport, OnEntryFromCheckDataExport)
.Permit(Triggers.Wait, States.Pause)
.PermitReentry(Triggers.NewDataExported).OnEntryFrom(Triggers.NewDataExported,
OnEntryFromNewDataExported)
.PermitReentry(Triggers.MessageRequest).OnEntryFrom(Triggers.MessageRequest,
CheckingDataExportOnEntryFromMessageRequest);
}

private void StartStateMachine()
{
// Отписываемся от события, теперь автомат сам будет все контроллить.
_trader.Connected -= StartStateMachine;

// Вот так собственно и вызывается триггер, переводящий систему из одного состояния в другое.
_stateMachine.Fire(Triggers.Start);
}

private void StartingOnEntry()
{
// Заводим ожидание таймеров. Так же подключаем получение экспорта. Дабы следить за ним.
// Это нужно сделать ДО строк ниже. Не разрешено получать данные в режиме Starting
_stateMachine.Fire(Triggers.Wait);

// создадим таймеры и запустим их.
// Используем Локи чтобы не допустить одновременного срабатывания. Машина однопоточная.
_lastConnected = DateTime.Now;
// машина может стартовать тока после подключения терминала. Обновляем переменную.
_lastConnectionMessage = DateTime.MinValue;
_connectionCheckTimer = ThreadingHelper.Timer(() =>
{
lock (_syncObj)
{
_stateMachine.Fire(Triggers.CheckConnection);
}
}).Interval(Settings.ConnectionCheckInterval);

_lastDataExported = DateTime.Now;
_lastExportMessage = DateTime.MinValue;
_dataExportCheckTimer = ThreadingHelper.Timer(() =>
{
lock (_syncObj)
{

_stateMachine.Fire(Triggers.CheckDataExport);
}
}).Interval(Settings.DataExportCheckInterval);

// Таймер проверят приходили ли новые данные за время между проверками. Если да, то генерится триггер.
// Сделано так, чтобы не подвешивать событие NewDataExported. Вызвать триггера надо в один поток, 
// значит ставить синхронизацию. А сколько там будет отрабатывать другой триггер не известно. Это повешает весь
// трейдер.
_dataReceivedFlag = 0;
_newDataCheckTimer = ThreadingHelper.Timer(() =>
{
if (_dataReceivedFlag != 1) return;

Interlocked.Exchange(ref _dataReceivedFlag, 0);

lock (_syncObj)
{
_stateMachine.Fire(Triggers.NewDataExported);
}
}).Interval(TimeSpan.FromSeconds(1));

// По приходу новых данных мы будем просто выставлять флаг что они приходили. Это минимизирует задержки трейдера.
_trader.NewDataExported += () => Interlocked.Exchange(ref _dataReceivedFlag, 1);
}

private void OnEntryFromCheckConnection()
{
// Проверяем состояние подключения.
if (_trader.IsConnected)
{
_lastConnected = DateTime.Now;
_stateMachine.Fire(Triggers.Wait);
}
else
_stateMachine.Fire(Triggers.ConnectionError);
}

private void ReconnectingOnEntryFromConnectionError()
{
// Генерим сообщение об ошибке подключения.
_stateMachine.Fire(Triggers.MessageRequest);

// Пробуем переподключиться. 
_trader.Reconnect();
_stateMachine.Fire(Triggers.Wait);
}

private void ReconnectingOnEntryFromMessageRequest()
{
// Если время когда мы были подключены последний раз, было давно, шлем смс об этом.
// Но чтобы не слать СМС все время, проверяем время последней отсылки СМС и повторную смс шлем не раньше
// чем установленный таймаут экскалации.
var lastTime = _lastConnected;
var delta = DateTime.Now - lastTime;
var messageDelta = DateTime.Now - _lastConnectionMessage;

if (delta < Settings.ConnectionTimeout) return;

// Если посылали Ошибку давно, то генерим Error, иначе просто информационное сообщение.
if (messageDelta >= Settings.MessageResendInterval)
{
// Шлем сообщение о том что все плохо. И обновляем время последнего сообщения.
this.AddErrorLog("Подключения к серверу нет в течение {0}.".Put(delta));
_lastConnectionMessage = DateTime.Now;
}
else
{
this.AddWarningLog("Подключения к серверу нет в течение {0}.".Put(delta));
}
}

private void OnEntryFromCheckDataExport()
{
_stateMachine.Fire(Triggers.MessageRequest);

_stateMachine.Fire(Triggers.Wait);
}

private void OnEntryFromNewDataExported()
{
// Были получены новые данные от терминала. Обновим время последнего получения данных.
this.AddInfoLog("OnEntryFromNewDataExported(): New data exported.");
_lastDataExported = DateTime.Now;
}

private void CheckingDataExportOnEntryFromMessageRequest()
{
// Проверяем время пошедшее с момента последних данных полученных от терминала.
// Если задержка экспорта больше чем таймаут проверки то генерим ошибку экспорта.
var lastTime = _lastDataExported;
var delta = DateTime.Now - lastTime;
var messageDelta = DateTime.Now - _lastExportMessage;

if (delta < Settings.DataExportCheckInterval) return;

// Проверяем торгуемое ли время щас вообще, клиринг и прочая байда. Не ждем данных в это время.
if (!Settings.WorkingTime.IsTradeTime(_trader))
{
this.AddInfoLog("Торговли нет. Пропускаю проверку экспорта.");
// Обновляем время последнего экспорта данных. 
// Если после старта сессии данные не будут приходить мы быстро узнаем, потому что таймаут проверки мал.
// Если не обновить тут время, после старта сессии можем схлопотать Error потому что проверка пройдет быстрее чем придут данные.
_lastDataExported = DateTime.Now;

// Не обновляем время последнего сообщения. В случае если после клиринга данные не приходят, мы оперативно получим мессадж.
// Если обновить время сообщения то получим только через таймаут повторной посылки.

return;
}

// Если посылали Ошибку давно, то генерим Error, иначе просто информационное сообщение.
if (messageDelta >= Settings.MessageResendInterval)
{
// Послать сообщение при ошибке получения данных от терминала.
this.AddErrorLog("Данные не приходили от терминала в течение {0}.".Put(delta));
_lastExportMessage = DateTime.Now;
}
else
{
this.AddWarningLog("Данные не приходили от терминала в течение {0}.".Put(delta));
}
}

#region Disposable

protected override void DisposeManaged()
{
if (_connectionCheckTimer != null)
_connectionCheckTimer.Dispose();

if (_dataExportCheckTimer != null)
_dataExportCheckTimer.Dispose();

if (_newDataCheckTimer != null)
_newDataCheckTimer.Dispose();

base.DisposeManaged();
}

#endregion

#region ILogReceiver

private readonly Guid _id = Guid.NewGuid();

Guid ILogSource.Id
{
get { return _id; }
}

string ILogSource.Name
{
get { return GetType().Name; }
}

ILogSource ILogSource.Parent
{
get { return null; }
}

INotifyList<ILogSource> ILogSource.Childs
{
get { return null; }
}

private Action<LogMessage> _log;

event Action<LogMessage> ILogSource.Log
{
add { _log += value; }
remove { _log -= value; }
}

void ILogReceiver.AddLog(LogMessage message)
{
// Подменяем сообщение, чтобы время в нем было временем трейдера а не локальным.
var newMsg = new LogMessage(message.Source, _trader.MarketTime, message.Type, message.Message);
_log.SafeInvoke(newMsg);
}

#endregion
}
}

 

Класс готов. Он реализует все наши требования кроме одного – рассылка СМС. Но это легко реализуется через стандартные методы stocksharp. Для этого будем использовать SmsLogListener с фильтрацией по Error сообщениям. Код ниже показывает пример запуска терминала, подключения ReConnectionManager и подключения СМС рассылки. По факту это метод, который помещает Trader в private поле объекта TraderBuilder.

// Метод создающий трейдера, реконнект менеджера и подключающей их к лог менеджеру
private void CreateTrader()
{
// Если же трейдер еще не было создан ранее, создаем тогда его с нуля.
this.AddInfoLog("CreateTrader(): Создаем шлюз взаимодействия с системой AlfaDirect.");
var trader = new AlfaTrader()
{
Login = Login,
Password = Password,
};

_logManager.Sources.Add(trader);

this.AddInfoLog("CreateTrader(): Задаем специальные рабочие и выходные дни бирж.");
// Все специальные дни – свойства объекта TraderBuilder и были заданы при его создании.
Exchange.Micex.WorkingTime.SpecialHolidays = SpecialHoliday;
Exchange.Micex.WorkingTime.SpecialWorkingDays = SpecialWorkingDay;
Exchange.Rts.WorkingTime.SpecialHolidays = SpecialHoliday;
Exchange.Rts.WorkingTime.SpecialWorkingDays = SpecialWorkingDay;

this.AddInfoLog("CreateTrader(): Производим базовые настройки шлюза.");
// здесь мы тупо отключаем встроенный ReConnectionManager дабы он не мешал нашей системе переподключения.
trader.ReConnectionSettings.Interval = TimeSpan.FromSeconds(10);
trader.ReConnectionSettings.WorkingTime = Exchange.Rts.WorkingTime;
trader.ReConnectionSettings.ConnectingAttemptCount = 0;
trader.ReConnectionSettings.ReConnectingAttemptCount = 0;
trader.ReConnectionSettings.ExportTimeOutInterval = TimeSpan.FromSeconds(10);
trader.ReConnectionSettings.IsReStartExport = false;

trader.Connected += OnTraderConnected;
trader.ConnectionError += OnTraderConnectionError;
trader.Disconnected += OnTraderDisconnected;

this.AddInfoLog("CreateTrader(): Создаем и подключаем ReConnectionManager.");
_reConnectionManager = new ReConnectionManager(trader)
{
Settings =
{
ConnectionCheckInterval = this.ConnectionCheckInterval,
DataExportCheckInterval = this.DataExportCheckInterval,
ConnectionTimeout = this.ConnectionTimeout,
MessageResendInterval = this.MessageResendInterval,
WorkingTime = Exchange.Rts.WorkingTime
}
};

_logManager.Sources.Add(_reConnectionManager);

this.AddInfoLog("CreateTrader(): Производим подключение.");
trader.Connect();

_trader = trader;
}

// инициализация системы логгирования. Здесь создаются все системы отвечающие за логи и СМС рассылку.
private void CreateLogger()
{
// собираем имя лог файла
var logFile = "AlfaTraderBuilder";
if (!Title.IsEmpty())
{
logFile = Title;
}

// собираем имя директории. по дефолту LogDir равен null что недопустимо.
var logDir = "Log";
if (!LogDir.IsEmpty())
{
logDir = LogDir;
}

// Создаем логгера файлового. Сюда будет падать вся отладочная.
_logManager = new LogManager();
var fileListener = new FileLogListener(logFile)
{
Append = true,
Extension = ".log",
LogDirectory = logDir,
SeparateByDates = SeparateByDateModes.FileName
};
_logManager.Listeners.Add(fileListener);

// Создаем СМС логгера через гугл календарь. Будем слать только Error, включив фильтрацию. Вот как раз ТУТ будут отсылаться СМС уведомления в случае ошибок в реконнект менеджере. Для этого он генерирует Error записи в лог.
var smsListener = new SmsLogListener(GoogleSmsLogin, GoogleSmsPassword);
smsListener.Filters.Add(LogListener.AllErrorFilter);
_logManager.Listeners.Add(smsListener);

// Сам билдер тоже является источником логов. Его добавляем в логманагер.
_logManager.Sources.Add(this);
}

 

5. доработка системы с помощью EventDispatcher

Собственно здесь можно бы и закончить. НО есть одна вещь, которая жутко мне не нравилась, а конкретно реализация получения данных по событию NewDataExported и использование lock-ов в методе StartingOnEntry. Я это решил исправить и использовать один замечательный класс входящий в набор stocksharp из библиотеки Ecng.ComponentModel под названием EventDispatcher. Данный класс позволяет реализовать очереди из операций. Можно создать несколько очередей и помещать операции в нужную очередь, таким образом можно реализовать синхронизацию потоков без использования блокировок. В моем примере мне нужно чтобы все триггеры вызывались только из одного потока и только последовательно. Это легко реализуется через EventDispatcher в котором один поток событий. По факту нужно изменить код в нескольких местах чтобы все заработало по новому. Итак:

 

private EventDispatcher _dispatcher;
private string _threadToken = "token";

public ReConnectionManager(ITrader trader)

{
Settings = new ReConnectionSettings();
// Диспатчер будем использовать чтобы все события происходили в одном потоке. Автомат не может работать в многопоточной среде.
_dispatcher = new EventDispatcher((ex) => this.AddErrorLog("Ошибка при выполнении в EventDispatcher. " + ex));

_trader = trader;
InitStateMachine();

// При первом подключении терминала к серверу, заводим автомат.
_trader.Connected += StartStateMachine;

}

private void StartingOnEntry()
{
// Заводим ожидание таймеров. Так же подключаем получение экспорта. Дабы следить за ним.
// Это нужно сделать ДО строк ниже. Не разрешено получать данные в режиме Starting
_stateMachine.Fire(Triggers.Wait);

// создадим таймеры и запустим их.
// Используем диспатчер чтобы не допустить одновременного срабатывания. Машина однопоточная.
_lastConnected = DateTime.Now;

// машина может стартовать тока после подключения терминала. Обновляем переменную.
_lastConnectionMessage = DateTime.MinValue;
_connectionCheckTimer = ThreadingHelper.Timer(() =>
{
_dispatcher.Add(() => _stateMachine.Fire(Triggers.CheckConnection), _threadToken);
}).Interval(Settings.ConnectionCheckInterval);

_lastDataExported = DateTime.Now;
_lastExportMessage = DateTime.MinValue;
_dataExportCheckTimer = ThreadingHelper.Timer(() =>
{
_dispatcher.Add(() => _stateMachine.Fire(Triggers.CheckDataExport), _threadToken);
}).Interval(Settings.DataExportCheckInterval);

// По приходу новых данных мы будем просто добавлять вызов триггера в диспатчер. Это решает проблему многопоточности.
_trader.NewDataExported += () =>
{
_dispatcher.Add(() => _stateMachine.Fire(Triggers.NewDataExported), _threadToken);
};
}

 

Вот так легко мы избавились от хитрых методов получения события NewDataExported, решили проблему многопоточности и убрали lock-и. Теперь все триггеры всегда вызываются из одного потока. Если вдруг логика наша оказалась ошибочной, и существуют неразрешенные переходы из состояния в состояние, StateMachine сообщит нам об этом путем генерации исключения. Теперь все логические изъяны при проектировании сразу же вылезут наружу (что всегда и происходит очень быстро). С момента как я начал использовать эту библиотеку, стал гораздо меньше времени тратить на отлов неуловимых багов, а занимаюсь более полезными делами.
Итоговый код ReConnectionManager можно взять в приложении. Он немного отличается в деталях. Подчищен код и меньше комментариев.

Обучение программированию торговых роботов на StockSharp

 


comments powered by HyperComments

Саня
2017-01-02 03:16:32
Казино Вулкан раздают деньги сегодня http://cenforce100.ru/casino-vulkan.php
14
Июл
2017

Доверительное управление. Результаты в июне 2017 года.

Доверительное управление. Результаты в июне 2017 года. Июнь индекс РТС вновь провел преимущественно в боковых движениях, а… »

11
Июн
2017

Доверительное управление. Результаты в мае 2017 года.

Доверительное управление. Результаты в мае 2017 года. В мае “болтанка” индекса РТС продолжилась, на паре… »

7
Май
2017

Доверительное управление. Результаты в апреле 2017 года.

Доверительное управление. Результаты в апреле 2017 года. В апреле мы наблюдали очередной месяц “боковика” по… »

2
Апр
2017

Доверительное управление. Результаты в марте 2017 года.

Доверительное управление. Результаты в марте 2017 года. В марте волатильность на рынке несущественно выросла. Все… »

7
Мар
2017

Доверительное управление. Результаты в феврале 2017 года.

Доверительное управление. Результаты в феврале 2017 года. Февраль был самым коротким торговым месяцем, к тому же… »