/*

 * <COPYRIGHT>

 *

 *  $Revision: 1.3 $

 *

 *  (c) B2BITS EPAM Systems Company 2000-2011.

 *  "Licensor" shall mean B2BITS EPAM Systems Company (B2BITS).

 *

 *  This software is for the use of the paying client of B2BITS (which may be

 *  a corporation, business area, business unit or single user) to whom it was

 *  delivered (the "Licensee"). The use of this software is subject to

 *  licence terms.

 *

 *  The Licensee acknowledges and agrees that the Software and Documentation

 *  (the "Confidential Information") is confidential and proprietary to

 *  the Licensor and the Licensee hereby agrees to use the Confidential

 *  Information only as permitted by the full licence agreement between

 *  the two parties, to maintain the confidentiality of the Confidential

 *  Information and not to disclose the confidential information, or any part

 *  thereof, to any other person, firm or corporation. The Licensee

 *  acknowledges that disclosure of the Confidential Information may give rise

 *  to an irreparable injury to the Licensor in-adequately compensable in

 *  damages. Accordingly the Licensor may seek (without the posting of any

 *  bond or other security) injunctive relief against the breach of the forgoing

 *  undertaking of confidentiality and non-disclosure, in addition to any other

 *  legal remedies which may be available, and the licensee consents to the

 *  obtaining of such injunctive relief. All of the undertakings and

 *  obligations relating to confidentiality and non-disclosure, whether

 *  contained in this section or elsewhere in this agreement, shall survive

 *  the termination or expiration of this agreement for a period of five (5)

 *  years.

 *

 *  The Licensor agrees that any information or data received from the Licensee

 *  in connection with the performance of the support agreement relating to this

 *  software shall be confidential, will be used only in connection with the

 *  performance of the Licensor's obligations hereunder, and will not be

 *  disclosed to third parties, including contractors, without the Licensor's

 *  express permission in writing.

 *

 *  Information regarding the software may be provided to the Licensee's outside

 *  auditors and attorneys only to the extent required by their respective

 *  functions.

 *

 * <COPYRIGHT>

 */

 

#include <fstream>
#include <iostream>
#include <set>
#include <stdexcept>
#include <string>

#include <quickfix/Application.h>
#include <quickfix/MessageCracker.h>
#include <quickfix/Values.h>
#include <quickfix/Utility.h>
#include <quickfix/Mutex.h>
#include <quickfix/Event.h>
#include <quickfix/FileStore.h>
#include <quickfix/SocketAcceptor.h>
#include <quickfix/SocketInitiator.h>
#include <quickfix/ThreadedSocketAcceptor.h>
#include <quickfix/ThreadedSocketInitiator.h>
#include <quickfix/FileLog.h>
#include <quickfix/SessionSettings.h>
#include <quickfix/fix42/NewOrderSingle.h>

#include "PerfMeter.h"

 

class Application : public FIX::Application, public FIX::MessageCracker

{

public:

  Application( bool latencyTest ) :

     meter_("Message latency"),

     latencyTest_(latencyTest), receivedMsgs_(0)

  {}

 

  // Application overloads

  void onCreate( const FIX::SessionID& ) {}

  void onLogon( const FIX::SessionID& sessionID )

  {

      std::cout << std::endl << "Logon - " << sessionID << std::endl;

  }

 

  void onLogout( const FIX::SessionID& sessionID ) {}

  void toAdmin( FIX::Message&, const FIX::SessionID& ) {}

  void toApp( FIX::Message&, const FIX::SessionID& ) {}

  void fromAdmin( const FIX::Message&, const FIX::SessionID& ) {}

 

  void fromApp( const FIX::Message& message, const FIX::SessionID& sessionID )

  {

      crack( message, sessionID );

  }

 

 

   // MessageCracker overloads

   void onMessage( const FIX42::NewOrderSingle& message,

                                 const FIX::SessionID& sessionID )

   {

        if( latencyTest_ )

        {

            meter_.endTime();

            msgIsReceivedEvent_.signal();

        }

        else

        {

    #ifdef _WIN32

            ::InterlockedIncrement( &receivedMsgs_ );

    #else

             __sync_fetch_and_add (&receivedMsgs_, 1);

    #endif

 

             if( receivedMsgs_ == totalMsgsToReceive_ )

             {

                 meter_.endTime(true);

                 msgIsReceivedEvent_.signal();

             }

        }

  }

 

  FIX::Event msgIsReceivedEvent_;

  System::PerfMeter meter_;

  bool latencyTest_;

  int totalMsgsToReceive_;

  volatile long receivedMsgs_;

};

 

/**
 * Tells whether @p str begins with @p substr.
 *
 * @param str     string to inspect
 * @param substr  candidate prefix; an empty prefix always matches
 * @return true when the first substr.size() characters of @p str equal
 *         @p substr, false otherwise (including when @p substr is longer
 *         than @p str)
 */
bool startsWith(const std::string& str, const std::string& substr)
{
    // Anchored comparison: unlike find(), it never scans past the prefix
    // when the start of the string does not match.
    return str.compare( 0, substr.size(), substr ) == 0;
}

 

void waitForSessionsLogon( std::set<FIX::SessionID>& sessions )

{

    FIX::process_sleep( 2 );

 

    bool bAllConnected = false;

    while( !bAllConnected )

    {

        std::set<FIX::SessionID>::iterator it = sessions.begin();

        bAllConnected = true;

        while( it != sessions.end() )

        {

            FIX::Session* session = FIX::Session::lookupSession(*it);

            if( !session->isLoggedOn() )

            {

                bAllConnected = false;

                break;

            }

            ++it;

        }

        FIX::process_sleep( 1 );

    }

}

 

void testLatency(const std::string& file, FIX::Message& msg)

{

    std::cout << "Message send/receive Latency test with local network interface ***\n" << std::endl;

 

    int msgsToSend = 100000;

   

    FIX::SessionSettings settings( file );

    Application application(true);

    FIX::FileStoreFactory storeFactory( settings );

    FIX::FileLogFactory logFactory( settings );

    FIX::ThreadedSocketAcceptor acceptor( application, storeFactory, settings, logFactory );

    FIX::ThreadedSocketInitiator initiator( application, storeFactory, settings, logFactory );

 

    acceptor.start();

    initiator.start();

 

    std::set<FIX::SessionID> sessions = initiator.getSessions();

    waitForSessionsLogon( sessions );

 

    FIX::Session* session = FIX::Session::lookupSession(*sessions.begin());

 

    for( int i = 0; i < msgsToSend; i++ )

    {

        application.meter_.startTime();

        session->send( msg );

        application.msgIsReceivedEvent_.wait();

    }

 

    std::cout << "Messages sent: " << msgsToSend << std::endl;

 

    application.meter_.printResults();

 

    initiator.stop();

    acceptor.stop();

}

 

void testThroughput(const std::string& file, FIX::Message& msg)

{

    std::cout << "Message send/receive Throughput test with local network interface ***\n" << std::endl;

 

    FIX::SessionSettings settings( file );

    Application application(false);

    FIX::FileStoreFactory storeFactory( settings );

    FIX::FileLogFactory logFactory( settings );

    FIX::SocketAcceptor acceptor( application, storeFactory, settings, logFactory );

    FIX::SocketInitiator initiator( application, storeFactory, settings, logFactory );

 

    std::set<FIX::SessionID> sessions = initiator.getSessions();

    size_t nSessionNum = sessions.size();

 

    int msgsToSend = (int) (1000000 / sessions.size());

 

    application.totalMsgsToReceive_ = (int) (msgsToSend * nSessionNum);

 

    acceptor.start();

    initiator.start();

 

    FIX::process_sleep( 2 );

 

    waitForSessionsLogon( sessions );

 

    application.meter_.startTime();

 

    for( int i = 0; i < msgsToSend; i++ )

    {

        std::set<FIX::SessionID>::iterator it = sessions.begin();

        while( it != sessions.end() )

        {

            FIX::Session::sendToTarget( msg, *it );

            ++it;

        }

    }

 

    application.msgIsReceivedEvent_.wait();

 

    int msgsPerSec = (int) application.meter_.round( (application.receivedMsgs_ / ((double) application.meter_.getLastResultMcs()/ 1000000)));

    std::cout << "Messages sent: " << application.receivedMsgs_ << std::endl;

    std::cout << "Concurrent FIX sessions: " << nSessionNum << std::endl;

    std::cout << "Throughput: " << msgsPerSec << " messages/sec." << std::endl;

 

    initiator.stop();

    acceptor.stop();

}

 

int main( int argc, char** argv )

{

    if ( argc != 3 )

  {

    std::cout << "usage: " << argv[ 0 ]

    << " FILE latency|throughput" << std::endl;

    return 0;

  }

  std::string file = argv[ 1 ];

  std::string test = argv[ 2 ];

 

  try

  {

     #ifdef B2BITS_QFADAPTOR

        std::cout << std::endl << "B2BITS FIX Antenna QF Adaptor: ";

     #else

        std::cout << std::endl << "QuickFIX Engine: ";

     #endif

 

     #ifdef B2BITS_QFADAPTOR

        QFA::InitializeEngineAdaptor("./config/converted/qfa.adaptor.properties", "./config/converted/engine.properties");

     #endif

 

      FIX42::NewOrderSingle msg( FIX::ClOrdID( "1" ), FIX::HandlInst(FIX::HandlInst_AUTOMATED_EXECUTION_ORDER_PRIVATE_NO_BROKER_INTERVENTION), FIX::Symbol( "aaa" ), FIX::Side(FIX::Side_BUY), FIX::TransactTime(), FIX::OrdType(FIX::OrdType_FOREX) );

 

      if( startsWith("latency", test) )

        testLatency(file, msg);

      else if( startsWith("throughput", test) )

        testThroughput(file, msg);

      else

        std::cout << "Unknown test type specified: '" << test << "'" << std::endl ,

        exit(0);

 

      #ifdef B2BITS_QFADAPTOR

         QFA::ShutdownEngineAdaptor();

      #endif

 

      return 0;

  }

  catch ( std::exception & e )

  {

    std::cout << e.what() << std::endl;

    return 1;

  }

}