Message processor is a class used to process increment messages in the right
order.
An instrument can be in one of 3 states:
- Normal processing
- Small gap
- Recovery
This is how it can be used:
Case 1 (Normal processing - expected rptSecNum):
The instrument is in the normal state and messageProcessor processes an
increment with the expected rptSecNum. In this case we should register secNum
in messageProcessorHelper and in the instrument, register rptSecNum in the
instrument, and deliver the increment message to the user application ASAP.
This case is tested in the second part of
MessageProcessorTest.testDublicateUpdate.
Case 2 (Normal processing - small rptSecNum):
A smaller than expected rptSecNum can arrive in 2 cases: if we have already
processed this increment from the other stream, or if secNum was reset.
Case 2-1 (Normal processing - small rptSecNum, already processed):
This case can be identified simply when we set secNum for this increment. In
this case we should not process it again.
This case is tested in the first part of
MessageProcessorTest.testDublicateUpdate.
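
A minimal sketch of Cases 1 and 2-1 together, assuming the helper simply
remembers every secNum it has seen. SeenSecNums and DuplicateFilter are
hypothetical names used for illustration only; they are not the real
messageProcessorHelper API.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for messageProcessorHelper: remembers seen secNums. */
final class SeenSecNums {
    private final Set<Long> seen = new HashSet<>();

    /** Registers secNum; returns true only the first time it is registered. */
    boolean register(long secNum) {
        return seen.add(secNum);
    }
}

/** Hypothetical per-instrument filter covering Case 1 and Case 2-1 only. */
final class DuplicateFilter {
    private final SeenSecNums helper = new SeenSecNums();
    private long expectedRptSecNum = 1;
    private int delivered = 0;

    /** Returns true if the increment was delivered to the user application. */
    boolean onIncrement(long secNum, long rptSecNum) {
        if (!helper.register(secNum)) {
            return false; // Case 2-1: already processed from the other stream
        }
        if (rptSecNum == expectedRptSecNum) {
            expectedRptSecNum++; // Case 1: register rptSecNum in the instrument
            delivered++;         // stands in for delivery to the user application
            return true;
        }
        return false; // all other cases are outside the scope of this sketch
    }

    int delivered() {
        return delivered;
    }
}
```

The set here grows without bound; a real helper presumably keeps only a
limited window of secNums, which is why Case 2-2 below is needed.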
Case 2-2 (Normal processing - small rptSecNum, reset):
Note: when we set secNum in messageProcessorHelper, we cannot always tell
whether this secNum has been processed or not. This happens because secNum can
be reset or the message can be too old to be registered in the helper. To
prevent a reset in these cases we check the delta between the two rptSecNums.
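
One possible reading of the delta check, as a sketch only. It assumes that a
small backward delta means a stale message and a large one means a real
reset. ResetCheck, Verdict and maxStaleDelta are hypothetical names, and the
threshold semantics are an assumption, not something the text above states.

```java
/** Hypothetical classifier for a smaller than expected rptSecNum. */
final class ResetCheck {
    enum Verdict { STALE, RESET }

    private ResetCheck() {
    }

    /**
     * Assumption: an rptSecNum that is at most maxStaleDelta behind the
     * expected one is treated as stale; anything further behind as a reset.
     */
    static Verdict classifySmall(long expectedRptSecNum, long rptSecNum,
                                 long maxStaleDelta) {
        if (rptSecNum >= expectedRptSecNum) {
            throw new IllegalArgumentException("not a small rptSecNum");
        }
        long delta = expectedRptSecNum - rptSecNum;
        return delta <= maxStaleDelta ? Verdict.STALE : Verdict.RESET;
    }
}
```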
Case 3 (Normal processing - big rptSecNum):
This can happen if we lost a message in at least one stream, or if we
discarded the increment with secNum = n in one stream (because we processed it
from the other stream) and start to process the increment with secNum = n + 1.
Case 3-1 (Normal processing - big rptSecNum, async thread):
In this case we should place this increment in the queue for reordering. The
instrument will be in the `Small gap` state.
This case is tested in MessageProcessorTest.testNoHoleUpdate.
Case 3-2 (Normal processing - big rptSecNum, message gap):
In this case we can hope that the other stream will recover the information
in this gap. To guarantee it, we try to find a `hole` in the secNums in
messageProcessorHelper.
We call a gap in the secNums a `hole` if the secNum of this gap is distant
from the last secNum by at least
com.epam.cmefast.impl.MessageProcessorHelper.minHoleOffset.
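
The `hole` definition can be illustrated with a small sketch. HoleFinder is a
hypothetical class, not the real MessageProcessorHelper; only the name
minHoleOffset is taken from the text above, and the linear backward scan is
chosen for clarity rather than speed.

```java
import java.util.OptionalLong;
import java.util.TreeSet;

/** Hypothetical helper that finds the last `hole` among registered secNums. */
final class HoleFinder {
    private final TreeSet<Long> seen = new TreeSet<>();
    private final long minHoleOffset;

    HoleFinder(long minHoleOffset) {
        this.minHoleOffset = minHoleOffset;
    }

    void register(long secNum) {
        seen.add(secNum);
    }

    /**
     * Returns the largest missing secNum that lies at least minHoleOffset
     * behind the last registered secNum, or empty if there is none.
     */
    OptionalLong lastHole() {
        if (seen.isEmpty()) {
            return OptionalLong.empty();
        }
        long first = seen.first();
        long last = seen.last();
        // Gaps closer to `last` than minHoleOffset are not holes yet:
        // the other stream may still fill them.
        for (long candidate = last - minHoleOffset; candidate > first; candidate--) {
            if (!seen.contains(candidate)) {
                return OptionalLong.of(candidate);
            }
        }
        return OptionalLong.empty();
    }
}
```

For example, with minHoleOffset = 3 and secNums 1, 2, 4, 5, 6, 7 registered,
secNum 3 is a hole; with 1, 2, 3, 4, 5, 7 registered, the missing secNum 6 is
still too recent to count as one.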
Case 3-2-1 (Normal processing - big rptSecNum, message gap after last hole):
We can guarantee this case if the last secNum of a normally processed
increment is greater than the secNum of the last hole. In this case we can
still hope that the gap will be filled, so we should process it just like
Case 3-1.
This case is tested in MessageProcessorTest.testNoHoleToThisInstrumentUpdate.
Case 3-2-2 (Normal processing - big rptSecNum, message gap before last hole):
This is the case when we cannot guarantee the previous one. In this case we
should switch the instrument to `recovery` mode and connect to the snapshot
if necessary.
This case is tested in MessageProcessorTest.testHoleUpdate.
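
The choice between Case 3-2-1 and Case 3-2-2 can be sketched as a single
comparison. GapPolicy and GapDecision are hypothetical names, and treating
"no hole found" as a small gap is an assumption of this sketch.

```java
import java.util.OptionalLong;

enum GapDecision { SMALL_GAP, RECOVERY }

/** Hypothetical decision rule for a big rptSecNum in the normal state. */
final class GapPolicy {
    private GapPolicy() {
    }

    /**
     * lastProcessedSecNum: secNum of the last normally processed increment
     * of this instrument. lastHole: secNum of the last hole, if any.
     */
    static GapDecision decide(long lastProcessedSecNum, OptionalLong lastHole) {
        if (!lastHole.isPresent()) {
            return GapDecision.SMALL_GAP; // assumption: no hole, keep waiting
        }
        // Case 3-2-1: the gap lies after the last hole, so the other stream
        // may still fill it. Otherwise Case 3-2-2: switch to recovery.
        return lastProcessedSecNum > lastHole.getAsLong()
                ? GapDecision.SMALL_GAP
                : GapDecision.RECOVERY;
    }
}
```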
Case 4 (Small gap - expected rptSecNum):
In this case we may have closed the whole gap, so we should deliver all saved
increment messages to the user application in the right order (or only part
of them, if we have closed only part of the gap) and update the
lastProcessedSeqNum property.
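
A minimal sketch of playing the queue in Case 4, assuming saved increments
are keyed by rptSecNum and that lastProcessedSeqNum tracks the last delivered
rptSecNum. ReorderQueue and its String payloads are hypothetical and stand in
for the real increment messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical reorder queue for one instrument. */
final class ReorderQueue {
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private final List<String> delivered = new ArrayList<>();
    private long lastProcessedSeqNum;

    ReorderQueue(long lastProcessedSeqNum) {
        this.lastProcessedSeqNum = lastProcessedSeqNum;
    }

    void onIncrement(long rptSecNum, String payload) {
        if (rptSecNum <= lastProcessedSeqNum) {
            return; // small rptSecNum: ignored in this sketch
        }
        pending.put(rptSecNum, payload);
        playQueue();
    }

    /** Delivers saved increments while they continue the sequence. */
    private void playQueue() {
        while (!pending.isEmpty()
                && pending.firstKey() == lastProcessedSeqNum + 1) {
            delivered.add(pending.pollFirstEntry().getValue());
            lastProcessedSeqNum++;
        }
    }

    List<String> delivered() {
        return delivered;
    }

    long lastProcessedSeqNum() {
        return lastProcessedSeqNum;
    }

    /** True while some saved increment is still waiting behind a gap. */
    boolean hasGap() {
        return !pending.isEmpty();
    }
}
```

If increments 3 and 5 are saved and increment 2 then arrives, this sketch
delivers 2 and 3 and keeps 5 queued, which corresponds to closing only part
of the gap.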
Case 5 (Small gap - small rptSecNum):
Same as Case 2.
Case 6 (Small gap - big rptSecNum):
Same as Case 3, but we can check whether increment (n+1) was added to the
queue after we called playQueue with increment (n), and play the queue again
(this can happen in Case 3-1). TODO
Case 7 (Recovery - expected rptSecNum):
In this case we should stop recovery if all gaps are closed (or, at least, if
the gaps before the last hole are closed; TODO).
Case 8 (Recovery - small rptSecNum):
Same as Case 2.
Case 9 (Recovery - big rptSecNum):
Same as Case 6.
Case 10 (Normal processing - snapshot):
TODO: test using seqNum for the snapshot; maybe we should use another tag
such as LastMsgSeqNumProcessed (see bovespa).
In this case we should probably skip the snapshot so as not to lose trades.
Or we should use a special algorithm in Case 2-2 to pass old trade increments
whose secNum has not been processed yet.
Case 11 (Small gap - snapshot):
Same as Case 10, but if we process the snapshot we should pass trade
increments (TODO).
Case 12 (Recovery - snapshot):
We must process the snapshot and (TODO) trades.
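
The case numbering above can be summarised as a lookup from the instrument
state and the kind of incoming message to a case number. The sketch below
only restates that numbering; InstrumentState, MessageKind and CaseTable are
hypothetical names, not classes of the library.

```java
enum InstrumentState { NORMAL, SMALL_GAP, RECOVERY }

enum MessageKind { EXPECTED, SMALL, BIG, SNAPSHOT }

/** Hypothetical lookup table mirroring Cases 1 to 12. */
final class CaseTable {
    // Rows follow InstrumentState order, columns follow MessageKind order.
    private static final int[][] CASES = {
        {1, 2, 3, 10}, // NORMAL
        {4, 5, 6, 11}, // SMALL_GAP
        {7, 8, 9, 12}, // RECOVERY
    };

    private CaseTable() {
    }

    /** Classifies an increment by comparing its rptSecNum with the expected one. */
    static MessageKind kindOf(long expectedRptSecNum, long rptSecNum) {
        if (rptSecNum == expectedRptSecNum) {
            return MessageKind.EXPECTED;
        }
        return rptSecNum < expectedRptSecNum ? MessageKind.SMALL : MessageKind.BIG;
    }

    static int caseNumber(InstrumentState state, MessageKind kind) {
        return CASES[state.ordinal()][kind.ordinal()];
    }
}
```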