com.epam.cmemdp.impl
Class MessageProcessor

java.lang.Object
  extended by com.epam.cmemdp.impl.MessageProcessor

public class MessageProcessor
extends java.lang.Object

Message processor is a class used to process increment message in right order.


Instrument can be in 3 different state:
This is how it can be used:
Case 1 (Normal processing - expected rptSecNum):
Instrument in normal state and messageProcessor process increment with expected rptSecNum. In this case we should register secNum in messageProcessorHelper and instrument, rptSecNum in instrument and deliver increment message to user application ASAP.
This case tested in second part MessageProcessorTest.testDublicateUpdate
Case 2 (Normal processing - small rptSecNum):
Smaller than expected rptSecNum can processed in 2 cases: if we process this increment from other stream and if secNum was reseted. Case 2-1 (Normal processing - small rptSecNum, already processed):
This case can be simply identify when we set secNum for this increment. In this case we should not process it anymore.
This case tested in first part MessageProcessorTest.testDublicateUpdate
Case 2-2 (Normal processing - small rptSecNum, reset):
Note: not always, when we set secNum into messageProcessorHelper, we can identify is this secNum processed or not. This happens because secNum can be reseted or message can be too old to register in helper. To prevent reset on this cases we check delta between one and another rptSecNums.
Case 3 (Normal processing - big rptSecNum):
This can be if we lost message in at last one stream or if we discard in one stream increment with secNum = n (because we process it with other stream) and start to process increment with secNum = n + 1.
Case 3-1 (Normal processing - big rptSecNum, asynch thread):
In this case we should place this increment to reorder. Instrument will be in `Small gap` state
This case tested in MessageProcessorTest.testNoHoleUpdate
Case 3-2 (Normal processing - big rptSecNum, message gap):
In this case we can hope, that other stream can recover information in this gap. To guarantee it we tries to find a `hole` in secNums in messageProcessorHelper.
We call gap in secNums `hole` if secNum of this gap is distant from last secNum for at last com.epam.cmemdp.impl.MessageProcessorHelper.minHoleOffset.
Case 3-2-1 (Normal processing - big rptSecNum, message gap after last hole):
We can guarantee this case if last secNum for normal processed increment greater than secNum of last hole. In this case we can hope and so we should process it just like Case 3-1.
This case tested in MessageProcessorTest.testNoHoleToThisInstrumentUpdate
Case 3-2-2 (Normal processing - big rptSecNum, message gap before last hole):
Or if we can`t guarantee previous case. In this case we should set `recovery` mode to instrument and connect to snapshot if necessary.
This case tested in MessageProcessorTest.testHoleUpdate
Case 4 (Small gap - expected rptSecNum):
In this case we may be process all gap increment and should deliver all saved increment message to user application in right order (or part of it, if we process only some gap) and update lastProcessedSeqNum property
Case 5 (Small gap - small rptSecNum):
Same as Case 2.
Case 6 (Small gap - big rptSecNum):
Same as Case 3, but we can check if we add to queue increment (n+1) after we playQueue with increment (n) and play it again (this can happen in Case 3-1) TODO
Case 7 (Recovery - expected rptSecNum):
In this case we should stop recovery if closed all gaps (or, at last, close gaps before last hole TODO)
Case 8 (Recovery - small rptSecNum):
Same as Case 2.
Case 9 (Recovery - big rptSecNum):
Same as Case 6.
Case 10 (Normal processing - snapshot):
TODO test using seqNum for snapshot, may be we should use other tag such as LastMsgSeqNumProcessed (see bovespa)
In this case we should probably skip snapshot to save trade. Or we should use special algorithm in Case 2-2 to pass old trade increment with not processed secNum
Case 11 (Small gap - snapshot):
Same as Case 10, but if we process snapshot we should pass trade increments (TODO)
Case 12 (Recovery - snapshot):
We must process snapshot and (TODO) trades.

Author:
Dmitry_Starshov

Field Summary
static boolean DEBUG_ENABLED
           
static boolean INFO_ENABLED
           
protected static Log log
           
static boolean TRACE_ENABLED
           
 
Constructor Summary
MessageProcessor()
           
 
Method Summary
 MessageProcessor getMessageProcessor(FIXFieldList message)
           
 void handleEvent(Instrument instrument, InstrumentEvent event)
           
 void handleForcedRecovery(Instrument instrument, ForcedRecoveryEvent event)
           
 void handleSnapshotReset(Instrument instrument, SnapshotResetEvent event)
           
 boolean processIncrement(Instrument instrument, GroupEntry entry)
           
 void processSnapshot(Instrument instrument, FIXFieldList msg)
           
 void setProcessorContext(ApplicationContext applicationContext)
           
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

log

protected static final Log log

INFO_ENABLED

public static final boolean INFO_ENABLED

DEBUG_ENABLED

public static final boolean DEBUG_ENABLED

TRACE_ENABLED

public static final boolean TRACE_ENABLED
Constructor Detail

MessageProcessor

public MessageProcessor()
Method Detail

handleEvent

public void handleEvent(Instrument instrument,
                        InstrumentEvent event)

handleForcedRecovery

public void handleForcedRecovery(Instrument instrument,
                                 ForcedRecoveryEvent event)

handleSnapshotReset

public void handleSnapshotReset(Instrument instrument,
                                SnapshotResetEvent event)

processSnapshot

public void processSnapshot(Instrument instrument,
                            FIXFieldList msg)

processIncrement

public boolean processIncrement(Instrument instrument,
                                GroupEntry entry)

getMessageProcessor

public MessageProcessor getMessageProcessor(FIXFieldList message)

setProcessorContext

public void setProcessorContext(ApplicationContext applicationContext)


Copyright © 2000-2014 EPAM Systems. All Rights Reserved.