00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 package com.epam.benchmark.latency;
00042
00043 import com.epam.benchmark.Util;
00044 import com.epam.fix.FIXVersion;
00045 import com.epam.fix.message.FIXFieldList;
00046 import com.epam.fix.message.RawFIXUtil;
00047 import com.epam.fixengine.FIXSession;
00048 import com.epam.fixengine.FIXSessionListener;
00049 import com.epam.fixengine.SessionParameters;
00050 import com.epam.fixengine.SessionState;
00051 import org.apache.commons.logging.Log;
00052 import org.apache.commons.logging.LogFactory;
00053
00054 import java.io.*;
00055 import java.util.concurrent.CountDownLatch;
00056 import java.util.concurrent.Semaphore;
00057 import java.util.concurrent.atomic.AtomicBoolean;
00058
00059
00060
00061
00062
00063
00064
00065 public class MeasureInternalLatencyBM implements FIXTransportListener, FIXSessionListener {
00066
00067
00068 public static final int MESSAGE_COUNT = 100000;
00069
00070 public static final int WARMUP_MESSAGE_COUNT = 100000;
00071
00072 private AtomicBoolean warmUpFlag = new AtomicBoolean(false);
00073
00074 private static final double NANOSECONDS_IN_MICROSECOND = 1000.0;
00075
00076 private static final Log LOG = LogFactory.getLog(MeasureInternalLatencyBM.class);
00077
00078 private long[] startTimes = new long[MESSAGE_COUNT];
00079
00080 private long[] endTimes = new long[MESSAGE_COUNT];
00081
00082 private String host;
00083
00084 private int port;
00085
00086 private CountDownLatch countDown;
00087
00088 private final Semaphore available = new Semaphore(1, true);
00089
00090 private FIXFieldList message;
00091
00092 private int count;
00093
00094 private FIXSession fixSession;
00095
00096 public static void main(String[] args) throws Exception {
00097 String host = args[0];
00098 int port = parsePortNumber(args[1]);
00099 new MeasureInternalLatencyBM(host, port).run();
00100 }
00101
00102
00103
00104
00105
00106
00107
00108 public MeasureInternalLatencyBM(String host, int port) {
00109 this.host = host;
00110 this.port = port;
00111 message = RawFIXUtil.getFIXFieldList(Util.STANDARD_MESSAGE.getBytes());
00112 }
00113
00114
00115
00116
00117
00118
00119 public void run() throws Exception {
00120 connect();
00121 warmUpEnvironment();
00122 measure();
00123 printResults();
00124 disconnect();
00125 System.exit(0);
00126 }
00127
00128 private void connect() throws IOException, InterruptedException {
00129
00130 TransportReplaceSessionFactory sessionFactory = new TransportReplaceSessionFactory(this);
00131 fixSession = sessionFactory.createInitiatorSession(createConnectionDetails());
00132 fixSession.setFIXSessionListener(this);
00133 fixSession.connect();
00134 Thread.sleep(1000);
00135 }
00136
00137 private void warmUpEnvironment() throws InterruptedException {
00138 warmUpFlag.set(true);
00139 for (int i = 0; i < WARMUP_MESSAGE_COUNT; i++) {
00140 fixSession.sendMessage(message);
00141 available.acquire();
00142 }
00143 warmUpFlag.set(false);
00144 Thread.sleep(1000);
00145 }
00146
00147
00148
00149
00150
00151
00152
00153 private void measure() throws Exception {
00154 countDown = new CountDownLatch(1);
00155 for (int i = 0; i < MESSAGE_COUNT; i++) {
00156 startTimes[i] = System.nanoTime();
00157 fixSession.sendMessage(message);
00158 available.acquire();
00159 }
00160 countDown.await();
00161 }
00162
00163
00164
00165
00166 public void onBeforeMessageSend(byte[] message) {
00167 long endTime = System.nanoTime();
00168 if (!warmUpFlag.get() && !RawFIXUtil.isSessionLevelMessage(message)) {
00169 endTimes[count++] = endTime;
00170 }
00171 }
00172
00173 public void onBeforeMessageSend(byte[] message, int offset, int length) {
00174 long endTime = System.nanoTime();
00175 if (!warmUpFlag.get() && !RawFIXUtil.isSessionLevelMessage(message, offset, length)) {
00176 endTimes[count++] = endTime;
00177 }
00178 }
00179
00180 public void onNewMessage(FIXFieldList message) {
00181 if (count >= MESSAGE_COUNT) {
00182 countDown.countDown();
00183 }
00184 available.release();
00185 }
00186
00187 public void onSessionStateChange(SessionState sessionState) {
00188 LOG.info("Session state: " + sessionState);
00189 }
00190
00191
00192
00193
00194
00195
00196 private void printResults() throws Exception {
00197 Writer fileWriter = null;
00198 try {
00199 fileWriter = new BufferedWriter(new FileWriter("latency_result.csv"));
00200 StringWriter writer = new StringWriter();
00201 printMeasurements(writer, fileWriter);
00202 LOG.info(writer.getBuffer());
00203 LOG.info("Measurement finished");
00204 } finally {
00205 if (fileWriter != null) {
00206 fileWriter.close();
00207 }
00208 }
00209 }
00210
00211 private void disconnect() {
00212 fixSession.getSessionParameters().setOutgoingSequenceNumber(1);
00213 fixSession.disconnect("finish");
00214 }
00215
00216 private SessionParameters createConnectionDetails() {
00217 SessionParameters details = new SessionParameters();
00218
00219 details.setFixVersion(FIXVersion.FIX42);
00220 details.setHost(host);
00221 details.setPort(port);
00222 details.setHeartbeatInterval(60000);
00223 details.setSenderCompId("BLP");
00224 details.setTargetCompId("SCHB");
00225 return details;
00226 }
00227
00228
00229
00230
00231
00232
00233
00234
00235 private void printMeasurements(Writer resultWriter, Writer fileWriter) throws Exception {
00236 long min = Long.MAX_VALUE;
00237 long max = 0;
00238 long sum = 0;
00239 for (int i = 0; i < endTimes.length; i++) {
00240 long latency = endTimes[i] - startTimes[i];
00241 fileWriter.write("" + latency + "\n");
00242 min = Math.min(min, latency);
00243 max = Math.max(max, latency);
00244 sum += latency;
00245 }
00246 resultWriter.write("\n\n");
00247
00248 resultWriter.write("MIN: " + (min / NANOSECONDS_IN_MICROSECOND) + " microseconds\n");
00249 resultWriter.write("MAX: " + (max / NANOSECONDS_IN_MICROSECOND) + " microseconds\n");
00250 resultWriter.write("AVG: " + (sum / (MESSAGE_COUNT * NANOSECONDS_IN_MICROSECOND)) + " microseconds\n");
00251 }
00252
00253 private static int parsePortNumber(String arg) {
00254 int port = -1;
00255 try {
00256 port = Integer.parseInt(arg);
00257 } catch (NumberFormatException e) {
00258 System.out.println("Incorrect port");
00259 }
00260 return port;
00261 }
00262 }