#region Copyright (c) B2BITS Corp. // // // $Revision: 1.3 $ // // (c) B2BITS EPAM Systems Company 2000-2010. // "Licensor" shall mean B2BITS EPAM Systems Company (B2BITS). // // This software is for the use of the paying client of B2BITS (which may be // a corporation, business area, business unit or single user) to whom it was // delivered (the "Licensee"). The use of this software is subject to // license terms. // // The Licensee acknowledges and agrees that the Software and Documentation // (the "Confidential Information") is confidential and proprietary to // the Licensor and the Licensee hereby agrees to use the Confidential // Information only as permitted by the full licence agreement between // the two parties, to maintain the confidentiality of the Confidential // Information and not to disclose the confidential information, or any part // thereof, to any other person, firm or corporation. The Licensee // acknowledges that disclosure of the Confidential Information may give rise // to an irreparable injury to the Licensor in-adequately compensable in // damages. Accordingly the Licensor may seek (without the posting of any // bond or other security) injunctive relief against the breach of the forgoing // undertaking of confidentiality and non-disclosure, in addition to any other // legal remedies which may be available, and the licensee consents to the // obtaining of such injunctive relief. All of the undertakings and // obligations relating to confidentiality and non-disclosure, whether // contained in this section or elsewhere in this agreement, shall survive // the termination or expiration of this agreement for a period of five (5) // years. 
//
// The Licensor agrees that any information or data received from the Licensee
// in connection with the performance of the support agreement relating to this
// software shall be confidential, will be used only in connection with the
// performance of the Licensor's obligations hereunder, and will not be
// disclosed to third parties, including contractors, without the Licensor's
// express permission in writing.
//
// Information regarding the software may be provided to the Licensee's outside
// auditors and attorneys only to the extent required by their respective
// functions.
//
//
#endregion

using System;
using System.Diagnostics;
using System.Text;
using com.b2bits.FIXAntenna;
using System.Threading;

namespace SimpleBenchmark
{
    /// <summary>
    /// Console benchmark that connects an initiator and an acceptor FIX session
    /// to each other over "localhost" and measures per-message latency and
    /// overall throughput.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Creates a benchmark instance.
        /// </summary>
        /// <param name="isPersist">
        /// true to create sessions with <c>MessageStorageType.Persistent</c>,
        /// false to use <c>MessageStorageType.Transient</c>.
        /// </param>
        Program(bool isPersist)
        {
            isPersistent = isPersist;
        }

        // Latency test: started right before Put() and stopped in the
        // BeforeMessageIsSent handler, i.e. measures one send at a time.
        Stopwatch sendWatch = new Stopwatch();

        // Latency test: started in the AfterMessageIsReceived handler and stopped in
        // the IncomingMessage handler. Throughput test: measures the whole run.
        Stopwatch receiveWatch = new Stopwatch();

        // One event per session (index 0 = initiator, 1 = acceptor); signalled when
        // that session reports the state stored in expectedState.
        AutoResetEvent[] autoEvents;
        SessionState expectedState = SessionState.NON_GRACEFULLY_TERMINATED;

        // Throughput test bookkeeping.
        int expectedMessages = 0;
        int receivedMessages = 0;

        // Signalled by the acceptor for every message delivered during the latency test.
        ManualResetEvent messageReceivedEvent = new ManualResetEvent(false);
        // Signalled by the acceptor once receivedMessages reaches expectedMessages.
        ManualResetEvent allMessageReceivedEvent = new ManualResetEvent(false);

        const string sender = "SND";
        const string target = "TRG";
        const int LatencyMessageCount = 100000;
        const int ThroughputMessageCount = 100000;

        // Index of the latency sample currently being measured; advanced by the
        // acceptor's incoming-message handler.
        int MsgNum = 0;
        // Raw Stopwatch tick samples, one slot per measured message.
        long[] SendLatency = new long[LatencyMessageCount];
        long[] ReceiveLatency = new long[LatencyMessageCount];

        Session sessionInitiator = null;
        Session sessionAcceptor = null;
        FixMessage msg;
        bool isPersistent = false;

        /// <summary>
        /// Creates the initiator and acceptor sessions, connects them to each other,
        /// blocks until both report <c>SessionState.ESTABLISHED</c>, and prepares the
        /// test message.
        /// </summary>
        /// <param name="isLatencyTest">
        /// true to apply the low-latency session parameters and enable wire-level
        /// events; false to use default session parameters.
        /// </param>
        private void Connect( bool isLatencyTest )
        {
            FixEngine engine = FixEngine.Instance;

            Console.WriteLine("Creating 2 " + (isPersistent ? "persistent" : "transient") + " FIX sessions...");

            SessionExtraParameters sessionParams = new SessionExtraParameters();
            if (isLatencyTest)
            {
                sessionParams.DisableTCPBuffer = true;      // disable buffering of data in TCP/IP stack
                sessionParams.AgressiveSend = true;         // enable faster message delivery to wire
                // low-level socket events (BeforeSent, AfterReceived) to watch for,
                // affects throughput due to the high rate of events
                sessionParams.EnableWireLevelEvents = true;
            }

            autoEvents = new AutoResetEvent[]
            {
                new AutoResetEvent(false),
                new AutoResetEvent(false)
            };

            // Create 2 sessions and subscribe to their state events.
            MessageStorageType strg = isPersistent ? MessageStorageType.Persistent : MessageStorageType.Transient;

            sessionInitiator = engine.CreateSession(sender, target, FixVersion.FIX42, sessionParams, strg);
            sessionInitiator.NewStateEvent += new Session.NewStateEventHandler(sessionInitiator_NewStateEvent);

            sessionAcceptor = engine.CreateSession(target, sender, FixVersion.FIX42, sessionParams, strg);
            sessionAcceptor.NewStateEvent += new Session.NewStateEventHandler(sessionAcceptor_NewStateEvent);

            // Arm the expected state BEFORE starting the asynchronous connects.
            // Otherwise a session that reaches ESTABLISHED early is either ignored
            // (expectedState still holds its initial value) or has its signal wiped
            // by the Reset() calls, and WaitAll below would block forever.
            ResetExpectedState(SessionState.ESTABLISHED);

            // Connecting is asynchronous...
            sessionAcceptor.ConnectAsAcceptor();
            sessionInitiator.ConnectAsInitiator("localhost", engine.ListenPort, 3000);

            // ...so wait until both sessions have signalled the expected state.
            WaitHandle.WaitAll(autoEvents);

            Console.WriteLine("Preparing test message:");
            msg = FixMessage.Parse(
                "8=FIX.4.2"+"\x1"+"9=1"+"\x1"+"35=D"+"\x1"+"49=BLP"+"\x1"+"56=SCHB"+"\x1"+"34=01"+"\x1"
                +"50=30737"+"\x1"+"97=Y"+"\x1"+"52=20000809-20:20:50"+"\x1"+"11=90001008"+"\x1"
                +"1=10030003"+"\x1"+"21=2"+"\x1"+"55=TESTA"+"\x1"+"54=1"+"\x1"+"38=4000"+"\x1"
                +"40=2"+"\x1"+"59=0"+"\x1"+"44=30"+"\x1"+"47=I"+"\x1"+"60=20000809-18:20:32"+"\x1"
                +"10=000"+"\x1");
            Console.WriteLine(msg.ToString('|'));
            Console.WriteLine();
        }

        /// <summary>
        /// Synchronously disconnects and disposes both sessions, acceptor first.
        /// </summary>
        void closeSessions()
        {
            sessionAcceptor.DisconnectSync(true);
            sessionAcceptor.Dispose();

            sessionInitiator.DisconnectSync(true);
            sessionInitiator.Dispose();
        }

        /// <summary>
        /// Executes the latency test: sends <paramref name="count"/> messages one at a
        /// time, waiting for each to be delivered to the acceptor, then prints
        /// min/max/avg figures and writes the raw samples to "latency.xls".
        /// </summary>
        /// <param name="count">Number of messages to measure; must be positive.</param>
        private void LatencyTest(int count)
        {
            if (!Stopwatch.IsHighResolution)
            {
                Console.WriteLine("The .NET's Stopwatch object does not use high resolution timer in your system. The Latency test is not possible to do");
                return;
            }

            if (count <= 0)
            {
                // Nothing to measure; averaging over zero samples would only yield NaN.
                Console.WriteLine("Latency test skipped: message count must be positive");
                return;
            }

            // Make sure there is one slot per sample before any handler can run.
            if (SendLatency.Length < count)
            {
                SendLatency = new long[count];
                ReceiveLatency = new long[count];
            }

            Console.WriteLine("Latency test for {0} messages", count);

            sessionAcceptor.IncomingMessageEvent += new Session.IncomingMessageEventHandler(sessionAcceptorLatency_IncomingMessageEvent);
            sessionAcceptor.AfterMessageIsReceivedEvent += new Session.AfterMessageIsReceivedEventHandler(sessionAcceptorLatency_OnAfterMessageIsReceived);
            sessionInitiator.BeforeMessageIsSentEvent += new Session.BeforeMessageIsSentEventHandler(sessionInitiatorLatency_OnBeforeMessageIsSent);

            try
            {
                sendWatch.Reset();
                receiveWatch.Reset();

                Console.WriteLine("Sending messages... ");

                // Send 1 message for warm-up; its sample is discarded by rewinding MsgNum.
                MsgNum = 0;
                messageReceivedEvent.Reset();
                sessionInitiator.Put(msg);
                messageReceivedEvent.WaitOne();

                MsgNum = 0;
                for (int i = 0; i < count; ++i)
                {
                    messageReceivedEvent.Reset();
                    sendWatch.Reset();
                    sendWatch.Start();
                    sessionInitiator.Put(msg);
                    // One message in flight at a time: wait for delivery before the next send.
                    messageReceivedEvent.WaitOne();
                    //if (0 == (i % 25000)) Console.WriteLine("{0} messages were sent", i);
                }
            }
            finally
            {
                // Detach every latency handler, even if sending failed, so none of
                // them stays attached to the sessions after this test.
                sessionAcceptor.IncomingMessageEvent -= new Session.IncomingMessageEventHandler(sessionAcceptorLatency_IncomingMessageEvent);
                sessionAcceptor.AfterMessageIsReceivedEvent -= new Session.AfterMessageIsReceivedEventHandler(sessionAcceptorLatency_OnAfterMessageIsReceived);
                sessionInitiator.BeforeMessageIsSentEvent -= new Session.BeforeMessageIsSentEventHandler(sessionInitiatorLatency_OnBeforeMessageIsSent);
            }

            ReportLatency(count);
        }

        /// <summary>
        /// Converts the first <paramref name="count"/> collected tick samples to
        /// microseconds, saves them to "latency.xls" and prints min/max/avg.
        /// </summary>
        private void ReportLatency(int count)
        {
            // Stopwatch ticks per microsecond; loop-invariant.
            double ticksPerMicrosecond = Stopwatch.Frequency / 1000000.0;

            double sendMin = double.MaxValue, sendMax = 0, sendSum = 0, sendAvg;
            double recvMin = double.MaxValue, recvMax = 0, recvSum = 0, recvAvg;

            String filename = "latency.xls";
            // 'using' guarantees the file is flushed and closed even if a write throws.
            using (System.IO.TextWriter file = new System.IO.StreamWriter(filename))
            {
                file.WriteLine( "Send\tRecv" );
                for (int i = 0; i < count; i++)
                {
                    double send = SendLatency[i] / ticksPerMicrosecond;
                    if (send < sendMin) sendMin = send;
                    if (send > sendMax) sendMax = send;
                    sendSum += send;

                    double recv = ReceiveLatency[i] / ticksPerMicrosecond;
                    if (recv < recvMin) recvMin = recv;
                    if (recv > recvMax) recvMax = recv;
                    recvSum += recv;

                    file.WriteLine( Math.Round(send, 0) + "\t" + Math.Round(recv, 0) );
                }
            }

            sendAvg = sendSum / count;
            recvAvg = recvSum / count;

            Console.WriteLine("Send latency (mcs): ");
            Console.WriteLine("\tMIN: {0}", Math.Round(sendMin, 0));
            Console.WriteLine("\tMAX: {0}", Math.Round(sendMax, 0));
            Console.WriteLine("\tAVG: {0}", Math.Round(sendAvg, 0));
            Console.WriteLine();
            Console.WriteLine("Receive latency (mcs): ");
            Console.WriteLine("\tMIN: {0}", Math.Round(recvMin, 0));
            Console.WriteLine("\tMAX: {0}", Math.Round(recvMax, 0));
            Console.WriteLine("\tAVG: {0}", Math.Round(recvAvg, 0));
            Console.WriteLine("Time samples have been saved to " + filename );
            Console.WriteLine(new String('-', 40));
        }

        /// <summary>
        /// Executes the throughput test: sends <paramref name="count"/> messages
        /// back-to-back and reports the elapsed time until the acceptor has received
        /// all of them.
        /// </summary>
        /// <param name="count">Number of messages to send; must be positive.</param>
        private void ThroughputTest(int count)
        {
            if (count <= 0)
            {
                // With no messages the completion event would never be set and
                // WaitOne() below would block forever.
                Console.WriteLine("Throughput test skipped: message count must be positive");
                return;
            }

            Console.WriteLine("Performance test for {0} messages", count);

            // Prepare all objects required for performance measurement and synchronization.
            sessionAcceptor.IncomingMessageEvent += new Session.IncomingMessageEventHandler(sessionAcceptorThroughput_IncomingMessageEvent);
            try
            {
                receiveWatch.Reset();
                allMessageReceivedEvent.Reset();
                expectedMessages = count;
                receivedMessages = 0;

                Console.WriteLine("Sending messages... ");
                receiveWatch.Start();
                for (int i = 1; i <= count; ++i)
                {
                    sessionInitiator.Put(msg);
                    //if (0 == (i % 25000)) Console.WriteLine("{0} messages were sent", i);
                }

                allMessageReceivedEvent.WaitOne();
                receiveWatch.Stop();
            }
            finally
            {
                sessionAcceptor.IncomingMessageEvent -= new Session.IncomingMessageEventHandler(sessionAcceptorThroughput_IncomingMessageEvent);
            }

            Console.WriteLine("done.");
            Console.WriteLine("Ellapsed time: {0} ms", Math.Round((receiveWatch.Elapsed.TotalMilliseconds), 0));
            Console.WriteLine("Throughput: {0} msg/sec", Math.Round(((count) / receiveWatch.Elapsed.TotalMilliseconds*1000 ), 1));
            Console.WriteLine(new String('-', 40));
        }

        /// <summary>
        /// Latency test: records the receive-side sample for the current message,
        /// advances to the next sample slot and releases the sending loop.
        /// </summary>
        void sessionAcceptorLatency_IncomingMessageEvent(object sender, Session.IncomingMessageEventArgs args)
        {
            receiveWatch.Stop();
            if (MsgNum < ReceiveLatency.Length)
                ReceiveLatency[MsgNum] = receiveWatch.ElapsedTicks;
            receiveWatch.Reset();
            MsgNum++;
            messageReceivedEvent.Set();
            args.IncomingMessage.Dispose();
        }

        /// <summary>
        /// Latency test: stops the send stopwatch and records the send-side sample
        /// for the current message.
        /// </summary>
        void sessionInitiatorLatency_OnBeforeMessageIsSent(object sender, Session.BeforeMessageIsSentEventArgs args)
        {
            sendWatch.Stop();
            if (MsgNum < SendLatency.Length)
                SendLatency[MsgNum] = sendWatch.ElapsedTicks;
        }

        /// <summary>
        /// Latency test: starts the receive stopwatch when the wire-level
        /// "after message is received" event fires.
        /// </summary>
        void sessionAcceptorLatency_OnAfterMessageIsReceived(object sender, Session.AfterMessageIsReceivedEventArgs args)
        {
            receiveWatch.Start();
        }

        /// <summary>
        /// Throughput test: counts the received message and signals completion once
        /// the expected number has arrived.
        /// </summary>
        void sessionAcceptorThroughput_IncomingMessageEvent(object sender, Session.IncomingMessageEventArgs args)
        {
            int received = Interlocked.Increment(ref this.receivedMessages);
            if (received == expectedMessages)
            {
                allMessageReceivedEvent.Set();
            }
            args.IncomingMessage.Dispose();
            //if (0 == (received % 25000)) Console.WriteLine("{0} messages were received", received);
        }

        #region Processing session New State events in order to synchronize both sessions with the main thread

        void sessionInitiator_NewStateEvent(object sender, Session.NewStateEventArgs args)
        {
            SignalForExpectedState(args.NewState, 0);
        }

        void sessionAcceptor_NewStateEvent(object sender, Session.NewStateEventArgs args)
        {
            SignalForExpectedState(args.NewState, 1);
        }

        /// <summary>
        /// Sets the event at <paramref name="eventHandleIndex"/> when
        /// <paramref name="state"/> equals the currently expected state.
        /// </summary>
        void SignalForExpectedState(SessionState state, int eventHandleIndex)
        {
            //Console.WriteLine("{0} Session state: {1}", eventHandleIndex, state);
            if (state == this.expectedState)
            {
                autoEvents[eventHandleIndex].Set();
            }
        }

        /// <summary>
        /// Stores the state to wait for and clears any previously raised signals.
        /// </summary>
        void ResetExpectedState(SessionState state)
        {
            expectedState = state;
            foreach (AutoResetEvent evnt in this.autoEvents)
            {
                evnt.Reset();
            }
        }

        #endregion

        /// <summary>
        /// Entry point: runs the latency test and then the throughput test, each on
        /// a freshly connected pair of sessions.
        /// </summary>
        /// <param name="args">
        /// Optional single argument, "transient" or "persistent". Any other input
        /// prints the usage line and the run continues with transient storage.
        /// </param>
        static void Main(string[] args)
        {
            try
            {
                Console.WriteLine("FIX Antenna .NET Benchmark");

                if( args.Length != 1 || (!args[0].Equals("transient") && !args[0].Equals("persistent")) )
                {
                    Console.WriteLine("Usage: SimpleBenchmark [transient|persistent]");
                }

                bool isPersistent = false;
                if( args.Length > 0 && args[0].Equals("persistent") )
                    isPersistent = true;

                FixEngine.Create("engine.properties");

                Program program = new Program(isPersistent);
                program.Connect(true); // set session parameters for latency test and do connect
                program.LatencyTest(LatencyMessageCount);
                program.closeSessions();

                Program program2 = new Program(isPersistent);
                program2.Connect(false); // set session parameters for throughput test and do connect
                program2.ThroughputTest(ThroughputMessageCount);
                program2.closeSessions();

                FixEngine.Instance.Stop();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }

            Console.WriteLine("Press ENTER to exit");
            Console.ReadLine();
        }
    }
}