#region Copyright (c) B2BITS Corp.
//
//
// $Revision: 1.3 $
//
// (c) B2BITS EPAM Systems Company 2000-2010.
// "Licensor" shall mean B2BITS EPAM Systems Company (B2BITS).
//
// This software is for the use of the paying client of B2BITS (which may be
// a corporation, business area, business unit or single user) to whom it was
// delivered (the "Licensee"). The use of this software is subject to
// license terms.
//
// The Licensee acknowledges and agrees that the Software and Documentation
// (the "Confidential Information") is confidential and proprietary to
// the Licensor and the Licensee hereby agrees to use the Confidential
// Information only as permitted by the full licence agreement between
// the two parties, to maintain the confidentiality of the Confidential
// Information and not to disclose the confidential information, or any part
// thereof, to any other person, firm or corporation. The Licensee
// acknowledges that disclosure of the Confidential Information may give rise
// to an irreparable injury to the Licensor in-adequately compensable in
// damages. Accordingly the Licensor may seek (without the posting of any
// bond or other security) injunctive relief against the breach of the forgoing
// undertaking of confidentiality and non-disclosure, in addition to any other
// legal remedies which may be available, and the licensee consents to the
// obtaining of such injunctive relief. All of the undertakings and
// obligations relating to confidentiality and non-disclosure, whether
// contained in this section or elsewhere in this agreement, shall survive
// the termination or expiration of this agreement for a period of five (5)
// years.
//
// The Licensor agrees that any information or data received from the Licensee
// in connection with the performance of the support agreement relating to this
// software shall be confidential, will be used only in connection with the
// performance of the Licensor's obligations hereunder, and will not be
// disclosed to third parties, including contractors, without the Licensor's
// express permission in writing.
//
// Information regarding the software may be provided to the Licensee's outside
// auditors and attorneys only to the extent required by their respective
// functions.
//
//
#endregion
using System;
using System.Diagnostics;
using System.Text;
using com.b2bits.FIXAntenna;
using System.Threading;
namespace SimpleBenchmark
{
/// <summary>
/// Console benchmark for FIX Antenna .NET. It creates two FIX 4.2 sessions in the
/// same process (initiator "SND" and acceptor "TRG"), connects them through
/// localhost and runs a latency test followed by a throughput test.
/// </summary>
class Program
{
    const string sender = "SND";
    const string target = "TRG";
    const int LatencyMessageCount = 100000;
    const int ThroughputMessageCount = 100000;

    // Restarted right before every Put() of the latency test and stopped in the
    // BeforeMessageIsSent handler, so it holds the send-side delay of one message.
    readonly Stopwatch sendWatch = new Stopwatch();

    // Latency test: started in the AfterMessageIsReceived handler and stopped in the
    // IncomingMessage handler. Throughput test: measures the whole run.
    readonly Stopwatch receiveWatch = new Stopwatch();

    // One handle per session (0 - initiator, 1 - acceptor); signalled when that
    // session reports the state stored in expectedState.
    AutoResetEvent[] autoEvents;
    SessionState expectedState = SessionState.NON_GRACEFULLY_TERMINATED;

    // Throughput test bookkeeping.
    int expectedMessages = 0;
    int receivedMessages = 0;

    // Signalled by the acceptor for every message of the latency test.
    readonly ManualResetEvent messageReceivedEvent = new ManualResetEvent(false);
    // Signalled by the acceptor when the last message of the throughput test arrives.
    readonly ManualResetEvent allMessageReceivedEvent = new ManualResetEvent(false);

    // Index of the latency sample that is currently being measured.
    int MsgNum = 0;
    // Raw Stopwatch ticks, one entry per measured message.
    readonly long[] SendLatency = new long[LatencyMessageCount];
    readonly long[] ReceiveLatency = new long[LatencyMessageCount];

    Session sessionInitiator = null;
    Session sessionAcceptor = null;
    // The message that is sent over and over again by both tests.
    FixMessage msg;
    readonly bool isPersistent = false;

    /// <summary>
    /// Creates a benchmark instance.
    /// </summary>
    /// <param name="isPersist">true to create sessions with persistent message storage, false for transient storage.</param>
    Program(bool isPersist)
    {
        isPersistent = isPersist;
    }

    /// <summary>
    /// Creates the initiator and acceptor sessions, connects them through localhost,
    /// blocks until both report <c>SessionState.ESTABLISHED</c> and prepares the test message.
    /// </summary>
    /// <param name="isLatencyTest">true to apply the low-latency session settings and enable wire-level events.</param>
    private void Connect(bool isLatencyTest)
    {
        FixEngine engine = FixEngine.Instance;
        Console.WriteLine("Creating 2 " + (isPersistent ? "persistent" : "transient") + " FIX sessions...");

        SessionExtraParameters sessionParams = new SessionExtraParameters();
        if (isLatencyTest)
        {
            sessionParams.DisableTCPBuffer = true;      // disable buffering of data in TCP/IP stack
            sessionParams.AgressiveSend = true;         // enable faster message delivery to wire
            sessionParams.EnableWireLevelEvents = true; // low-level socket events (BeforeSent, AfterReceived) to watch for,
                                                        // affects throughput due to the high rate of events
        }

        autoEvents = new AutoResetEvent[] { new AutoResetEvent(false), new AutoResetEvent(false) };

        // Create 2 sessions and subscribe to their state changes.
        MessageStorageType strg = isPersistent ? MessageStorageType.Persistent : MessageStorageType.Transient;
        sessionInitiator = engine.CreateSession(sender, target, FixVersion.FIX42, sessionParams, strg);
        sessionInitiator.NewStateEvent += new Session.NewStateEventHandler(sessionInitiator_NewStateEvent);
        sessionAcceptor = engine.CreateSession(target, sender, FixVersion.FIX42, sessionParams, strg);
        sessionAcceptor.NewStateEvent += new Session.NewStateEventHandler(sessionAcceptor_NewStateEvent);

        // The expected state has to be armed BEFORE the connection is started: connecting is
        // asynchronous, so a session could otherwise become established before the state is
        // set, the notification would be ignored and WaitAll() below would block forever.
        ResetExpectedState(SessionState.ESTABLISHED);

        sessionAcceptor.ConnectAsAcceptor();
        sessionInitiator.ConnectAsInitiator("localhost", engine.ListenPort, 3000);

        // Wait until both sessions are established.
        WaitHandle.WaitAll(autoEvents);

        Console.WriteLine("Preparing test message:");
        msg = FixMessage.Parse("8=FIX.4.2"+"\x1"+"9=1"+"\x1"+"35=D"+"\x1"+"49=BLP"+"\x1"+"56=SCHB"+"\x1"+"34=01"+"\x1"+"50=30737"+"\x1"+"97=Y"+"\x1"+"52=20000809-20:20:50"+"\x1"+"11=90001008"+"\x1"+"1=10030003"+"\x1"+"21=2"+"\x1"+"55=TESTA"+"\x1"+"54=1"+"\x1"+"38=4000"+"\x1"+"40=2"+"\x1"+"59=0"+"\x1"+"44=30"+"\x1"+"47=I"+"\x1"+"60=20000809-18:20:32"+"\x1"+"10=000"+"\x1");
        Console.WriteLine(msg.ToString('|'));
        Console.WriteLine();
    }

    /// <summary>
    /// Disconnects and disposes both sessions, acceptor first. The initiator is still
    /// cleaned up when closing the acceptor fails. Sessions that were never created are skipped.
    /// </summary>
    void closeSessions()
    {
        try
        {
            CloseSession(sessionAcceptor);
        }
        finally
        {
            sessionAcceptor = null;
            try
            {
                CloseSession(sessionInitiator);
            }
            finally
            {
                sessionInitiator = null;
            }
        }
    }

    /// <summary>
    /// Disconnects one session and disposes it even if the disconnect throws.
    /// </summary>
    /// <param name="session">Session to close; null is ignored.</param>
    static void CloseSession(Session session)
    {
        if (session == null)
        {
            return;
        }
        try
        {
            session.DisconnectSync(true);
        }
        finally
        {
            session.Dispose();
        }
    }

    /// <summary>
    /// Executes the latency test: sends <paramref name="count"/> messages one by one, waiting
    /// for each to be received, then prints MIN/MAX/AVG send and receive latency in
    /// microseconds and saves all samples to "latency.xls".
    /// </summary>
    /// <param name="count">Number of messages to measure, from 1 to <c>LatencyMessageCount</c>.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="count"/> is less than 1 or exceeds the size of the sample buffers.
    /// </exception>
    private void LatencyTest(int count)
    {
        // The sample arrays hold LatencyMessageCount entries; a larger count would overrun
        // them while the statistics are calculated, and an empty run has no statistics at all.
        if (count < 1 || count > LatencyMessageCount)
        {
            throw new ArgumentOutOfRangeException("count", count,
                "Message count must be between 1 and " + LatencyMessageCount + ".");
        }
        if (!Stopwatch.IsHighResolution)
        {
            Console.WriteLine("The .NET's Stopwatch object does not use high resolution timer in your system. The Latency test is not possible to do");
            return;
        }
        Console.WriteLine("Latency test for {0} messages", count);

        Session.IncomingMessageEventHandler onIncoming =
            new Session.IncomingMessageEventHandler(sessionAcceptorLatency_IncomingMessageEvent);
        Session.AfterMessageIsReceivedEventHandler onAfterReceived =
            new Session.AfterMessageIsReceivedEventHandler(sessionAcceptorLatency_OnAfterMessageIsReceived);
        Session.BeforeMessageIsSentEventHandler onBeforeSent =
            new Session.BeforeMessageIsSentEventHandler(sessionInitiatorLatency_OnBeforeMessageIsSent);

        sessionAcceptor.IncomingMessageEvent += onIncoming;
        sessionAcceptor.AfterMessageIsReceivedEvent += onAfterReceived;
        sessionInitiator.BeforeMessageIsSentEvent += onBeforeSent;
        try
        {
            sendWatch.Reset();
            receiveWatch.Reset();
            Console.WriteLine("Sending messages... ");

            // Send 1 message for warm-up. The event is cleared first so that a signal left
            // over from a previous run cannot release the wait before the message arrives.
            messageReceivedEvent.Reset();
            MsgNum = 0;
            sessionInitiator.Put(msg);
            messageReceivedEvent.WaitOne();

            // Discard the warm-up sample: measured messages start at index 0 again.
            MsgNum = 0;
            for (int i = 0; i < count; ++i)
            {
                messageReceivedEvent.Reset();
                sendWatch.Reset();
                sendWatch.Start();
                sessionInitiator.Put(msg);
                // Strictly one message in flight: the next one is sent only after
                // the acceptor has reported this one.
                messageReceivedEvent.WaitOne();
            }
        }
        finally
        {
            // Remove all latency handlers, also when sending fails, so that they do not
            // interfere with anything else that uses these sessions.
            sessionAcceptor.IncomingMessageEvent -= onIncoming;
            sessionAcceptor.AfterMessageIsReceivedEvent -= onAfterReceived;
            sessionInitiator.BeforeMessageIsSentEvent -= onBeforeSent;
        }

        ReportLatency(count);
    }

    /// <summary>
    /// Converts the first <paramref name="count"/> collected samples to microseconds, writes
    /// them to "latency.xls" (tab-separated, one line per message) and prints MIN/MAX/AVG.
    /// </summary>
    /// <param name="count">Number of samples to report; must not exceed the sample buffer size.</param>
    private void ReportLatency(int count)
    {
        // Stopwatch ticks per microsecond.
        double ticksPerMicrosecond = Stopwatch.Frequency / 1000000.0;
        double sendMin = double.MaxValue, sendMax = 0, sendSum = 0, sendAvg;
        double recvMin = double.MaxValue, recvMax = 0, recvSum = 0, recvAvg;

        String filename = "latency.xls";
        // using guarantees that the file is flushed and closed even if a write fails.
        using (System.IO.TextWriter file = new System.IO.StreamWriter(filename))
        {
            file.WriteLine("Send\tRecv");
            for (int i = 0; i < count; i++)
            {
                double send = SendLatency[i] / ticksPerMicrosecond;
                if (send < sendMin)
                {
                    sendMin = send;
                }
                if (send > sendMax)
                {
                    sendMax = send;
                }
                sendSum += send;

                double recv = ReceiveLatency[i] / ticksPerMicrosecond;
                if (recv < recvMin)
                {
                    recvMin = recv;
                }
                if (recv > recvMax)
                {
                    recvMax = recv;
                }
                recvSum += recv;

                file.WriteLine(Math.Round(send, 0) + "\t" + Math.Round(recv, 0));
            }
        }

        sendAvg = sendSum / count;
        recvAvg = recvSum / count;

        Console.WriteLine("Send latency (mcs): ");
        Console.WriteLine("\tMIN: {0}", Math.Round(sendMin, 0));
        Console.WriteLine("\tMAX: {0}", Math.Round(sendMax, 0));
        Console.WriteLine("\tAVG: {0}", Math.Round(sendAvg, 0));
        Console.WriteLine();
        Console.WriteLine("Receive latency (mcs): ");
        Console.WriteLine("\tMIN: {0}", Math.Round(recvMin, 0));
        Console.WriteLine("\tMAX: {0}", Math.Round(recvMax, 0));
        Console.WriteLine("\tAVG: {0}", Math.Round(recvAvg, 0));
        Console.WriteLine("Time samples have been saved to " + filename);
        Console.WriteLine(new String('-', 40));
    }

    /// <summary>
    /// Executes the throughput test: sends <paramref name="count"/> messages back to back,
    /// waits until the acceptor has received all of them and prints the elapsed time
    /// and the resulting rate in messages per second.
    /// </summary>
    /// <param name="count">Number of messages to send; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is less than 1.</exception>
    private void ThroughputTest(int count)
    {
        // With no messages to send the completion event would never be signalled
        // and the wait below would never return.
        if (count < 1)
        {
            throw new ArgumentOutOfRangeException("count", count, "Message count must be positive.");
        }
        Console.WriteLine("Performance test for {0} messages", count);

        // Prepare all objects required for performance measurement and synchronization.
        Session.IncomingMessageEventHandler onIncoming =
            new Session.IncomingMessageEventHandler(sessionAcceptorThroughput_IncomingMessageEvent);
        sessionAcceptor.IncomingMessageEvent += onIncoming;
        try
        {
            receiveWatch.Reset();
            allMessageReceivedEvent.Reset();
            expectedMessages = count;
            receivedMessages = 0;

            Console.WriteLine("Sending messages... ");
            receiveWatch.Start();
            for (int i = 1; i <= count; ++i)
            {
                sessionInitiator.Put(msg);
            }
            allMessageReceivedEvent.WaitOne();
            receiveWatch.Stop();
        }
        finally
        {
            sessionAcceptor.IncomingMessageEvent -= onIncoming;
        }

        double elapsedMs = receiveWatch.Elapsed.TotalMilliseconds;
        Console.WriteLine("done.");
        Console.WriteLine("Ellapsed time: {0} ms", Math.Round(elapsedMs, 0));
        Console.WriteLine("Throughput: {0} msg/sec", Math.Round(count / elapsedMs * 1000, 1));
        Console.WriteLine(new String('-', 40));
    }

    /// <summary>
    /// Latency test, acceptor side: stores the receive-side ticks of the current message,
    /// advances to the next sample and releases the sending loop.
    /// </summary>
    void sessionAcceptorLatency_IncomingMessageEvent(object sender, Session.IncomingMessageEventArgs args)
    {
        receiveWatch.Stop();
        if (MsgNum < LatencyMessageCount)
        {
            ReceiveLatency[MsgNum] = receiveWatch.ElapsedTicks;
        }
        receiveWatch.Reset();
        MsgNum++;
        messageReceivedEvent.Set();
        args.IncomingMessage.Dispose();
    }

    /// <summary>
    /// Latency test, initiator side: stores the ticks elapsed since the sending loop
    /// started sendWatch for the current message.
    /// </summary>
    void sessionInitiatorLatency_OnBeforeMessageIsSent(object sender, Session.BeforeMessageIsSentEventArgs args)
    {
        sendWatch.Stop();
        if (MsgNum < LatencyMessageCount)
        {
            SendLatency[MsgNum] = sendWatch.ElapsedTicks;
        }
    }

    /// <summary>
    /// Latency test, acceptor side: starts the receive stopwatch that is stopped
    /// again in the IncomingMessage handler.
    /// </summary>
    void sessionAcceptorLatency_OnAfterMessageIsReceived(object sender, Session.AfterMessageIsReceivedEventArgs args)
    {
        receiveWatch.Start();
    }

    /// <summary>
    /// Throughput test, acceptor side: counts the message and signals completion
    /// when the expected number of messages has been received.
    /// </summary>
    void sessionAcceptorThroughput_IncomingMessageEvent(object sender, Session.IncomingMessageEventArgs args)
    {
        int received = Interlocked.Increment(ref this.receivedMessages);
        if (received == expectedMessages)
        {
            allMessageReceivedEvent.Set();
        }
        args.IncomingMessage.Dispose();
    }

    #region Processing session New State events in order to synchronize both sessions with the main thread

    /// <summary>Forwards state changes of the initiator session (wait handle 0).</summary>
    void sessionInitiator_NewStateEvent(object sender, Session.NewStateEventArgs args)
    {
        SignalForExpectedState(args.NewState, 0);
    }

    /// <summary>Forwards state changes of the acceptor session (wait handle 1).</summary>
    void sessionAcceptor_NewStateEvent(object sender, Session.NewStateEventArgs args)
    {
        SignalForExpectedState(args.NewState, 1);
    }

    /// <summary>
    /// Signals the wait handle of a session when it reports the state the main thread waits for.
    /// </summary>
    /// <param name="state">State reported by the session.</param>
    /// <param name="eventHandleIndex">Index of the session's handle in autoEvents.</param>
    void SignalForExpectedState(SessionState state, int eventHandleIndex)
    {
        if (state == this.expectedState)
        {
            autoEvents[eventHandleIndex].Set();
        }
    }

    /// <summary>
    /// Sets the state to wait for and clears the wait handles of both sessions.
    /// </summary>
    /// <param name="state">Session state that should signal the handles.</param>
    void ResetExpectedState(SessionState state)
    {
        expectedState = state;
        foreach (AutoResetEvent evnt in this.autoEvents)
        {
            evnt.Reset();
        }
    }

    #endregion

    /// <summary>
    /// Entry point. Expects a single argument, "transient" or "persistent". Anything else
    /// prints the usage line, after which the benchmark still runs: with persistent storage
    /// if the first argument is "persistent", with transient storage otherwise.
    /// </summary>
    /// <param name="args">Command line arguments.</param>
    static void Main(string[] args)
    {
        try
        {
            Console.WriteLine("FIX Antenna .NET Benchmark");

            bool isKnownMode = args.Length == 1 && (args[0].Equals("transient") || args[0].Equals("persistent"));
            if (!isKnownMode)
            {
                Console.WriteLine("Usage: SimpleBenchmark [transient|persistent]");
            }
            bool isPersistent = args.Length > 0 && args[0].Equals("persistent");

            FixEngine.Create("engine.properties");

            // Each test gets its own pair of sessions because the session parameters differ.
            Program program = new Program(isPersistent);
            program.Connect(true);  // set session parameters for latency test and do connect
            program.LatencyTest(LatencyMessageCount);
            program.closeSessions();

            Program program2 = new Program(isPersistent);
            program2.Connect(false); // set session parameters for throughput test and do connect
            program2.ThroughputTest(ThroughputMessageCount);
            program2.closeSessions();

            FixEngine.Instance.Stop();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
        Console.WriteLine("Press ENTER to exit");
        Console.ReadLine();
    }
}
}