Introduction

FIX Edge Java is an application server providing FIX connectivity to multiple clients. Client applications communicate with FEJ through one of multiple transport protocols (e.g. Simple Sockets, TIBCO, CORBA, HTTPs) employing transport adaptors. It is designed to be as easy as possible to install, configure, administrate and monitor trading information flows. It is written in Java and has a performance profile suitable for the needs of all clients up to and including large sell-side institutions and large volume traders.

Getting started

System Requirements

To get started, make sure your system has Java JDK 8.

$ java -version
java version "1.8.0_60"
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)

Installing

To get started, you need to download the FEJ distribution. You should end up downloading a compressed ZIP file named something like fixedgej-1.0.0.zip. Once you have downloaded the FEJ zipball, installing and setting up a standalone FEJ node is pretty simple and straightforward. Let’s extract the compressed zip archive into /usr/share. On Linux, Mac OS X, or any other UNIX-like system you can use the following command to extract the distribution:

$ unzip fixedgej-1.0.0.zip -d /usr/share
$ cd /usr/share/fixedgej-1.0.0/
$ ls
CDDL-license.txt               apache-license-2.0.txt  bin   icons  license.txt  scripts    tmp          wrapperApp.jar
FixEdgeJavaVersionHistory.txt  bat                     conf  lib    log          templates  wrapper.jar

If you are using Windows, you will need to use a decompression tool such as WinZip to extract the distribution.

In the distribution directory you will find a bin directory that contains the scripts needed to start FEJ on UNIX platforms (Linux, Mac OS X, etc.) and a `bat' directory that contains the scripts for Windows. The conf directory holds configuration files. The lib directory contains Java JAR files, which are third-party files needed to run FEJ.

Once we have extracted the zipball, the next thing is to configure and start FEJ.

Starting the FEJ container on Windows (Unix)

Now, let’s go ahead and start FEJ. All FEJ administration scripts to start/stop the container for Windows are shipped along with the archive in the bat folder with the following content (Unix: similar in the bin folder):

$ pwd
/usr/share/fixedgej-1.0.0/bat
$ ls
installService.bat  runConsoleW.bat               setenv.bat        systemTrayIcon.bat    wrapper.bat
queryService.bat    runServicesManagerClient.bat  startService.bat  sytemTrayIconW.bat    wrapperW.bat
runConsole.bat      runServicesManagerServer.bat  stopService.bat   uninstallService.bat

To start FEJ as a console application on Windows system, you need to execute the runConsole.bat (unix: ./runConsole.sh):

$ ./runConsole.bat

To avoid going to the FEJ install directory to run these scripts, you can include it in your PATH variable as follows:

$ export PATH=$PATH:/usr/share/fixedgej-1.0.0/bat

To start FEJ as a Windows service you first need to install it. To install the application as a service execute installService.bat (unix: ./installDaemon.sh).

You need to have administrative privileges to run these script.

A successful installation of the container will show the following output:

$ ./installService.bat

C:\cygwin64\usr\share\fixedgej-1.0.0\bat>cd C:\cygwin64\usr\share\fixedgej-1.0.0\bat\

C:\cygwin64\usr\share\fixedgej-1.0.0\bat>call setenv.bat
"java" -Xmx30m -Djna_tmpdir="C:\cygwin64\usr\share\fixedgej-1.0.0\bat\/../tmp"
-Djava.net.preferIPv4Stack=true -jar "C:\cygwin64\usr\share\fixedgej-1.0.0\bat\/../wrapper.jar"
-i "C:\cygwin64\usr\share\fixedgej-1.0.0\bat\/../conf/wrapper.conf"
YAJSW: yajsw-alpha-12.00
OS   : Windows 7/6.1/amd64
JVM  : Oracle Corporation/1.8.0_60/D:\Program Files\Java\jre1.8.0_60/64
Oct 13, 2015 11:59:16 AM org.apache.commons.vfs2.VfsLog info
INFO: Using "C:\cygwin64\tmp\vfs_cache" as temporary files store.
************* INSTALLING FixEdgeJava ***********************

.
<some omitted output>
.

Service FixEdgeJava installed (1)
Press any key to continue . . .
1 Service installation completes successfully.

We are now ready to start the service. Run the startService.bat (unix: ./startDaemon.sh):

$ ./startService.bat

C:\cygwin64\usr\share\fixedgej-1.0.0\bat>cd C:\cygwin64\usr\share\fixedgej-1.0.0\bat\

C:\cygwin64\usr\share\fixedgej-1.0.0\bat>call setenv.bat
"java" -Xmx30m -Djna_tmpdir="C:\cygwin64\usr\share\fixedgej-1.0.0\bat\/../tmp"
-Djava.net.preferIPv4Stack=true -jar "C:\cygwin64\usr\share\fixedgej-1.0.0\bat\/../wrapper.jar"
-t "C:\cygwin64\usr\share\fixedgej-1.0.0\bat\/../conf/wrapper.conf"
YAJSW: yajsw-alpha-12.00
OS   : Windows 7/6.1/amd64
JVM  : Oracle Corporation/1.8.0_60/D:\Program Files\Java\jre1.8.0_60/64
Oct 13, 2015 12:10:33 PM org.apache.commons.vfs2.VfsLog info
INFO: Using "C:\cygwin64\tmp\vfs_cache" as temporary files store.
************* STARTING FixEdgeJava ***********************

Service FixEdgeJava started (1)
Press any key to continue . . .
1 Service starts successfully.

To stop the service, you can use stopService.bat script (unix: ./stopDaemon.sh):

$ ./stopService.bat

C:\cygwin64\usr\share\fixedgej-1.0.0\bat>cd C:\cygwin64\usr\share\fixedgej-1.0.0\bat\

C:\cygwin64\usr\share\fixedgej-1.0.0\bat>call setenv.bat
"java" -Xmx30m -Djna_tmpdir="C:\cygwin64\usr\share\fixedgej-1.0.0\bat\/../tmp"
-Djava.net.preferIPv4Stack=true -jar "C:\cygwin64\usr\share\fixedgej-
1.0.0\bat\/../wrapper.jar" -p "C:\cygwin64\usr\share\fixedgej-1.0.0\bat\/../conf/wrapper.conf"
YAJSW: yajsw-alpha-12.00
OS   : Windows 7/6.1/amd64
JVM  : Oracle Corporation/1.8.0_60/D:\Program Files\Java\jre1.8.0_60/64
Oct 13, 2015 12:13:36 PM org.apache.commons.vfs2.VfsLog info
INFO: Using "C:\cygwin64\tmp\vfs_cache" as temporary files store.
************* STOPPING FixEdgeJava ***********************

Service FixEdgeJava stopped (1)
Press any key to continue . . .
1 Service stopped successfully.

Finally, if you want to uninstall the service run uninstallService.bat (unix: ./uninstallDaemon.sh):

$ ./uninstallService.bat

C:\cygwin64\usr\share\fixedgej-1.0.0\bat>cd C:\cygwin64\usr\share\fixedgej-1.0.0\bat\

C:\cygwin64\usr\share\fixedgej-1.0.0\bat>call setenv.bat
"java" -Xmx30m -Djna_tmpdir="C:\cygwin64\usr\share\fixedgej-1.0.0\bat\/../tmp"
-Djava.net.preferIPv4Stack=true -jar "C:\cygwin64\usr\share\fixedgej-1.0.0\bat\/../wrapper.jar"
-r "C:\cygwin64\usr\share\fixedgej-1.0.0\bat\/../conf/wrapper.conf"
YAJSW: yajsw-alpha-12.00
OS   : Windows 7/6.1/amd64
JVM  : Oracle Corporation/1.8.0_60/D:\Program Files\Java\jre1.8.0_60/64
Oct 13, 2015 12:15:39 PM org.apache.commons.vfs2.VfsLog info
INFO: Using "C:\cygwin64\tmp\vfs_cache" as temporary files store.
************* REMOVING FixEdgeJava ***********************

Service FixEdgeJava removed (1)
Press any key to continue . . .
1 Service was removed successfully.

Configuration

This section will acquaint you with the basic information used during the configuration of the application.

All of the information in the configuration files is read at startup, meaning that any change to the files necessitates a restart of the application.

The conf folder holds the configuration files for FEJ.

C:\usr\share\fixedgej\conf>ls -la
total 39
drwxr-xr-x    1 root root     8192 Apr  4 15:30 .
drwxr-xr-x    1 root root     4096 Apr  4 15:25 ..
-rw-r--r--    1 root root     3280 Mar 24 14:20 cluster.xml (1)
-rw-r--r--    1 root root     1462 Mar 24 14:20 fej-context.xml (2)
-rw-r--r--    1 root root     1273 Mar 24 14:20 fej-jms.xml (3)
-rw-r--r--    1 root root     2562 Mar 24 14:20 fej-monitoring.xml (4)
-rw-r--r--    1 root root     2745 Mar 24 14:20 fej-replication.xml (5)
-rw-r--r--    1 root root     2837 Apr  3 20:36 fej-routing.xml (6)
-rw-r--r--    1 root root     1655 Mar 24 14:20 fej-scheduling.xml (7)
-rw-r--r--    1 root root     2502 Mar 24 14:20 fej-security.xml (8)
-rw-r--r--    1 root root     1764 Mar 24 14:20 fej-server.xml (9)
-rw-r--r--    1 root root     2122 Mar 24 14:20 fixedge.properties (10)
-rw-r--r--    1 root root      659 Mar 24 14:20 fixengine.properties (11)
-rw-r--r--    1 root root     3196 Mar 24 14:20 jms-adaptor.properties (12)
-rw-r--r--    1 root root     1298 Mar 24 14:20 log4j2.xml (13)
drwxr-xr-x    9 root root     4096 Apr  4 15:25 reject (14)
-rw-r--r--    1 root root     3782 Apr  4 14:07 replication.properties (15)
-rw-r--r--    1 root root      313 Mar 24 14:20 rules.groovy (16)
-rw-r--r--    1 root root      287 Mar 24 14:20 schedules.xml (17)
drwxr-xr-x    3 root root        0 Apr  4 15:25 session (18)
-rw-r--r--    1 root root      383 Mar 24 14:20 shell.properties (19)
-rw-r--r--    1 root root       30 Mar 24 14:20 users.properties (20)
-rw-r--r--    1 root root    10835 Mar 24 14:20 wrapper.conf (21)
1 Hazelcast cluster configuration. See Cluster Service.
2 Main spring application context.
3 JMS related beans. See JMS Transport Adapter.
4 JMX and SSH related beans.
5 Replication related beans.
6 Routing related beans.
7 Scheduler configuration.
8 Spring Security configuration. See Security configuration.
9 Fix server configuration.
10 FEJ configuration properties.
11 FIXAJ configuration properties. See FIX Antenna Java configuration.
12 JMS adaptor configuration properties. See JMS Transport Adapter.
13 Logging configuration. See Logging configuration.
14 Directory with templates for FIX reject messages
15 Replication service configuration. See Replication Service.
16 Custom routing rules go here. See Routing Rules.
17 Custom scheduler tasks go here.
18 Directory with session configuration. See FIX Session configuration.
19 Remote shell configuration. See Administrative shell configuration.
20 Properties file containing users for in-memory authentication. See In-Memory Authentication.
21 Wrapper configuration. See Wrapper configuration.

Security configuration

FEJ uses Spring Security for authentication purposes. Security configuration is stored in fej-security.xml file.

In-Memory Authentication

By default, FEJ container uses simple in-memory authentication.

<sec:authentication-manager id="authenticationManager">
    <sec:authentication-provider>
        <sec:user-service id="userDetailsService" properties="users.properties"/>
    </sec:authentication-provider>
</sec:authentication-manager>

Users are stored in the external properties file named users.properties. It contains single user test with password test and role ROLE_ADMIN:

$ pwd
/usr/share/fixedgej-1.0.0/conf
$ cat users.properties
test=test,ROLE_ADMIN,enabled

LDAP authentication

FEJ also supports authentication against an LDAP server.

Before getting deep into LDAP authentication, let’s get familiar with some LDAP terms.

Table 1. LDAP Authentication Basics
Term Description

Dn

Distinguished name, a unique name which is used to find user in LDAP server e.g. Microsoft Active Directory.

Ou

Organization Unit.

Bind

LDAP Bind is an operation in which LDAP clients sends bindRequest to LDAP user including username and password and if LDAP server is able to find user and password correct, it allows access to LDAP server.

Search

LDAP search is the operation which is performed to retrieve Dn of user by using some user credentials.

Root

LDAP directory’s top element, like root of a tree.

BaseDn

a branch in LDAP tree which can be used as base for LDAP search operation.

When the LDAP authentication option is activated, the default single user mode is turned off.

FIX Session configuration

FIX Antenna java is used for establishing and managing FIX sessions. But FEJ introduce per-file base configuration for each FIX session. All such configuration files are placed into subdirectories of session directory and should follow s_fix_[SESSION_ID].properties mask (this behaviour can be changed with 'sessionConfigManager' bean into fej-server.xml). Additional each subfolder may have SESSION_GROUP_s_fixDefault.properties configuration file which define common option for all group of FIX sessions.

Main options for FIX session(advanced options for FIX session you can find on FIX Antenna Java configuration page):

Property name Default value Description

sessionType

acceptor

Session type. If type is not defined then session will be resolved as acceptor. Valid values: acceptor/initiator.

host

The connecting host for initiator session

port

The connecting port for initiator session

senderCompID

Assigned value used to identify firm sending message

senderSubID

Assigned value used to identify specific message originator (desk, trader, etc.)

senderLocationID

Assigned value used to identify specific message originator’s location (i.e. geographic location and/or desk, trader)

targetCompID

Assigned value used to identify receiving firm

targetSubID

Assigned value used to identify specific individual or unit intended to receive message

targetLocationID

Assigned value used to identify specific message destination’s location (i.e. geographic location and/or desk, trader)

fixVersion

Version of the FIX protocol

appVersion

backupHost

Backup host for initiator session

backupPort

Backup port for initiator session

incomingSequenceNumber

0

Incoming sequence number

outgoingSequenceNumber

0

Outgoing sequence number

processedIncomingSequenceNumber

0

Last valid incoming sequence number

heartbeatInterval

30

Heartbeat interval (in seconds)

lastSeqNumResetTimestamp

fixFieldList

User defined fields for messages. If this list is not empty, Engine add it to each outgoing message.

outgoingLoginFixFieldList

Additional fields for outgoing Logon message

groups

Comma-separated list of routing groups

Logging configuration

FEJ uses Log4j 2 as a logging framework. Its configuration is stored in log4j2.xml file. For more information about configuration please refer Log4j 2 official documentation.

Administrative shell configuration

Shell configuration properties are stored in shell.properties file.

Table 2. Shell configuration properties
Name Default value Description

crash.auth

spring

Authentication mechanism

crash.ssh.port

2000

SSH server port

crash.ssh.auth_timeout

300000

Authentication timeout of the SSH server (in milliseconds)

crash.ssh.idle_timeout

300000

Idle timeout of the SSH server (in milliseconds)

crash.ssh.default_encoding

UTF-8

Character encoding

FEJ uses Java shell called ‘CRaSH’. For more information about it configuration properties please refer to the CRaSH reference documentation.

Wrapper configuration

FEJ uses YAJSW to run application as a Windows Service or UNIX Daemon. It configuration is stored in wrapper.onf file. Please refer to the YAJSW reference documentation for more information about available configuration properties.

FEJ Extension API

Transport adapter

Implementation

FEJ routing server provides the ability to integrate custom 3rdparty transports and use them for routing messages. For such goal server provides few interfaces for implementations:

Endpoint is a basic interface that represents instance of some entry, which could be unique identified by routing engine for receiving or delivering messages. EndpointParams provides unique id of such endpoint and other additional attributes, which may be useful for routing logic.

public interface Endpoint {
    EndpointParams getParams();
}

SourceEndpoint interface represent a provider of messages for routing engine.

public interface SourceEndpoint extends Endpoint {
    void setListener(SourceMessageListener listener);
}

It offers the ability to register listeners SourceMessageListener of incoming messages for external systems.

public interface SourceMessageListener {
    void onNewMessage(FIXFieldList message);
}
public interface SourceMessageListener {
    void onNewMessage(FIXFieldList message);
}

DestinationEndpoint represent a target of routing rules. It allows you to pass routed massage to the certain system.

public interface DestinationEndpoint extends Endpoint {
    /**
      * Send a {@link FIXFieldList} to this adapter. If the message is sent successfully,
      * the method returns {@code true}. If the message cannot be sent due to a
      * non-fatal reason, the method returns {@code false}. The method may also
      * throw a RuntimeException in case of non-recoverable errors.
      * <p>This method may block indefinitely, depending on the implementation.
      *
      * @param message the message to send
      * @return whether or not the message was sent
      */
     boolean send(FIXFieldList message);
}

There is also mechanism for registering such sources and destinations into server. After registration such endpoints will be accessible for routing engine. Registration of sources and destinations are independent. It means that you can register source and destination endpoint with the same id. This is especially important for bidirectional transports like FIX, where in and out connections are identified by the same parameters. For such transports exists 'BidirectionalEndpoint' interface.

public interface BidirectionalEndpoint extends ConsumerEndpoint, ProducerEndpoint {
}

Anyway, there are two separate interface for registering sources and destinations:

public interface SourceEndpointRegistry {
    void registerConsumer(SourceEndpoint consumer);
    void removeConsumer(String id);
}

public interface DestinationEndpointRegistry {
    void registerProducer(DestinationEndpoint producer);
    void removeProducer(String id);
}

Both they are implemented by EndpointRegistryAdapter class. it is available for accessing via Spring config:

<bean id="endpointRegistry" class="com.epam.fej.routing.endpoint.EndpointRegistryAdapter"
         c:destinationsRegister-ref="destinationsRegister"/>

JMS Transport Adapter

There are several ways to add JMS connectivity into FEJ container. fej-jms.xml configuration file already contains basic configuration for JMS adapter:

<bean id="jmsProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"
      p:location="classpath:jms-adaptor.properties"/>

<bean id="jmsConfig" class="com.epam.fixengine.jms.config.Config"
      c:prefix="jms.adaptor"
      c:properties-ref="jmsProperties">
</bean>

<bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
      p:jmsManager-ref="jmsAdaptorManager"
      c:config-ref="jmsConfig" init-method="init"/>

<bean id="jmsClientFactory" class="com.epam.fixengine.jms.client.JMSClientFactory" factory-method="getInstance"/>

<bean id="jmsAdaptorManager" class="com.epam.fej.jms.JmsAdapterManager"
      c:endpointRegistry-ref="endpointRegistry"
      c:clientFactory-ref="jmsClientFactory"
      depends-on="rulesConfigManager"/>

jms-adaptor.properties file contains parameters for JMS producers and consumers (FIXAJ JMS Adaptor properties). jmsConfigRegister bean (com.epam.fej.jms.DefaultJmsConfigsRegister) is responsible for loading JMS session contexts (SessionContext) from configuration file and registering them with jmsAdaptorManager (com.epam.fej.jms.JmsAdaptorManager) for routing engine. JmsAdaptorManager builds source and destination endpoints adapters from given SessionContext objects and register them in server.

If you want use your own Configuration Factory you can use JmsManager implementation for building and registering SessionContext instances also.

DefaultJmsConfigsRegister produces SessionContext via JmsContextFactory implementation. By default it uses com.epam.fej.jms.JndiJmsSessionContextFactory implementation but you can set you own implementation via DefaultJmsConfigsRegister.setJmsContextFactory(JmsContextFactory jmsContextFactory). Also you can use com.epam.fej.jms.SimpleJmsContextFactory with your definition javax.jms.ConnectionFactory

public SessionContext createSessionContext(JmsConfig config) {
    final ConnectionInfo connectionInfo = config.getConnectionInfo();
    final SessionInfo sessionInfo = config.getSessionInfo();
    return new SessionContext(connectionInfo, sessionInfo, null, jmsContextFactory.createContext(config));
}

Please note that ConnectionInfo and SessionInfo classes support loading of custom properties from configuration files:

final Properties additionalProperties = connectionInfo.getAdditionalProperties();

final Map<String, Object> additionalParams = sessionInfo.getAdditionalParams();

Custom connection factory instead of JNDI

Custom jms connection factory could be used in few ways:

  1. Declaring ConnectionFactory in Spring and inject it in SimpleJmsContextFactory:

<bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
      p:jmsManager-ref="jmsAdaptorManager"
      p:jmsContextFactory-ref="jmsContextFactory"
      c:config-ref="jmsConfig" init-method="init"/>

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
      c:brokerURL-ref="${jms.broker.url}"/>

<bean id="jmsContextFactory" class="com.epam.fej.jms.SimpleJmsContextFactory"
      c:connectionFactory-ref="jmsConnectionFactory"/>
  1. Implement your own JmsContextFactory and pass it as a parameter for DefaultJmsConfigRegister:

public class ActiveMqJmsContextFactory implements JmsContextFactory {
    @Override
    public JMSContext createContext(JmsConfig config) {
        return new SimpleJmsContext(new ActiveMQConnectionFactory(
                config.getConnectionInfo().getProviderURL()), config.getConnectionInfo());
    }
}
<bean id="activeMqContextFactory" class="com.epam.fej.jms.ActiveMqJmsContextFactory"/>

<bean id="jmsConfigRegister" class="com.epam.fej.jms.DefaultJmsConfigsRegister"
      p:jmsManager-ref="jmsAdaptorManager"
      p:jmsContextFactory-ref="activeMqContextFactory"
      c:config-ref="jmsConfig" init-method="init"/>

Persistence API

Persistent API provides easy and fast way to storing data. It support two main storages: PersistentSequence and PersistentQueue.

PersistentSequence

PersistentSequence provides functionality for storing sequential data (for example, logs). Each record should have unique index. Indexer implementation is used to provide index for certain storing record. Also PersistentSequence requires custom Serializer implementation to serialize objects to byte buffer and restore it back.

PersistenceFactory factory = ...

Indexer<MyRecord> indexer = new Indexer<MyRecord>() {
    @Override
    public long getIndex(MyRecord item) {
        return item.getId();
    }
};

Serializer<MyRecord> serializer = new Serializer<MyRecord>() {
    @Override
    public void serialize(MyRecord item, ElasticByteBuffer buffer) {
        // serialize instance ...
    }

    @Override
    public MyRecord deserialize(ElasticByteBuffer buffer, int length) {
        MyRecord record = new MyRecord;
        // load data to instance...
        return record;
    }
};

final PersistentSequence<MyRecord> sequence = factory.buildSequence("seq_sample", indexer, serializer);

//store record to sequence
sequence.append(record);

In additional to storing data PersistentSequence provides methods for retrieving records from storage. It supports reading single item by index or iterating through items:

//get single item with index 100
final MyRecord myRecord = sequence.get(100);

//iterate items from index 0 till 100
sequence.retrieveItems(0, 100, new RetrieveSequenceItemsListener<MyRecord>() {
    @Override
    public void onItem(long id, MyRecord record) {
        //...
    }
}, true);

There is a possibility to review stored records and remove some of them from storage:

sequence.cleanUp(new CleanupItemListener<MyRecord>() {
    @Override
    public boolean checkForClean(long id, MyRecord item) {
        //return true to removed this record from storage
        return false;
    }
});

Or remove all and reset sequence:

//remove all items and reset index
sequence.reset();

Default PersistentSequence implementation is optimized for writing data and reading operations may take a bit more time.

PersistentQueue

PersistentQueue works like a queue but persist all items to disk. Thus it can restore its state after application restart. PersistentQueue has similar to java.util.Queue methods:

final PersistentQueue<MyRecord> queue = factory.buildQueue("queue_sample", serializer);

//push item to the tail of queue
queue.add(record);

// read iem from head but doesn't remove
record = queue.peek();

// extract item from head and remove it from queue
record = queue.poll();

Also PersistentQueue allow to iterate all its items:

queue.iterate(new RetrieveQueueItemsListener<MyRecord>() {
    @Override
    public void onItem(MyRecord record) {
        //....
    }
});

To remove all items from queue you can use clean method:

//remove all items and clean file
queue.clear();

Routing Rules

FIX Edge Java provides an RoutingRule unit as an abstraction for internal message routing element. FEJ supports pure Java and Groovy implementations for routing rules.

RoutingRule requires few components for its instantiation:

public RoutingRule(java.lang.String description,
                   SourceCondition sourceFilter,
                   RuleCondition contectFilter,
                   RuleAction action)
  • description - String with free test description of rule

  • source filter - check if this rule should be applied to messages from certain source. This filter was added as a separate with propose of optimization process. Such filter can by applied on static basis without addition affect in runtime. Source filter is SourceCondition implementation and can be null if you’d like to ignore it.

  • context filter - dynamic filter, which can check in the same time appliance of this rule depends on message content and source attributes. Context filter is RuleCondition implementation and can be null if you’d like to ignore it.

  • action - implementation of RuleAction which describes the main goal of this rule. It can be transformation, modification or just resending to required destination.

Sample of routing rule:

import com.epam.fej.routing.RoutingContext
import com.epam.fej.routing.rules.RoutingRule
import com.epam.fej.routing.rules.RuleAction
import com.epam.fej.routing.rules.RuleCondition

import static com.epam.fej.routing.CustomRoutingRules.getDefaultRules
import static com.epam.fej.routing.CustomRoutingRules.getRejectionRule

RoutingContext rc = routingContext as RoutingContext;
[
        new RoutingRule(
                // rule description
                "some Rule",
                //source filter - ignore for this rule
                null,
                // context filter - apply this rule for New Order - Single (D) messages
                { ctx -> ctx.getMessage().getTagValueAsString(35) == "D" } as RuleCondition,
                // action for rule - resend message to all session within same group
                // and stop message processing
                { ctx ->
                    rc.getDestinationsByGroup(ctx.sourceParams.groups).each { adapter ->
                        adapter.send(ctx.message)
                        ctx.exit()
                    }
                } as RuleAction),

        // append system rejection rules for not processed messages
        getRejectionRule(rc)
]

Cluster Service

Cluster service allows to establish communication of several applications, which are placed on different boxes. Each such application, which uses cluster service, are cluster node. Node can be run in leader or backup mode. Leader mode means that node is active, accepts network connection and performs some useful work. Backup nodes are used for preparing fast replacing if leader node becomes unavailable. In such case backup node could be switched to leader mode and it could provide same service.

Cluster service is distributed service and it allows to automatically find its nodes within network and control presence no more than one leader node.

ClusterManager interface manages interaction between application and cluster. In addition, it allows subscription for current node state and for cluster state.

With ClusterManager it’s possible to subscribe for 2 group of events: * LocalNodeLeaderListener – notification about leader events * LocalNodeBackupListener – notification about backup event

An application can use both listeners in the same time or the only one that most fit the current goal. For example, if an active node should have an active server, you have to implement only LocalNodeLeaderListener and put start and stop server’s calls into its methods.

To make it easier, we also provide LocalNodeLeaderListenerAdaptor and LocalNodeBackupListenerAdaptor. With their help you should only override necessary methods.

Default implementation of cluster manager is based on Hazelcast. We use Hazelcast as a distributed cache with a cluster state. Hezelcast also helps to resolve nodes within a network.

Hazelcast supports several different transports including multicast and TCP. The default configuration uses multicast so you must have multicast enabled on your network for this to work or update cluster.xml configuration file.

Cluster implementation also can configure the quorum for automatic leader election.

Current implementation allows you to automatically select the leader at the cluster init only (when there was no leader at all). If, for some reason, the leader disappears from the cluster - new one will need to be select in the manual mode.

Quick setup

Cluster manager has already built-in default settings. This allows you to start the Cluster service with the minimum effort.

public class ClusterSample {
    public static void main(String[] args) {
        ClusterManager clusterManager = new HazelcastClusterManager();      (1)
        clusterManager.addLocalNodeLeaderListener(new LocalNodeLeaderListenerAdaptor() {

            @Override
            public void onGranted() {      (2)
                System.out.println("This node was elected as a leader");
            }

            @Override
            public void onRevoked() {      (3)
                System.out.println("This node isn't leader any more");
            }
        });

        clusterManager.join();      (4)
        clusterManager.electLeader(clusterManager.localNode().id());      (5)

        //pause 1 sec
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }

        //print nodes
        clusterManager.nodes().stream().forEach(System.out::println);     (6)

        clusterManager.leave();      (7)
    }
}
1 Create instance of ClusterManager with default configuration.
2 This method will be called when node become a leader.
3 This method will be called when leader services should be stopped.
4 Join note to the cluster.
5 Assign this node as a cluster leader
6 Pring all node in this cluster
7 leave the cluster

In this case, the cluster will use UDP multicast address 224.2.2.3:54327 for communication. The size of the quorum for the default cluster is equal to 2 (the leader will be selected automatically if there are 2 or more nodes in the cluster).

Configuration

Hazelcast configuration

The current implementation uses Hazelcast for communication. You can find detailed Hazelcast configuration description on its site: Hazelcast Configuration.

If you want to override default configuration you can: * provide a file called cluster.xml with Hazelcast configuration on your classpath; * build com.hazelcast.config.Config manually and pass it into HazelcastClusterManager:

Config hazelcastConfig = new Config();
// Now set some stuff on the config (omitted)
ClusterManager mgr = new HazelcastClusterManager(hazelcastConfig);
You can specify a name for the current node with instanceName option. Otherwise, a unique name will be assigned automatically.
If the quorum size is not specified in the configuration, then the first launched node will be selected as a leader node.

Cluster service options

In addition to Hazelcast settings in HazelcastClusterManager can be set:

Cluster troubleshooting

If the default multicast configuration is not working here are some common causes:

Multicast not enabled on the machine

It is quite common in particular on OSX machines for multicast to be disabled by default. Please google for the information on how to enable this.

Using wrong network interface

If you have more than one network interface on your machine (and this can also be the case if you are running VPN software on your machine), then Hazelcast may be using the wrong one.

To tell Hazelcast to use a specific interface you can provide the IP address of the interface in the interfaces element of the configuration. Make sure you set the enabled attribute to true. For example:

<interfaces enabled="true">
  <interface>192.168.1.20</interface>
</interfaces>

When multicast is not available

In some cases you may not be able to use multicast as it might not be available in your environment. In that case you should configure another transport, e.g. TCP to use TCP sockets, or AWS when running on Amazon EC2.

For more information on available Hazelcast transports and how to configure them please consult the Hazelcast documentation.

Cluster service lifecycle

Service uses exchanging of events between nodes to manage the cluster and notify nodes about state changes.

Joining a new node to the cluster

After calling the join() method, node subscribes to cluster events. The rest of the cluster nodes also will receive notification about new node in the cluster. On the node, which is a leader at this moment, LocalNodeLeaderListener.backupAdded() method will be called.

If a leader is present in the cluster, then the current node will be started in backup mode. If the leader is absent, then it may be initiated the leader election procedure.

Automatic election of new leader

The procedure of automatic leader selection is started at the cluster init.

Each node at start receives information about available nodes in cluster and decides whether or not currently be elected a leader. This decision is based on the presence or absence of a quorum (see Hazelcast configuration). It is considered that the cluster has quorum (and the leader can be chosen) if

 S/2+1 >= Q

where S – the number of nodes in the cluster at this moment and Q – the configured quorum size.

If the quorum size is not specified, it is considered that even one node is already compose a quorum.

Current implementation automatically chooses the leader in cluster, only in the case when the cluster did not have leader before.
The first time leader selection algorithm is:
  1. If the new node is decided that it is necessary to choose a leader, it sends LEADER_EVENT event to the cluster, and offers a new leader.

  2. Once the proposed new leader gets LEADER_EVENT event with its ID, it notifies the application about its new status by calling LocalNodeLeaderListener.onGranted() and sends LEADER_STARTED_EVENT event with their ID back to the cluster.

  3. When the rest of the cluster nodes receive the event LEADER_STARTED_EVENT, they set their state to BACKUP (mark itself as backup), and notify their applications about new status by calling LocalNodeBackupListener.onBackup().

You can check the presence of the leader in the cluster by using HazelcastClusterManager.hasLeader() method.

Appointment of the new leader

New leader can be assigned by calling HazelcastClusterManager.electLeader() method and passing the node ID. New leader can be assigned if cluster has a leader or if doesn’t have. In the first case the previous leader will be deactivated.

The algorithm of assigning new leader is:
  1. Node, which calls HazelcastClusterManager.electLeader() method, sends to the cluster LEADER_EVENT event with new leader ID.

  2. When backup node receives LEADER EVENT event, it calls LocalNodeBackupListener.offBackup() method.

  3. A node, which was elected as a new leader, launches a timer and waits for finishing the old leader. This mechanism was introduced to minimize the possibility of simultaneous work of the two leaders in the cluster. Old leader needs some time to complete its processes. The timer is used to protect cluster from endless waiting in case if the old leader becomes inaccessible or crashes during switching the leader.

  4. Old leader receives the LEADER_EVENT event and notifies its application about the status change by calling LocalNodeLeaderListener.onRevoked(). After the successful completion of this call, it sends a LEADER_STOPPED_EVENT event to the cluster.

  5. When the new leader node receives an LEADER_STOPPED_EVENT event (or it doesn’t receive this event during timeoutLeaderShutdown period), it notifies the application about new status by calling LocalNodeLeaderListener.onGranted() and sends to the cluster LEADER_STARTED_EVENT event with its ID.

  6. When the rest of the cluster nodes receive LEADER_STARTED_EVENT event they change their state to BACKUP (mark theirself as backup) and notifies their applications about new status by calling LocalNodeBackupListener.onBackup().

Recall the leader

There is the way to recall the leader. After recalling all nodes will be in backup state.

The algorithm of recalling the leader is:
  1. Node, which calls HazelcastClusterManager.recallLeader() method, sends to the cluster LEADER_RECALL_EVENT event with the leader ID.

  2. Current leader receives the LEADER_RECALL_EVENT event and notifies its application about the status change by calling LocalNodeLeaderListener.onRevoked(). After the successful completion of this call, it sends a LEADER_STOPPED_EVENT event to the cluster.

Replication Service

Replication service allow to maintain backup copies of Persistence API storages.
Replication service use Aeron transport for transmitting data to backup instances. Replication can work in synchronous and asynchronous mode. In synchronous mode storage sends notification about every operation to backups and waits for acknowledgement back. It blocks calling thread till receive acknowledgement or till predefined timeout will expire.

Replication service consist from 2 parts: leader and backup. Depends on instance role it need to initialize and start one or another instance.

Leader instance is responsible for
  • notifications about exist and new storages

  • delivering notifications about storage’s operations

  • handling synchronization requests

Backup instance is responsible for
  • creating required storages on backup

  • synchronize storages

  • update storages state (process notifications about storage’s operations)

Quick setup

It is very useful to use replication with ClusterManager. ClusterManager allows to start and stop leader or backup replication instance depends on node role in cluster. Also ClusterManager can notify replication service about available nodes and they addresses automatically.
To use replication service with with Cluster Manager it needs to implement LocalNodeLeaderListener and LocalNodeBackupListener to receive and handle notifications about cluster state. You can use default implementations of these listeners for replication service. They will handle all events and enable and configure appropriate service depends on node role in cluster.

Initialize replication leader

HazelcastCluster clusterManager = new HazelcastClusterManager();

// create controller for enable/disable leader replication role
ReplicationLeader replicationLeaderService = new ReplicationLeader(aeronTransport,correlationIdHolder,
        REPLICATION_LEADER_PORT,    (1)
        REPLICATION_BROADCAST_PORT, (2)
        "./logs/" + name);

// register listener for enabling/disabling replication functionality for leader
LocalNodeLeaderListener leaderListener = new ReplicationLeaderListener(replicationLeaderService);
        clusterManager.addLocalNodeLeaderListener(leaderListener);
1 REPLICATION_LEADER_PORT - port, which leader is listening for synchronization request from backups.
2 REPLICATION_BROADCAST_PORT - port, which each backup node is listening for replication data from leader.

Initialize replication backup

// create controller for enable/disable backup replication role
ReplicationBackup replicationBackupService = new ReplicationBackup(aeronTransport, correlationIdHolder,
        REPLICATION_LEADER_PORT, REPLICATION_BROADCAST_PORT, "./logs/" + name);

// register listener for enabling/disabling replication functionality for backup
LocalNodeBackupListener backupListener = new ReplicationBackupListener(replicationBackupService);
        clusterManager.addLocalNodeBackupListener(backupListener);

Creating storage directly

// Request instance of persistence factory
ReplicatedPersistenceFactory factory = ReplicatedPersistenceFactoryHolder.getFactory().get();

// Request instance of named persistence sequence with synchronouse replication (replication timeout - 10 millis)
PersistentSequence<String> sequence = factory.getOrCreateSequence("seqName", indexer, new
                    StringSerializer(),
                    10, TimeUnit.MILLISECONDS);
// Request instance of named persistence queue with asynchronouse replication
PersistentQueue<String> queue = factory.getOrCreateQueue("queueName", new StringSerializer(),
                    0, TimeUnit.MILLISECONDS);

Creating storage with Persistance API

//Request instance of persistence factory
ReplicatedPersistenceFactory factory = ReplicatedPersistenceFactoryHolder.getFactory().get();
//Enable synchronious replication by-default (replication timeout - 10 millis)
factory.setSyncMode(10, TimeUnit.MILLISECONDS);

// Request instance of named persistence sequence
PersistentSequence<String> sequence = factory.buildSequence("seqName", indexer, new StringSerializer());
// Request instance of named persistence queue
PersistentQueue<String> queue = factory.buildQueue("queueName", new StringSerializer());

Configuration

Configuration of replication service there is in 2 configuration files: replication service configuration (replication.properties) and Aeron Media Driver configuration (aeron.properties).

Replication Service configuration

Replication Service configuration there is in replication.properties.

  • fej.replication.leader.sync
    Default (initial) replication mode (synchronous or asynchronous)
    Default: false

  • fej.replication.leader.async.timeout
    Default (initial) timeout for synchronous replication in milliseconds. Process could be blocked for this timeout till receive acknowledgement from other side.
    Default: 0 milliseconds (async mode)

  • fej.replication.leader.receive.buffer.size
    The size of the leader incoming ring buffer, must be power of 2.+ Default: 512 bytes

  • fej.replication.leader.receive.wait.strategy
    The wait strategy to use for the leader incoming ring buffer (see Disruptor WaitStrategy).
    Default: com.lmax.disruptor.BlockingWaitStrategy

  • fej.replication.leader.send.buffer.size
    The size of the leader outgoing ring buffer, must be power of 2.
    Default: 2048 bytes

  • fej.replication.leader.send.wait.strategy
    The wait strategy to use for the leader outgoing ring buffer (see Disruptor WaitStrategy).
    Default: com.lmax.disruptor.BlockingWaitStrategy

  • fej.replication.backup.receive.buffer.size
    The size of the backup incoming ring buffer, must be power of 2.
    Default: 1024 bytes

  • fej.replication.backup.receive.wait.strategy
    The wait strategy to use for the backup incoming ring buffer (see Disruptor WaitStrategy).
    Default: com.lmax.disruptor.BlockingWaitStrategy

  • fej.replication.backup.send.buffer.size
    The size of the backup outgoing ring buffer, must be power of 2.
    Default: 512

  • fej.replication.backup.send.wait.strategy
    The wait strategy to use for the backup outgoing ring buffer (see Disruptor WaitStrategy).
    Default: com.lmax.disruptor.BlockingWaitStrategy

  • fej.replication.aeron.mediadriver.embedded
    Use embedded aeron media driver (see Aeron Embedded Media Driver).
    Default: true

  • fej.replication.aeron.idle.strategy
    Provides an IdleStrategy for the thread responsible for communicating with the Aeron Media Driver (see Aeron Idle Strategies).
    Default: uk.co.real_logic.agrona.concurrent.BackoffIdleStrategy

Aeron Media Driver configuration

Please find description of Aeron configuration options at official page.

Replication service lifecycle

The goal of replication service is maintaining of full copies of the storages within a network. To do this it should support at least 2 operations:

  • synchronization of data to restore actual state

  • replication data in runtime to maintain the state

The service can replicate data in two modes - synchronous and asynchronous.

Sending messages asynchronously does not expect confirmation about the successful receipt and processing by other side. Replication is performed in parallel with the thread, which changes the data in storages.

Asynchronous replication expects to receive confirmation about the successful processing, at least, from one of the backup nodes. A processing timeout can be specified for the storage instance during its creating (otherwise the default settings will be used). The thread that changes the data in storages, is blocked until receiving acknowledgement or until expiring of the timeout. In the last case, the warning, that signals that the data was not successfully transmitted to the any of the backup node during specified period, will be logged.

Replication mode is set separately for each of the repositories, so it is possible to simultaneously use both the storage with synchronous replication and with asynchronous.

Each backup node maintains two transport channels because Aeron transport is unidirectional by its nature:

  • incomming: for sending synchronization requests and synchronouse acknowledgments

  • outgoing: for receiving information about changing the data in storages

The listening port is assigned to each node of replication service, depends on it role. If you change the node’s role, the listening port is not changed. One and the same port will be always listened by leader and backups always know how to contact it. Same for the port that is listening by backups.

Persistent storages are degigned for incremental updates. Internal storage contains log of operations like 'APPEND' and 'REMOVE'. In case of synchronization or replication data it needs only to send new updates.

Data synchronization procedure

After starting the backup instance synchronizes its state with the leader.

The synchronization procedure is:
  1. The backup sends GET_ALL_RESOURCES request to the leader right after start.

  2. The leader sends lists of exist queries and sequences in response as 'RESOURCE_LIST' messages. CorrelationId is also sent for every storage. CorrelationId is unique id for storage. It uses in next communications.

  3. The backup loads exist and creates non-exist storages.

  4. The backup sends SYNC_REQ request for every storage and pass its index. The timestamp of last reset is passed for sequence in addition.

  5. The leader sends SYNC_REQ_ACCEPTED message in answer. It compares indexes and timestamps and resent all requied data. For sequence it can also send SEQUENCE_RESET message, if reset was missed. At the end the leader sends SYNC_FINISHED message to indicate the border of synchronization answer.

Data replication procedure

On every operation with persistent storage the leader sends updates to all backups.

The replication procedure is:
  1. On adding new item to internal storage the leader sends QUEUE_ADD or SEQUENCE_APPEND message. New data and internal ordered index is sent with this message.

  2. The backup received this message and compares expecter index with received.

  3. If received index is differen then expected, it start synchronization procedure (see Data synchronization procedure, n.4)

  4. If the leader indicate that this is synchronouse storage and it expects acknowlagement, the backup sends back ACK message.

FIX session replication

FIX Antenna Java also can use replication storages thru Persistence API.

To enable replicated storage for FIX session it needs to setup replication leader and backup (Initialize replication leader, Initialize replication backup) and use next configuration options for FIX antenna (in fixengine.properties) :

storageFactory=com.epam.fixengine.storage.persistence.PersistenceEnableFactory (1)
storage.persistenceFactoryBuilder=com.epam.fixengine.storage.persistence.ReplicatedPersistenceFactoryBuilder (2)
replicationTimeout=10 (3)
1 Use PersistenceEnableFactory for storageFactory property. This factory allows to use Persistence API for storing FIX session state. PersistenceEnableFactory is based on FilesystemStorageFactory and delegate all operation Persistence API objects. To work with this API it requires implementation of PersistentFactoryBuilder. The last one should construct instance of PersistenceFactory.
2 Define ReplicatedPersistenceFactoryBuilder like a factory builder for PersistenceEnableFactory. ReplicatedPersistenceFactoryBuilder implemets PersistentFactoryBuilder and build replicated instance of factory. It uses replicationTimeout options from FIX antenna config (or from session’s Configuration instance) to configure synchronouse or aynchronouse replication more for FIX session.
3 Define replication timeout in milliseconds.
Zero value for this option will enable asynchronouse replication for FIX session.

Administration

Monitoring and management using a remote shell

Introduction

FIX Edge Java includes an interactive shell that you can use to start, stop and query fix server and sessions. There are also useful commands to monitor JVM. In this section we will introduce the main commands and features of the shell.

Connecting to the remote shell

Connection is done on port 2000.

Linux and OSX users can use ssh to connect to the remote shell, Windows users can download and install PuTTY.

$ ssh -p 2000 user@localhost
Password authentication
Password:
 ________  _____  ____  ____  ________  ______      ______  ________        _____     _  ____   ____  _
|_   __  ||_   _||_  _||_  _||_   __  ||_   _ `.  .' ___  ||_   __  |      |_   _|   / \|_  _| |_  _|/ \
  | |_ \_|  | |    \ \  / /    | |_ \_|  | | `. \/ .'   \_|  | |_ \_|        | |    / _ \ \ \   / / / _ \
  |  _|     | |     > `' <     |  _| _   | |  | || |   ____  |  _| _     _   | |   / ___ \ \ \ / / / ___ \
 _| |_     _| |_  _/ /'`\ \_  _| |__/ | _| |_.' /\ `.___]  |_| |__/ | _ | |__' | _/ /   \ \_\ ' /_/ /   \ \_
|_____|   |_____||____||____||________||______.'  `._____.'|________|(_)`.____.'|____| |____|\_/|____| |____|

Welcome to Dzmitry + !
It is Sun Oct 11 05:15:22 BRT 2015 now

The bye command disconnect from the shell.

% bye
Have a good day!

Connection to localhost closed.

Remote shell credentials

Shell uses a Spring Security to handle login duties. See Security configuration.

Remote shell commands

Administrative shell provide an environment with a set of specific commands after establishing connection.

Cluster management

Usage:

% cluster [-h | --help] COMMAND [ARGS]

The most commonly used cluster commands are:

  • stop - stop node with given id.

% cluster stop FixEdgeJ1
Node with id [-1062725718] has stopped
  • info - show node info.

% cluster info FixEdgeJ1
name      leader id          address             local
-------------------------------------------------------
FixEdgeJ1 true   -1062725718 /192.168.1.100:5701 true
  • nodes - list of cluster nodes.

% cluster nodes
Id             Name          Local          Leader          Address
--------------------------------------------------------------------------------
-1062725718    FixEdgeJ1     this           this            /192.168.1.100:5701
-1062725719    FixEdgeJ2                                    /192.168.1.101:5701
  • health - show status on the health of the cluster.

% cluster health
clusterName status numberOfNodes
---------------------------------
ClisterJ    green  2
  • elect - elect cluster leader (all other nodes will be switched in backup mode).

% cluster elect FixEdgeJ2
Node with id [-1062725719] has become the cluster leader

FIX Server management

Usage:

% server [-h | --help] COMMAND [ARGS]

The most commonly used server commands are: * start - start fix server

% server start
Fix server successfully started
  • stop - stop fix server

% server stop
Fix server successfully stopped
  • state - show fix server current state

% server state
STARTED
  • opts - show fix server options e.g. port, state etc.

% server opts
port state
----------
8911 STARTED

FIX Session management

Usage:

% session [-h | --help] COMMAND [ARGS]

The most commonly used session commands are:

  • tr - send test request to session

  • delete - delete session

  • start - start session

  • stop - stop session

  • reset - reset session sequence numbers

  • state - show current session state

  • hb - send heartbeat to session

  • info - session info

  • ls - list all registered sessions

% session ls
Id           State         Type         SenderCompId    TargetCompId     Group
----------------------------------------------------------------------------------
iLinkTest    INACTIVE      INITIATOR    FECompId        iLink            []
session1     INACTIVE      ACCEPTOR     FECompId        Test1            [A]
session2     INACTIVE      ACCEPTOR     FECompId        Test2            [A, B, C]
  • send - send fix message to session

  • seqnum - set session in/out sequence numbers

  • tobackup - switch session from primary to backup connection

  • toprimary - switch session from backup to primary connection

  • getstart - show session scheduled start time

  • getstop - show session scheduled stop time