com.epam.cmemdp.impl
Class MessageProcessor
java.lang.Object
com.epam.cmemdp.impl.MessageProcessor
public class MessageProcessor
- extends java.lang.Object
Message processor is a class used to process increment message in right
order.
Instrument can be in 3 different state:
- Normal processing
- Small gap
- Recovery
This is how it can be used:
Case 1 (Normal processing - expected rptSecNum):
Instrument in normal state and messageProcessor process increment with
expected rptSecNum. In this case we should register secNum in
messageProcessorHelper and instrument, rptSecNum in instrument and deliver
increment message to user application ASAP.
This case tested in second part
MessageProcessorTest.testDublicateUpdate
Case 2 (Normal processing - small rptSecNum):
Smaller than expected rptSecNum can processed in 2 cases: if we process this
increment from other stream and if secNum was reseted. Case 2-1
(Normal processing - small rptSecNum, already processed):
This case can be simply identify when we set secNum for this increment. In
this case we should not process it anymore.
This case tested in first part
MessageProcessorTest.testDublicateUpdate
Case 2-2 (Normal processing - small rptSecNum, reset):
Note: not always, when we set secNum into messageProcessorHelper, we can
identify is this secNum processed or not. This happens because secNum can be
reseted or message can be too old to register in helper. To prevent reset on
this cases we check delta between one and another rptSecNums.
Case 3 (Normal processing - big rptSecNum):
This can be if we lost message in at last one stream or if we discard in one
stream increment with secNum = n (because we process it with other stream)
and start to process increment with secNum = n + 1.
Case 3-1 (Normal processing - big rptSecNum, asynch thread):
In this case we should place this increment to reorder. Instrument will be in
`Small gap` state
This case tested in MessageProcessorTest.testNoHoleUpdate
Case 3-2 (Normal processing - big rptSecNum, message gap):
In this case we can hope, that other stream can recover information in this
gap. To guarantee it we tries to find a `hole` in secNums in
messageProcessorHelper.
We call gap in secNums `hole` if secNum of this gap is distant from last
secNum for at last
com.epam.cmemdp.impl.MessageProcessorHelper.minHoleOffset
.
Case 3-2-1 (Normal processing - big rptSecNum, message gap after
last hole):
We can guarantee this case if last secNum for normal processed increment
greater than secNum of last hole. In this case we can hope and so we should
process it just like Case 3-1.
This case tested in
MessageProcessorTest.testNoHoleToThisInstrumentUpdate
Case 3-2-2 (Normal processing - big rptSecNum, message gap before
last hole):
Or if we can`t guarantee previous case. In this case we should set `recovery`
mode to instrument and connect to snapshot if necessary.
This case tested in MessageProcessorTest.testHoleUpdate
Case 4 (Small gap - expected rptSecNum):
In this case we may be process all gap increment and should deliver all saved
increment message to user application in right order (or part of it, if we
process only some gap) and update lastProcessedSeqNum property
Case 5 (Small gap - small rptSecNum):
Same as Case 2.
Case 6 (Small gap - big rptSecNum):
Same as Case 3, but we can check if we add to queue increment (n+1)
after we playQueue with increment (n) and play it again (this can happen in
Case 3-1) TODO
Case 7 (Recovery - expected rptSecNum):
In this case we should stop recovery if closed all gaps (or, at last, close
gaps before last hole TODO)
Case 8 (Recovery - small rptSecNum):
Same as Case 2.
Case 9 (Recovery - big rptSecNum):
Same as Case 6.
Case 10 (Normal processing - snapshot):
TODO test using seqNum for snapshot, may be we should use other tag such as
LastMsgSeqNumProcessed (see bovespa)
In this case we should probably skip snapshot to save trade. Or we should use
special algorithm in Case 2-2 to pass old trade increment with not
processed secNum
Case 11 (Small gap - snapshot):
Same as Case 10, but if we process snapshot we should pass trade
increments (TODO)
Case 12 (Recovery - snapshot):
We must process snapshot and (TODO) trades.
- Author:
- Dmitry_Starshov
Methods inherited from class java.lang.Object |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
log
protected static final Log log
INFO_ENABLED
public static final boolean INFO_ENABLED
DEBUG_ENABLED
public static final boolean DEBUG_ENABLED
TRACE_ENABLED
public static final boolean TRACE_ENABLED
MessageProcessor
public MessageProcessor()
handleEvent
public void handleEvent(Instrument instrument,
InstrumentEvent event)
handleForcedRecovery
public void handleForcedRecovery(Instrument instrument,
ForcedRecoveryEvent event)
handleSnapshotReset
public void handleSnapshotReset(Instrument instrument,
SnapshotResetEvent event)
processSnapshot
public void processSnapshot(Instrument instrument,
FIXFieldList msg)
processIncrement
public boolean processIncrement(Instrument instrument,
GroupEntry entry)
getMessageProcessor
public MessageProcessor getMessageProcessor(FIXFieldList message)
setProcessorContext
public void setProcessorContext(ApplicationContext applicationContext)
Copyright © 2000-2014 EPAM Systems. All Rights Reserved.