Connecting to the Avid Platform using the Avid Connector API

The Avid Connector API connects to the Avid Platform over the Avid Secure Gateway. To connect your service to the Avid Platform, the Avid Secure Gateway must be running on a known host and port. The default port is 9900.
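Before creating a BusAccess, it can be useful to confirm that something is listening on the expected gateway host and port. The following is a minimal sketch using only the JDK. The class GatewayReachability and its method are hypothetical helpers, not part of the Avid Connector API, and the check proves TCP reachability only, not that the listener is actually an Avid Secure Gateway.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper (not part of the Avid Connector API). */
final class GatewayReachability {
    private GatewayReachability() {}

    /** Returns true if a TCP connection to host:port can be opened within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }
}
```

With the defaults described in this document, the call would be GatewayReachability.isReachable("127.0.0.1", 9900, 1000).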

Default Connection Settings

If no special settings are passed in and no special environment variables are defined, the following defaults are used:

  • Query Timeout (ms): 10,000
  • Gateway Port: 9900
  • Gateway Host: 127.0.0.1
  • Bus Initial Connection Attempts: -1 (try forever)
  • Bus Reconnection Attempts: -1 (try forever)
  • Bus Reconnection Delay (ms): 1,000
  • Bus exponential back-off reconnection duration for unlicensed connection (ms): 600,000
  • Bus exponential back-off reconnection jitter for unlicensed connection: true

To connect to the Avid Platform, create a new instance of a BusAccess using IpcBusAccessFactory:

When connecting to the Avid Platform, you may specify an authentication provider with a valid ClientId and ClientSecret. If the service is started from a trusted IP address, it may be started without an authentication provider. In this case, the authentication provider must be null:

// Create a new BusAccess, without Authentication Provider and not connected to the bus
BusAccess bus = new IpcBusAccessFactory().createBusAccess(null, false);
// then connect to the bus
bus.connect();

// OR

// Create a new BusAccess, without Authentication Provider and connected to the bus immediately
BusAccess bus = new IpcBusAccessFactory().createBusAccess(null, true);


// Create a new BusAccess, with Authentication Provider and not connected to the bus
BusAccess bus = new IpcBusAccessFactory().createBusAccess(new ClientAuthentication("620f8ca60b0e11e6b5123e1d05defe78", "2078884c3d6311e6ac619e71128cae77"), false);
// then connect to the bus
bus.connect();

// OR

// Create a new BusAccess, with Authentication Provider and connected to the bus immediately
BusAccess bus = new IpcBusAccessFactory().createBusAccess(new ClientAuthentication("620f8ca60b0e11e6b5123e1d05defe78", "2078884c3d6311e6ac619e71128cae77"), true);

Overriding Default Settings with Environment Variables, System Properties, or OSGi

You can override the default connection settings by setting pre-defined environment variables, Java system properties, or using an OSGi configuration file.

The table below shows some of the variables/keys you can set to affect the way the bus connects:

Environment Variable | Java System Property | Configuration file | Description
ACS_BUS_QUERY_TIMEOUT | com.avid.acs.bus.ConnectionOptions.queryTimeout, com.avid.acs.bus.BusAccessSettings.queryTimeout | queryTimeout | Default timeout (in ms) for queries. Default is 10000 ms.
ACS_GATEWAY_HOST | com.avid.acs.bus.ConnectionOptions.gatewayHost, com.avid.acs.bus.BusAccessSettings.gatewayHost | gatewayHost | Secure Gateway connection host. Default is 127.0.0.1.
ACS_GATEWAY_PORT | com.avid.acs.bus.ConnectionOptions.gatewayPort, com.avid.acs.bus.BusAccessSettings.gatewayPort | gatewayPort | Secure Gateway connection port. Default is 9900.
ACS_GATEWAY_UNSECURE_PORT | com.avid.acs.bus.ConnectionOptions.gatewayUnsecurePort, com.avid.acs.bus.BusAccessSettings.gatewayUnsecurePort | gatewayUnsecurePort | Port for unsecured connections. Default is 9966.
ACS_GATEWAY_PROTOCOL_SEQUENCE | com.avid.acs.bus.ConnectionOptions.gatewayProtocolSequence, com.avid.acs.bus.BusAccessSettings.gatewayProtocolSequence | gatewayProtocolSequence | Sequence of protocols (or a single protocol), separated by commas (‘,’), in which the Avid Connector API tries to establish a connection to the Secure Gateway. Default is ‘wss’. Allowed protocols are ‘wss’ and ‘ws’. Possible combinations are ‘wss,ws’, ‘ws’, ‘wss’ or ‘ws,wss’.
ACS_PLATFORM_IDENTIFIER | N/A | N/A | Unique identifier of the node where the service is running. Must be provided by the target platform, e.g. AWS, OpenStack. Default is ‘unknown’.
ACS_SERVICE_BUILD_NUMBER | N/A | N/A | RPM version or any other version of the service. Default is ‘unknown’.
ACS_ENVIRONMENT_IDENTIFIER | N/A | N/A | Environment identifier; essentially a generalized Chef identifier for any collection of nodes. Default is ‘unknown’.
ACS_BUS_INITIAL_CONNECTION_ATTEMPTS | com.avid.acs.bus.ConnectionOptions.initialConnectionAttempts, com.avid.acs.bus.BusAccessSettings.initialConnectionAttempts | N/A | Number of times to attempt the initial connection before failing (-1 means try forever).
ACS_GATEWAY_CONNECTION_LOST_THRESHOLD | N/A | N/A | Amount of time after which the connection to the Gateway is considered broken if no ping has been received from the Gateway. Default is 5000 ms.
ACS_BUS_RECONNECTION_ATTEMPTS | com.avid.acs.bus.ConnectionOptions.reconnectionAttempts, com.avid.acs.bus.BusAccessSettings.reconnectionAttempts | N/A | Number of times to attempt reconnecting a broken connection (-1 means try forever).
ACS_BUS_RECONNECTION_DELAY | com.avid.acs.bus.ConnectionOptions.busReconnectionDelay, com.avid.acs.bus.BusAccessSettings.busReconnectionDelay | N/A | Delay (in ms) between reconnection attempts. Default is 1000 ms.
ACS_BUS_MAX_BACK_OFF_DURATION | com.avid.acs.bus.ConnectionOptions.backOffReconnectionDuration, com.avid.acs.bus.BusAccessSettings.backOffReconnectionDuration | N/A | Maximum duration of back-off reconnection attempts for an unlicensed connection. Default is 600000 ms.
ACS_BUS_BACK_OFF_JITTER | com.avid.acs.bus.ConnectionOptions.backOffJitter, com.avid.acs.bus.BusAccessSettings.backOffJitter | N/A | Back-off reconnection jitter for an unlicensed connection. Default is true.
ACS_BOOT_PREFERRED_CONFIGURATION_SOURCE | com.avid.acs.bus.ConnectionOptions.busReconnectionDelay, com.avid.acs.bus.BusAccessSettings.busReconnectionDelay | preferredConfigurationSource | The preferred configuration source (osgi or environment).
ACS_SECURITY_TRUST_SELF_SIGNED | N/A | N/A | Whether to trust (true) or not trust (false) self-signed certificates. Default is true.
ACS_METRICS_ENABLED | N/A | N/A | Whether metrics are enabled. Metrics are enabled by default.
ACS_METRICS_TAG | N/A | N/A | Tag used to make each metric unique. Default is “default”.
ACS_METRICS_REPORT_INTERVAL | N/A | N/A | Metrics reporter interval in seconds. Default is 10.
ACS_BUS_COMMAND_TIMEOUT | com.avid.acs.bus.ConnectionOptions.commandTimeout, com.avid.acs.bus.BusAccessSettings.commandTimeout | commandTimeout | Default command timeout (ms). Defaults to -1 (forever).
ACS_BUS_COMMAND_REDELIVERY_ATTEMPTS | com.avid.acs.bus.ConnectionOptions.commandRedeliveryAttempts, com.avid.acs.bus.BusAccessSettings.commandRedeliveryAttempts | commandRedeliveryAttempts | Default number of command redelivery attempts. Defaults to 5.
ACS_BUS_COMMAND_ANY_COMPATIBLE_VERSION | com.avid.acs.bus.ConnectionOptions.commandAnyCompatibleVersion, com.avid.acs.bus.BusAccessSettings.commandAnyCompatibleVersion | commandAnyCompatibleVersion | Default command version compatibility mode. Defaults to true (any compatible version).
ACS_TERMINATE_PROCESS_IF_CONNECTION_TO_GATEWAY_FAILS | N/A | N/A | Terminate the process with exit code 1 if the connection to the Secure Gateway fails after the given number of initial connection or reconnection attempts. Default is false.
ACS_HTTP_SERVER_ENABLED | N/A | N/A | Enables the HTTP server (currently only the liveness probe reporting functionality). Defaults to false.
ACS_HTTP_SERVER_LISTEN_HOST | N/A | N/A | IP address or host to bind the HTTP server to. Defaults to 0.0.0.0.
ACS_HTTP_SERVER_LISTEN_PORT | N/A | N/A | Port for the HTTP liveness server to listen on. Defaults to 9991.

Once the values have been set, connecting works the same way as with the default settings.

In order to use OSGi as the configuration source, the preferred configuration source MUST be set to “osgi” (the default is “environment”). The OSGi configuration file must be named “com.avid.acs.service.IpcBusAccessFactory”.

If OSGi is not the preferred configuration source, then the following precedence is used: first system properties, then environment variables, then the hard-coded default values.
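The precedence described above (system properties, then environment variables, then hard-coded defaults) can be illustrated with a small lookup routine. This is only a sketch of the lookup order, assuming plain string values; SettingsPrecedenceSketch is a hypothetical class and does not reflect how the Avid Connector API implements the resolution internally.

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper (not part of the Avid Connector API). */
final class SettingsPrecedenceSketch {
    private SettingsPrecedenceSketch() {}

    /** Resolves a setting: system property first, then environment variable, then default. */
    static String resolve(Properties systemProps, Map<String, String> env,
                          String propertyKey, String envKey, String defaultValue) {
        String fromProperty = systemProps.getProperty(propertyKey);
        if (fromProperty != null) {
            return fromProperty;
        }
        String fromEnv = env.get(envKey);
        if (fromEnv != null) {
            return fromEnv;
        }
        return defaultValue;
    }

    /** Convenience overload that reads the live JVM properties and process environment. */
    static String resolve(String propertyKey, String envKey, String defaultValue) {
        return resolve(System.getProperties(), System.getenv(), propertyKey, envKey, defaultValue);
    }
}
```

For example, resolve("com.avid.acs.bus.ConnectionOptions.gatewayPort", "ACS_GATEWAY_PORT", "9900") returns the system property if it is set, otherwise the environment variable, otherwise "9900".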

Overriding Default Settings with BusAccessSettings

To set the connection settings in code, use the ConnectionOptions class. The class prefers values set using the setters, then falls back to environment variable values, and finally falls back to the default values. The class can also be easily extended:

class MySettings extends ConnectionOptions {
// ...

@Override
protected long getDefaultQueryTimeout() {
return 15000L;
}
}

// Create a new BusAccess, not connected to the bus
BusAccess bus = new IpcBusAccessFactory(new MySettings()).createBusAccess(null, false);
// then connect to the bus
bus.connect();

// OR

// Create a new BusAccess, connected to the bus immediately
BusAccess bus = new IpcBusAccessFactory(new MySettings()).createBusAccess(null, true);

Disconnect

At the end of the service lifecycle, the BusAccess connection must be closed by calling the disconnect method. Service developers must disconnect all BusAccess instances before terminating the process. One option is to implement a shutdown hook:

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
bus.disconnect();
}
});

Also, if the service catches a runtime exception in the main thread and is going to terminate the process as a result, it must also disconnect all BusAccess instances by calling the disconnect method.

If you have a service registered on a BusAccess instance, you don’t need to call unregisterService before disconnect. The disconnect method unregisters the service, unsubscribes from all channels, and closes the connection to the Gateway.

If a BusAccess instance is not disconnected, all the threads and connections it created stay alive, which prevents the JVM from terminating your process.
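The requirement to disconnect every instance, even when the main logic fails with a runtime exception, can be sketched with a try/finally block. In the sketch below, Disconnectable is a hypothetical stand-in for BusAccess that models only the disconnect call, and DisconnectAllSketch is an illustrative helper rather than part of the Avid Connector API.

```java
import java.util.List;

/** Hypothetical stand-in for BusAccess; only the disconnect call is modelled. */
interface Disconnectable {
    void disconnect();
}

/** Hypothetical helper (not part of the Avid Connector API). */
final class DisconnectAllSketch {
    private DisconnectAllSketch() {}

    /** Disconnects every instance, continuing even if one of them throws. */
    static void disconnectAll(List<? extends Disconnectable> instances) {
        for (Disconnectable instance : instances) {
            try {
                instance.disconnect();
            } catch (RuntimeException e) {
                System.err.println("Disconnect failed: " + e.getMessage());
            }
        }
    }

    /** Runs the service logic and disconnects all instances whether or not it throws. */
    static void runAndDisconnect(Runnable serviceLogic, List<? extends Disconnectable> instances) {
        try {
            serviceLogic.run();
        } finally {
            disconnectAll(instances);
        }
    }
}
```

Catching the exception from each disconnect call is a design choice in this sketch, so that one failing instance does not leave the remaining instances connected.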

Using OSGi

The Avid Connector API is exposed in OSGi containers via declarative services. The easiest way to get a reference to the bus in an OSGi container is to declare a reference to BusAccessFactory. The BusAccessFactory can then be used to get a shared BusAccess or to create a new instance of a BusAccess.

// ...
import org.apache.felix.scr.annotations.*;

@Component
public class MyOSGiComponent {
@Reference(
bind = "bindBusAccessFactory",
unbind = "unbindBusAccessFactory")
private BusAccessFactory busAccessFactory;
private BusAccess bus;

synchronized void bindBusAccessFactory(BusAccessFactory busAccessFactory) throws BusAccessException, ServiceException {
bus = busAccessFactory.createBusAccess(new ClientAuthentication("620f8ca60b0e11e6b5123e1d05defe78", "2078884c3d6311e6ac619e71128cae77"), true);
}

synchronized void unbindBusAccessFactory(BusAccessFactory busAccessFactory) throws BusAccessException {
bus.disconnect();
bus = null;
}

// ...
}

NOTE: You must specify Avid Platform connection settings in the service properties or configure them via the OSGi container’s web console.

Connection Handler

If your application needs to react when a connection to the Avid Platform is established or lost, pass in an implementation of the ConnectionListener interface.

class MyConnectionListener implements ConnectionListener {

@Override
public void onConnect(ConnectEvent event) {
// This method is called after the first connection to secure gateway is successfully established
// or after a lost connection is recovered, and all service/channel registrations are restored
}

@Override
public void onDisconnect(DisconnectEvent event) {
// This method is called when a connection to the Secure Gateway is lost
}
}

BusAccess bus = new IpcBusAccessFactory().createBusAccess(null, false);
bus.connect(new MyConnectionListener());

Debugging the Gateway Connection

The Avid Connector API and the gateway have an internal failover logic that validates the connection between them. If you are debugging your service in an IDE or on the command line, a breakpoint can block your service from receiving the information it needs to know that it is still connected to the Secure Gateway. To prevent the service from thinking it has lost its connection to the Secure Gateway, and thus failing over into reconnection mode, you should set the variable ACS_GATEWAY_CONNECTION_LOST_THRESHOLD=600000 (10 minutes). This should prevent the service from thinking it has lost its connection while giving you enough time to inspect the information you need at the breakpoint.
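A running JVM cannot change its own environment variables, so the variable has to be set by whatever launches the service. When the launcher is itself Java code, the following sketch shows one way to do this with ProcessBuilder. DebugLaunchSketch is a hypothetical helper and the command passed in is an assumption; an IDE run configuration or a shell export achieves the same result.

```java
import java.util.List;

/** Hypothetical launcher helper (not part of the Avid Connector API). */
final class DebugLaunchSketch {
    private DebugLaunchSketch() {}

    /** Prepares a child process whose environment raises the connection-lost threshold to 10 minutes. */
    static ProcessBuilder withLongLostThreshold(List<String> command) {
        ProcessBuilder builder = new ProcessBuilder(command);
        // 600000 ms = 10 minutes, the value suggested above for debugging sessions
        builder.environment().put("ACS_GATEWAY_CONNECTION_LOST_THRESHOLD", "600000");
        // Share stdin/stdout/stderr with the launcher so service logs stay visible
        builder.inheritIO();
        return builder;
    }
}
```

Calling start() on the returned builder launches the service with the variable set for that process only.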

Threading Model

The Avid Connector API for Java uses multiple threads to process incoming messages and send outgoing messages, with a separate thread pool for each direction. The default implementation of each pool is Executors.newFixedThreadPool(32).

Incoming channel messages, service operation requests, and incoming responses to query operations are all processed by the thread pool for incoming messages.

Outgoing channel messages, service operation requests, and outgoing service operation responses to query operations are all processed by the thread pool for outgoing messages.

To provide your own thread pool implementation, use the ConnectionOptions passed to the BusAccess.connect method.

To set thread pools for incoming and outgoing messages:

ConnectionOptions connectionOptions = new ConnectionOptions();
// For incoming messages
connectionOptions.setRequestExecutorService(requestExecutorService);
// For outgoing messages
connectionOptions.setResponseExecutorService(responseExecutorService);
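The snippet above assumes that requestExecutorService and responseExecutorService already exist. A minimal sketch of how such pools might be created with the JDK follows. BusThreadPoolsSketch, the thread name prefixes, and the pool sizes are all assumptions chosen for illustration; naming the threads simply makes them easier to spot in thread dumps.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper (not part of the Avid Connector API). */
final class BusThreadPoolsSketch {
    private BusThreadPoolsSketch() {}

    /** Creates a fixed-size pool whose threads are named "<prefix>-<n>". */
    static ExecutorService namedFixedPool(String prefix, int size) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable ->
                new Thread(runnable, prefix + "-" + counter.incrementAndGet());
        return Executors.newFixedThreadPool(size, factory);
    }
}
```

For example, namedFixedPool("bus-incoming", 16) and namedFixedPool("bus-outgoing", 16) could be passed to the two setters shown above. Pools created this way are owned by the caller and should be shut down by the caller when the service stops.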

Using the Avid Connector API as a Client

The Avid Connector API can be used to send requests to services, and to subscribe and publish to channels. Requests and responses use the Message interface.

Providing Message Options

With each message operation (e.g. query, send, broadcast) you may provide message options. The following options are available:

  • timeout - Specifies a message timeout in ms for query operations. Default is 10000ms.
  • durable - Sets whether the message is durable. Default is false. NOTE: This option is currently not implemented and will be revised in future releases.
  • anyCompatibleVersion - Sets whether the message is delivered to any compatible version of the service or to an exact version of the service. Default is true.

// Default options
MessageOptions defaultOptions = new MessageOptions();
// Set timeout to 30000 ms
MessageOptions timeoutOptions = new MessageOptions(30000);
// Set message to be durable
MessageOptions durableOptions = new MessageOptions(true);
// Set timeout to 40000 ms and message to be durable
MessageOptions timeoutDurableOptions = new MessageOptions(40000, true);
// Set timeout to 50000 ms, message to be durable and delivered to exact service version
MessageOptions exactVersionOptions = new MessageOptions(50000, true, false);

Querying Services

NOTE: The old synchronous calls have been deprecated. Design your programming logic around asynchronous calls. Alternatively, you can use the queryAsync method for synchronous calls, although it will be revised in the future. Do not use any API marked as deprecated, and be sure to remove all deprecated calls from your existing code.

Messages can be queried asynchronously or synchronously. Synchronous queries are in the process of being revised and should not be used. Synchronous calls use the Java Future interface and block the current thread when you call future.get(). Asynchronous queries do not block; the response is delivered to the callback handler whenever it is received.

To execute an asynchronous query:

QueryAsyncCallback asyncCallback = new QueryAsyncCallback() {
@Override
public void onSuccess(Message result) {
if (result.errors().hasErrors()) {
System.out.println("Operation failed: " + result.errors());
} else {
System.out.println("Received Response: " + result);
}
}

@Override
public void onError(CallbackError error) {
System.out.println("Received error: " + error.getMessage());
System.out.println("Received error type: " + error.getType());
}

@Override
public void onTimeout() {
System.out.println("Timeout Occurred");
}
};

MutableMessage request = new MutableMessage("my.useful.service", "global", 1, "echo");
MessageOptions options = new MessageOptions(10000);
bus.query(request, options, asyncCallback);

The onError method is only called when there are CallbackError-type errors. The Message passed to onSuccess() will contain any errors that occurred while executing the service operation.
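If callback-style results need to be composed with other asynchronous work, one common pattern is to complete a CompletableFuture from the callback instead of blocking on it. The sketch below shows that pattern only. ResultCallback is a hypothetical stand-in that mirrors the three callback methods used above (with a plain String in place of CallbackError), and CallbackFutureSketch is not part of the Avid Connector API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in mirroring the three callback methods used above. */
interface ResultCallback<T> {
    void onSuccess(T result);
    void onError(String errorMessage);
    void onTimeout();
}

/** Hypothetical helper (not part of the Avid Connector API). */
final class CallbackFutureSketch {
    private CallbackFutureSketch() {}

    /** Returns a callback that completes the given future when any callback method fires. */
    static <T> ResultCallback<T> completing(CompletableFuture<T> future) {
        return new ResultCallback<T>() {
            @Override
            public void onSuccess(T result) {
                future.complete(result);
            }

            @Override
            public void onError(String errorMessage) {
                future.completeExceptionally(new IllegalStateException(errorMessage));
            }

            @Override
            public void onTimeout() {
                future.completeExceptionally(new TimeoutException("Query timed out"));
            }
        };
    }
}
```

The future can then be chained with thenApply or thenAccept, which keeps the calling thread free rather than blocking on get().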

Sending to Services

Sending to a service is a one-way communication from the client to the service. There is no response. In comparison to a broadcast (described below), the client is also guaranteed that no more than one service instance will process the message.

AsyncCallback<Void> asyncCallback = new AsyncCallback<Void>() {
@Override
public void onSuccess(Void result) {
System.out.println("Message is published");
}

@Override
public void onError(CallbackError error) {
System.out.println("Received error: " + error.getMessage());
System.out.println("Received error type: " + error.getType());
}
};

MutableMessage request = new MutableMessage("my.useful.service", "global", 1, "sendSomething");
request.parameters().put("someData", "someValue");
MessageOptions options = new MessageOptions(10000);
bus.send(request, options, asyncCallback);

Broadcasting to Services

Broadcasting to a service is a one-way communication from the client to all instances of a given service. There is no response. As opposed to a send, the message is processed by every available instance of the service.

AsyncCallback<Void> asyncCallback = new AsyncCallback<Void>() {
@Override
public void onSuccess(Void result) {
System.out.println("Message is published");
}

@Override
public void onError(CallbackError error) {
System.out.println("Received error: " + error.getMessage());
System.out.println("Received error type: " + error.getType());
}
};

MutableMessage request = new MutableMessage("friend", "global", 1, "howdy");
bus.broadcast(request, null, asyncCallback);
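The difference between send and broadcast can be made concrete with a toy in-memory model. This sketch is not how the platform routes messages. DeliveryModelSketch is hypothetical, and its round-robin choice of the single receiving instance is an assumption made only so that the example is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of delivery semantics (not part of the Avid Connector API). */
final class DeliveryModelSketch {
    private final List<Consumer<String>> instances = new ArrayList<>();
    private int next = 0;

    /** Registers one service instance as a message handler. */
    void register(Consumer<String> instance) {
        instances.add(instance);
    }

    /** Send: at most one registered instance processes the message. */
    void send(String message) {
        if (instances.isEmpty()) {
            return;
        }
        instances.get(next % instances.size()).accept(message);
        next++;
    }

    /** Broadcast: every registered instance processes the message. */
    void broadcast(String message) {
        for (Consumer<String> instance : instances) {
            instance.accept(message);
        }
    }
}
```

With three registered instances, one send results in one handler invocation, while one broadcast results in three.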

Remote Zone and Multi-Zone Communications

The default behavior of the Avid Connector API is to communicate within its own local zone. All the examples provided above use this default behavior. If the local zone has been initialized in a multi-zone environment, however, it is possible to communicate with services in other zones.

Zone-Specific Communications

To communicate with a specific remote zone, use the bus.zone(String zoneID) object. The following are examples of communications with other zones:

MutableMessage request = new MutableMessage("example.service", "global", 1, "doSomething");
bus.zone("5b2123f1-3f8e-4fcb-9263-f7b98bbdab0c").query(request, options, asyncCallback);
bus.zone("5b2123f1-3f8e-4fcb-9263-f7b98bbdab0c").send(request, options, asyncCallback);
bus.zone("5b2123f1-3f8e-4fcb-9263-f7b98bbdab0c").broadcast(request, options, asyncCallback);

In all the above cases, only services in the remote zone with an ID of 5b2123f1-3f8e-4fcb-9263-f7b98bbdab0c are invoked. In addition, only service instances that are registered with a scope of multi-zone are considered.

Multi-Zone Communications

To communicate across multiple zones, use the bus.multiZone() object. The following are examples of multi-zone communications:

MutableMessage request = new MutableMessage("example.service", "global", 1, "doSomething");
bus.multiZone().query(request, options, asyncCallback);
bus.multiZone().send(request, options, asyncCallback);
bus.multiZone().broadcast(request, options, asyncCallback);

Note that bus.multiZone().query and bus.multiZone().send only send to one service instance in one zone. If there is a service instance in the local zone, it sends to that one. Otherwise, it sends to a service instance in a remote zone (if one is available and registered with the multi-zone scope). This is particularly useful if you know the service is in a zone, but aren’t sure which one.

bus.multiZone().broadcast broadcasts to all matching service instances in all zones.
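The selection rule for multi-zone query and send (prefer a local instance, otherwise fall back to a remote instance registered with the multi-zone scope) can be sketched as a small function. MultiZoneRoutingSketch and its Instance type are hypothetical and model only the rule as described above; which remote instance the platform actually picks when several qualify is not specified here, so the sketch simply takes the first one.

```java
import java.util.List;
import java.util.Optional;

/** Toy model of the multi-zone target selection rule (not part of the Avid Connector API). */
final class MultiZoneRoutingSketch {
    private MultiZoneRoutingSketch() {}

    /** Hypothetical description of a registered service instance. */
    static final class Instance {
        final String zoneId;
        final boolean multiZoneScope;

        Instance(String zoneId, boolean multiZoneScope) {
            this.zoneId = zoneId;
            this.multiZoneScope = multiZoneScope;
        }
    }

    /** Picks a local instance if one exists, else the first remote multi-zone instance. */
    static Optional<Instance> pickTarget(String localZoneId, List<Instance> instances) {
        for (Instance instance : instances) {
            if (instance.zoneId.equals(localZoneId)) {
                return Optional.of(instance);
            }
        }
        // No local instance: only remote instances registered as multi-zone qualify
        for (Instance instance : instances) {
            if (instance.multiZoneScope) {
                return Optional.of(instance);
            }
        }
        return Optional.empty();
    }
}
```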

Local Zone Communications

Note that there is also a bus.localZone() object. Invoking query, send, and broadcast on this object is functionally equivalent to invoking the same methods on the base bus object.

Wildcards Usage in the Realm

If multiple realms of the same service are registered, you can use wildcards to address any service instance satisfying the wildcard expression. Wildcards can substitute any letters/digits between dots.

For example, consider an Avid Platform service registered with the following realms:

montreal.workgroup1;
montreal.workgroup2;
montreal.workgroup1.id1;
montreal.workgroup1.id2;
montreal.workgroup2.id1;
montreal.workgroup2.id2;

In the above case, you can address the services by supplying the following wildcarded realms in the request:

MutableMessage request = new MutableMessage("example.service", "*.*", 0, "echo");
MutableMessage request = new MutableMessage("example.service", "*.*.*", 0, "echo");
MutableMessage request = new MutableMessage("example.service", "montreal.*", 0, "echo");
MutableMessage request = new MutableMessage("example.service", "montreal.workgroup1.id*", 0, "echo");
MutableMessage request = new MutableMessage("example.service", "montreal.workgroup*.id2", 0, "echo");
MutableMessage request = new MutableMessage("example.service", "montreal.*group*.id2", 0, "echo");

// etc...

In contrast, the following wildcarded realms won’t match any instance in the above example:

*
*.*.*.*
montreal.workgroup*.id3

etc...
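The matching rule described above (a wildcard stands for letters and digits within one dot-separated segment and never crosses a dot) can be approximated with a regular expression. The following is a sketch under that reading, with the extra assumption that a wildcard may also match zero characters. RealmWildcardSketch is a hypothetical helper, and the platform's actual matcher may differ in edge cases.

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not part of the Avid Connector API). */
final class RealmWildcardSketch {
    private RealmWildcardSketch() {}

    /** Returns true if the realm satisfies the wildcard expression. */
    static boolean matches(String wildcardRealm, String realm) {
        StringBuilder regex = new StringBuilder();
        for (char c : wildcardRealm.toCharArray()) {
            if (c == '*') {
                // A wildcard covers letters and digits only, so it cannot span a dot
                regex.append("[A-Za-z0-9]*");
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.matches(regex.toString(), realm);
    }
}
```

Under this reading, matches("montreal.*group*.id2", "montreal.workgroup1.id2") is true, while a bare "*" matches none of the realms listed above because each of them contains at least one dot.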

Proxy Clients

If you have the Java interface for the Avid Platform service you wish to call, the Avid Connector API provides a convenient way to interact with the service. Using Java proxy technology, your client can call the external service just as if it were a local object. Proxy clients can use the asynchronous API if the service operation is declared as asynchronous (i.e. has an OperationContext parameter).

// Given a CalculatorService interface that is annotated as a BusService...
CalculatorService proxyCalc = bus.getProxyClient(CalculatorService.class);
proxyCalc.add(1, 2, new ResponseContext<Long>() {
@Override
public void handleResponse(Long value, Message message) {
}

@Override
public void handleErrors(Set<BusError> errors, Message message) {
}

@Override
public void handleTimeout(long timeout, Message request) {
}
});

The default proxy uses the annotated service realm and version. In some cases, however, you may need to modify the service realm or service version that the client communicates with. This can be done by getting the proxy as a BusClient:

CalculatorService proxyCalc = bus.getProxyClient(CalculatorService.class);
bus.asBusClient(proxyCalc).setServiceRealm("MIT");
bus.asBusClient(proxyCalc).setServiceVersion(2);
proxyCalc.add(2, 3, ...);

Remote Zones and Multi-Zone Proxy Clients

Proxy clients can also be created using the bus.zone(String zoneId) and bus.multiZone() objects. In these cases, the corresponding equivalents of the query method will be used.

Using the Avid Connector API to Host Services

You can use the Avid Connector API to host Java services on the Avid Platform.

Simple Services with Annotations

The easiest way to host a service on the bus is to annotate an interface or class with Avid Connector API annotations.

Annotations can be on an interface or an implementation class. One advantage of annotating an interface is that it can be used to create proxy clients as well.

For example, consider the following interface for a calculator service:

@BusService(type = "com.avid.acs.example.calc", realm = "global", version = 2, desc = "It calculates")
@AccessRights(external = AccessPolicy.NON_AUTHENTICATED)
public interface CalculatorService {
@Operation(name = "add", desc = "Adds two numbers")
@Result("sum")
void add(@Param("num1") @ParamEx("2") long num1, @Param("num2") @ParamEx("4") long num2, OperationContext<Long> operationContext);

@Operation(name = "subtract", desc = "Subtracts two numbers")
@Result("difference")
void subtract(@Param("num1") @ParamEx("10") long num1, @Param("num2") @ParamEx("5") long num2, OperationContext<Long> operationContext);
}

Given the above interface, an implementation is as simple as the following:

public class CalculatorServiceImpl implements CalculatorService {
public void add(long num1, long num2, OperationContext<Long> operationContext) {
operationContext.respond(num1 + num2);
}

public void subtract(long num1, long num2, OperationContext<Long> operationContext) {
operationContext.respond(num1 - num2);
}
}

Now that the service has been defined, we can register it on the bus:

AsyncCallback<ServiceContext> asyncCallback = new AsyncCallback<ServiceContext>() {
@Override
public void onSuccess(ServiceContext result) {
String myId = result.getServiceInfo().getId();
}

@Override
public void onError(CallbackError error) {
System.out.println("Error occurred: " + error.getMessage());
}
};

bus.registerService(new CalculatorServiceImpl(), options, asyncCallback);

The ServiceContext contains information about the service, including its ServiceInfo, the BusAccess it was registered with, the number of consumers per queue, and methods for subscribing to and posting service events. You can then query the contained ServiceInfo to find out the assigned ID, assigned realm (if applicable), and queues/topics.

Note that the compatibleVersions property of the @BusService annotation declares that the service with given version can handle requests targeted to other compatible versions. In this case, version 2 of the com.avid.acs.example.calc service can handle requests targeted to versions 1 and 0.

To pass an enum as a parameter to a service, the enum must declare a static fromString function that converts a string to the appropriate enumeration value, or returns either null or a default value if the string does not match any available enumeration value.

Enumeration declaration:

public enum TrigonometryFunction {
SIN("sin"),
COS("cos"),
TAN("tan"),
COT("cot");

String val;

TrigonometryFunction(String val) {
this.val = val;
}

public static TrigonometryFunction fromString(String str) {
for (TrigonometryFunction value : TrigonometryFunction.values()) {
if (value.val.equals(str)) {
return value;
}
}

return null;
}
}

Service declaration that uses the enumeration:

@BusService(type = "com.avid.acs.example.calc", realm = "global", version = 2, desc = "It calculates")
public interface CalculatorService {
@Operation(name = "trig", desc = "Calculates value of given trigonometry function")
@RestRequest(path = "calculator/trig")
void trig(@Param("func") TrigonometryFunction func, @Param("arg") double arg, OperationContext<Double> context);
}

You may also provide the whole ParamSet of the message as one POJO argument by annotating this argument with @ParamSet. The following examples show a simple POJO for arithmetic operations, with num1 and num2 fields, and the corresponding interface, which uses the @ParamSet annotation.

public class OperationArgs {
@JsonProperty
public final long num1;
@JsonProperty
public final long num2;

@JsonCreator
public OperationArgs(@JsonProperty("num1") long num1,
@JsonProperty("num2") long num2) {
this.num1 = num1;
this.num2 = num2;
}
}

@BusService(type = "com.avid.acs.example.calc", realm = "global", version = 2, desc = "It calculates")
@AccessRights(external = AccessPolicy.NON_AUTHENTICATED)
public interface CalculatorService {
@Operation(name = "add", desc = "Adds two numbers")
@Result("sum")
void add(@ParamSet OperationArgs args, OperationContext<Long> operationContext);

@Operation(name = "subtract", desc = "Subtracts two numbers")
@Result("difference")
void subtract(@ParamSet OperationArgs args, OperationContext<Long> operationContext);
}

Register Service With Custom Options

You may configure the following service options for service registration:

  • requestServiceConfiguration - whether to request the service configuration from the Service Manager. Default is true.
  • startSuspended - whether to start the service in suspended mode. Default is false. Note that if you start the service in suspended mode, it is your responsibility to change the service state to OK, or to another state appropriate to the service’s business logic.

ServiceOptions options = new ServiceOptions();
options.setRequestServiceConfiguration(requestServiceConfiguration);
options.setStartSuspended(startSuspended);

AsyncCallback<ServiceContext> asyncCallback = new AsyncCallback<ServiceContext>() {
@Override
public void onSuccess(ServiceContext result) {
String myId = result.getServiceInfo().getId();
}

@Override
public void onError(CallbackError error) {
System.out.println("Error occurred: " + error.getMessage());
}
};

bus.registerService(new CalculatorServiceImpl(), options, asyncCallback);

For information on how to subscribe to configuration notifications, see Getting Configuration Information From a Service Manager.

Structured Errors

Declaration of the structured errors in the service contract

A service must declare its complete list of possible error codes using the Errors and Error annotations. The Errors annotation holds an array of Error annotations. Each Error annotation provides information about an error that the service may return. Example of an errors definition:

@BusService(type = "avid.example.service", realm = "global", version = 3, desc = "Example service")
@Errors({
@Error(code = "ADDITION_OVERFLOW", severity = ErrorSeverity.ERROR, messageTemplate = "Addition overflow %{num1} and %{num2}", status = 500),
@Error(code = "DIVIDE_BY_ZERO", severity = ErrorSeverity.ERROR, messageTemplate = "Division by zero", status = 500)
})
public interface ExampleService {
@Operation(name = "add", desc = "Adds two numbers")
@Result("sum")
@RestRequest(path = "calculator/add")
void add(@Param("num1") @ParamEx("2") long num1, @Param("num2") @ParamEx("4") long num2, OperationContext<Long> operationContext);

@Operation(name = "divide", desc = "Divides two numbers")
@Result("quotient")
@RestRequest(path = "calculator/divide")
void divide(@Param("num1") @ParamEx("90") long num1, @Param("num2") @ParamEx("15") long num2, OperationContext<Long> operationContext);
}

  • code - part of the service’s public API. The format of the code should therefore be a short phrase in uppercase with an underscore used as a separator, e.g. BAD_REQUEST, BULK_DUPLICATION, MISSING_ARGUMENT, QUOTA_EXCEEDED.
  • status - the appropriate corresponding HTTP error code.
  • messageTemplate - text in the en_US locale that may include %{identifier} placeholders for error message parameters. The messageTemplate (and any parameters) should convey meaningful information, e.g. “Quota on %{resourceName} exceeded for %{projectName}.”
  • severity - the severity level of the error. Available severity levels are: ‘EMERGENCY’, ‘ALERT’, ‘CRITICAL’, ‘ERROR’, ‘WARNING’, ‘NOTICE’, ‘INFO’, ‘DEBUG’.
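To illustrate how %{identifier} placeholders in a messageTemplate relate to error message parameters, the following sketch substitutes parameter values into a template. It shows the placeholder syntax only. MessageTemplateSketch is a hypothetical helper, and leaving unknown placeholders untouched is an assumption; how the platform itself renders templates is not described here.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of the Avid Connector API). */
final class MessageTemplateSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("%\\{([A-Za-z0-9_]+)\\}");

    private MessageTemplateSketch() {}

    /** Replaces each %{name} with params.get(name); unknown placeholders are left as-is. */
    static String render(String template, Map<String, String> params) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = params.get(matcher.group(1));
            String replacement = value != null ? value : matcher.group();
            // quoteReplacement keeps '$' and '\' in values from being treated specially
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

For the ADDITION_OVERFLOW template above, parameters num1=1 and num2=2 yield "Addition overflow 1 and 2".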

Composing response message with structured errors

To compose a message with structured errors, the BusError class must be used. The service must also use Responder as one of the arguments of the service operation declaration. Using the error declarations from the example above, the implementation of two methods that produce the ADDITION_OVERFLOW and DIVIDE_BY_ZERO errors may look like the following:

public class ExampleServiceImpl implements ExampleService {
    @Override
    public void add(long num1, long num2, OperationContext<Long> operationContext) {
        if (willAdditionOverflow(num1, num2)) {
            Map<String, String> params = new HashMap<>();
            params.put("num1", Long.toString(num1));
            params.put("num2", Long.toString(num2));

            operationContext.error(new BusError("ADDITION_OVERFLOW", params, "Addition overflow"));
        } else {
            System.out.println(String.format("add: num1=[%d] num2=[%d]", num1, num2));
            operationContext.respond(num1 + num2);
        }
    }

    @Override
    public void divide(long num1, long num2, OperationContext<Long> operationContext) {
        System.out.println(String.format("divide: num1=[%d] num2=[%d]", num1, num2));
        if (num2 == 0) {
            operationContext.error(new BusError("DIVIDE_BY_ZERO", (Map<String, String>) null, "Division by zero"));
            return;
        }

        operationContext.respond(num1 / num2);
    }

    private static boolean willAdditionOverflow(long left, long right) {
        if (right < 0 && right != Long.MIN_VALUE) {
            return willSubtractionOverflow(left, -right);
        } else {
            return (~(left ^ right) & (left ^ (left + right))) < 0;
        }
    }

    private static boolean willSubtractionOverflow(long left, long right) {
        if (right < 0) {
            return willAdditionOverflow(left, -right);
        } else {
            return ((left ^ right) & (left ^ (left - right))) < 0;
        }
    }
}
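
As an aside, the overflow check in the example above can also be written with the JDK's Math.addExact, which throws ArithmeticException when a long addition overflows. The following standalone sketch uses a hypothetical class name and no Avid types; it is an alternative to the bit-twiddling helpers, not the approach the example itself takes.

```java
// Hypothetical standalone helper; shows an alternative overflow check using the JDK.
final class OverflowCheckSketch {
    static boolean willAdditionOverflow(long left, long right) {
        try {
            // Math.addExact throws ArithmeticException if the sum overflows a long
            Math.addExact(left, right);
            return false;
        } catch (ArithmeticException overflow) {
            return true;
        }
    }
}
```

This trades a few bit operations for an exception on the overflow path, which is usually acceptable when overflow is rare.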

Multi-Zone Services

The default behavior of the Avid Connector API is to register services in the local zone scope. This means that by default services only receive requests from clients within the same zone. If the local zone is initialized in a multi-zone environment, however, it is possible to register a service in the multi-zone scope. This allows the service to be invoked by clients in any connected zone.

To register a service in the multi-zone scope, use the bus.multiZone() object:

bus.multiZone().registerService(new CalculatorServiceImpl(), options, asyncCallback);

Local Zone Scope

Registering a service using the bus.localZone() object is functionally equivalent to registering it using the base bus object. The service is only accessible within the local zone.

Advanced Services and Operations

Sometimes services are more complicated than the simple calculator example provided above, and operation arguments are more complicated than a long. In these cases, the Avid Connector API provides more advanced ways for constructing services.

Registering a Service Using ServiceInfo

When an annotated class is registered, it is converted into a ServiceInfo object for registration purposes. In some cases, service authors may want to construct their own ServiceInfo object or modify the ServiceInfo object that is generated by an annotated class. You might want to do this, for example, for a service whose exposed operations are determined dynamically.

// Create the ServiceInfo from the BusService type
ServiceInfo info = ServiceInfoFactory.createServiceInfo(myCalcService);

// Modify the ServiceInfo as needed (see API documentation)
info.getOperations().remove("subtract");

// And finally register it
bus.registerService(info, options, asyncCallback);

NOTE: When registering a ServiceInfo, the passed in ServiceInfo is never modified. Developers must inspect the ServiceInfo returned in the ServiceContext to see the full information of the registered service (assigned id, queues, topics, merged base operations, etc.).

Mixing Multiple Annotated Classes Into a Single Service

If you want to expose a single service comprising functionality in multiple annotated service classes, you could create your own wrapper service. A better solution, however, could be to mix the services together.

ServiceInfo info = ServiceInfoFactory.createServiceInfo(myAdditionService);
ServiceInfoFactory.mergeIntoServiceInfo(info, mySubtractionService);
ServiceInfoFactory.mergeIntoServiceInfo(info, myMultiplicationService);
ServiceInfoFactory.mergeIntoServiceInfo(info, myDivisionService);

// By default will use type, realm, and version from the first info, but we can override
info.setServiceType("calculator");
info.setServiceRealm("global");
info.setServiceVersion(1);

bus.registerService(info, options, asyncCallback);

NOTE: This functionality requires that all merged services have BusService annotations on the class.

Receiving or Returning the Entire Message Object in the Operation

Sometimes the service method may need to analyze the entire Message object, or need access to a parameter that is not easily deserialized into a Java type. In this case, the annotated method needs only one argument for the message (which should not be annotated):

@Operation(name = "add", desc = "Adds two numbers")
@Result("sum")
long add(Message request);

For similar reasons, an operation may also need to return the whole Message object, in which case the Result annotation is not needed:

@Operation(name = "add", desc = "Adds two numbers")
Message add(Message request);

Providing Complex Examples

Simple operations can provide example inputs via the ParamEx annotation. Less simple operations, however, may need to define their example in an external file. This can be accomplished via the Example and Examples annotations:

@Operation(name = "add", desc = "Adds two numbers")
@Example(name="Simple Addition", file="simple_add.json")
long add(@Param("num1") long num1, @Param("num2") long num2);

@Operation(name = "subtract", desc = "Subtracts two numbers")
@Examples({
@Example(name="Simple Subtraction", file="simple_sub.json"),
@Example(name="Advanced Subtraction", file="advanced_sub.json")
})
long subtract(@Param("num1") long num1, @Param("num2") long num2);

In the above examples, the files must be on the classpath and must be referenced relative to the interface’s package. If traversing packages, use the ‘/’ symbol as the delimiter (for example, “../../examples/simple_add.json”).

Platform Service Lifecycle Events

The Avid Connector API emits service events on services during registration, reconfiguration, shutdown, and request/response messages. All service events contain a reference to the ServiceContext.

Platform Service Lifecycle

The best way to describe the different events is to describe the service lifecycle and where the events fit in:

When the service is passed to the registerService method…

  1. ServiceInfo is generated for the service
  2. ServiceRegisteringEvent is posted
  3. Service is registered on the Secure Gateway
  4. ServiceRegisteredEvent is posted
  5. ServiceStartingEvent is posted
  6. Service is connected to the Secure Gateway
  7. ServiceStartedEvent is posted

When a message is posted to service…

  1. JSON byte stream message is converted to Avid Connector API Message
  2. ServiceReceivingMessageEvent is posted
  3. Corresponding service operation is invoked
  4. Service returns response
  5. ServiceRespondingMessageEvent is posted
  6. Avid Connector API Message is converted to JSON byte stream
  7. Message sent to Avid Platform

When the service’s configuration is changed…

  1. ServiceConfigurationEvent is posted with isReconfiguration flag set to true

When the service is shut down…

  1. ServiceStoppingEvent is posted
  2. Service is disconnected from the Secure Gateway
  3. ServiceStoppedEvent is posted

Subscribing to Service Lifecycle Events

The Avid Connector API permits subscriptions to lifecycle events by annotating a method with @Subscribe. Subscription methods should accept a single argument: the event they wish to receive.

Registered services are automatically inspected for subscription methods and subscribed to their own events. A service might subscribe to its own events in order to receive configuration data, perform message pre-processing or post-processing, or inspect its registered ServiceInfo.

@Subscribe
public void onConfiguration(ServiceConfigurationEvent event) {
ServiceConfiguration config = event.getConfiguration();
// ...
}

Sometimes a different object (other than the registered service) may need to subscribe to a service’s events. Given a NoseyNeighbor class:

public class NoseyNeighbor {
@Subscribe
public void stickNoseInOtherPeoplesBusiness(ServiceEvent event) {
// do something...
}
}

The NoseyNeighbor subscribes to ServiceEvent. Since ServiceEvent is a superclass of all service events, this means that the NoseyNeighbor will receive every service event posted for the service (a nice shortcut from subscribing to them all individually).
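
The superclass shortcut can be illustrated with a small reflection-based dispatcher. This is only a sketch of the general technique: the annotation, event classes and dispatcher below are hypothetical stand-ins defined locally, and nothing here is claimed to mirror how the Avid Connector API dispatches events internally.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; these are NOT the Avid Connector API types.
final class EventDispatchSketch {
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface SketchSubscribe {}

    static class SketchServiceEvent {}
    static class SketchStartedEvent extends SketchServiceEvent {}
    static class SketchStoppedEvent extends SketchServiceEvent {}

    // Subscribes to the superclass, so it sees every event
    static class Neighbor {
        final List<String> seen = new ArrayList<>();
        @SketchSubscribe
        public void onAny(SketchServiceEvent event) { seen.add(event.getClass().getSimpleName()); }
    }

    // Subscribes to one concrete event type only
    static class StartListener {
        final List<String> seen = new ArrayList<>();
        @SketchSubscribe
        public void onStarted(SketchStartedEvent event) { seen.add(event.getClass().getSimpleName()); }
    }

    // Invokes every annotated single-argument method whose parameter type accepts the event
    static void post(Object event, Object... subscribers) {
        for (Object subscriber : subscribers) {
            for (Method method : subscriber.getClass().getDeclaredMethods()) {
                if (method.isAnnotationPresent(SketchSubscribe.class)
                        && method.getParameterCount() == 1
                        && method.getParameterTypes()[0].isInstance(event)) {
                    try {
                        method.setAccessible(true);
                        method.invoke(subscriber, event);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException(e);
                    }
                }
            }
        }
    }
}
```

Posting a started event and then a stopped event delivers both to Neighbor, while StartListener only receives the first.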

The NoseyNeighbor can be subscribed at registration time via the ServiceOptions:

ServiceOptions options = new ServiceOptions();
options.setEventSubscribers(Arrays.asList((Object) new NoseyNeighbor()));
bus.registerService(new CalculatorServiceImpl(), options, asyncCallback);

Or the NoseyNeighbor can be subscribed at any point after registration via the ServiceContext:

AsyncCallback<ServiceContext> asyncCallback = new AsyncCallback<ServiceContext>() {
@Override
public void onSuccess(ServiceContext result) {
result.subscribeToEvents(new NoseyNeighbor());
}

@Override
public void onError(CallbackError error) {
System.out.println("Error occurred: " + error.getMessage());
}
};

bus.registerService(new CalculatorServiceImpl(), options, asyncCallback);

Providing Service Status

When a service is registered, it starts sending heartbeats to report its current status. The reported status contains a status code.

By default, as long as the Java process is still running, the Avid Connector API reports back an “OK” status. While this may be acceptable for simple services (like our calculator example), more complex services can explicitly supply their own status information. The Java Connector API supports five status states (OK, WARNING, ERROR, SUSPENDED, OFFLINE). Each state has its own behavior, which defines whether the service is still visible on the platform and whether it is able to receive messages. The table below shows the behavior defined by each status.

NOTE: The Java Connector API supported two additional states (STOPPED and UNKNOWN). These have been deprecated and should not be used in future development.

| Status | Visible on Platform | Receives Requests | Example Use Case |
|---|---|---|---|
| OK | Yes | Yes | Service is fully functional (default state) |
| WARNING | Yes | Yes | Service is still fully functioning, but wants to warn about some resource (e.g. high memory usage, large DB latency, many timeouts to another service) |
| ERROR | Yes | No | Service is not functioning properly and needs some intervention to fix (e.g. ran out of memory, but the process should be kept alive for debugging) |
| SUSPENDED | Yes | No | Service is OK, but cannot function properly due to an external resource (e.g. DB connection lost and service cannot proceed without persisting data) |
| OFFLINE | No | No | Service process should stay alive, but should not be visible or routed to (e.g. keep service process alive during DB migration) |
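
The status table can also be read as a small lookup structure. The enum below is a hypothetical local model of the table for illustration (it is not the API's StatusCode type); it simply records the two behavioral flags per state.

```java
// Hypothetical local model of the status table; not the Avid Connector API's StatusCode.
enum StatusBehaviorSketch {
    OK(true, true),
    WARNING(true, true),
    ERROR(true, false),
    SUSPENDED(true, false),
    OFFLINE(false, false);

    final boolean visibleOnPlatform;
    final boolean receivesRequests;

    StatusBehaviorSketch(boolean visibleOnPlatform, boolean receivesRequests) {
        this.visibleOnPlatform = visibleOnPlatform;
        this.receivesRequests = receivesRequests;
    }
}
```

A monitoring tool could use such a model to decide, for example, that a SUSPENDED service should still be listed but should not be expected to answer requests.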

To set the status for a service, invoke setServiceStatus on the ServiceContext.

AsyncCallback<ServiceContext> asyncCallback = new AsyncCallback<ServiceContext>() {
@Override
public void onSuccess(ServiceContext result) {
result.setServiceStatus(new ServiceStatus(StatusCode.WARNING, "Something getting worse"));
}

@Override
public void onError(CallbackError error) {
System.out.println("Error occurred: " + error.getMessage());
}
};

bus.localZone().registerService(calculatorService, options, asyncCallback);

Getting Configuration Information From a Service Manager

To intercept configuration provided by a Service Manager, the service must be subscribed to the ServiceConfigurationEvent event:

class ServiceConf {
@JsonProperty
public final String mongoHost;
@JsonProperty
public final int maxConnections;
@JsonProperty
public final boolean reconnect;
@JsonProperty
public final NestedConf nestedConf;

@JsonCreator
public ServiceConf(@JsonProperty("mongoHost") String mongoHost,
@JsonProperty("maxConnections") int maxConnections,
@JsonProperty("reconnect") boolean reconnect,
@JsonProperty("nestedConf") NestedConf nestedConf) {
this.mongoHost = mongoHost;
this.maxConnections = maxConnections;
this.reconnect = reconnect;
this.nestedConf = nestedConf;
}
}

class NestedConf {
@JsonProperty
public final boolean createDb;
@JsonProperty
public final boolean cleanOnDisconnect;

@JsonCreator
public NestedConf(@JsonProperty("createDb") boolean createDb,
@JsonProperty("cleanOnDisconnect") boolean cleanOnDisconnect) {
this.createDb = createDb;
this.cleanOnDisconnect = cleanOnDisconnect;
}
}

@Subscribe
public void onConfiguration(ServiceConfigurationEvent event) {
DataSet configurationDataSet = event.getConfiguration();
final ServiceConf serviceConf = configurationDataSet.asObject(ServiceConf.class);

if (event.isConfigurationUpdate()) {
// this is configuration update
} else {
// this is initial configuration when service registered
}

if (event.getError() != null) {
// some errors occurred
}

// update service status
event.getServiceContext().setServiceStatus(new ServiceStatus(StatusCode.OK, null), callback);
}

Service configuration in the Service Manager corresponding to given example:

"serviceConfiguration": {
"mongoHost": "my-mongo",
"maxConnections": 10,
"reconnect": true,
"nestedConf": {
"createDb": true,
"cleanOnDisconnect": false
}
}

Providing Custom Service Health Information

One of the core operations provided by the Avid Connector API is serviceHealth. By default, when a service receives this operation request, it replies with a message like the following in the resultSet:

{
"service": {
"instanceId": "955742c3-1fdd-4edd-be71-78aeb837aaa0",
"healthStatus": "ok",
"healthVerifier": "default"
}
}

Service developers may override the serviceHealth operation to provide a specific healthStatus or additional customHealthInfo. In this case, the serviceHealth response may look like:

{
"service": {
"customHealthInfo": {
"str": "some string data",
"flag": true
},
"instanceId": "955742c3-1fdd-4edd-be71-78aeb837aaa0",
"healthStatus": "error",
"healthVerifier": "custom"
}
}

To override the serviceHealth operation, annotate a method with the @HealthCheck annotation and have it take one argument of type HealthCheckReporter<YourCustomClass>:

@BusService(type = "avid.example.service", realm = "global", version = 3, desc = "Example service")
public class CalculatorService {
@HealthCheck
public void customHealthCheck(HealthCheckReporter<CustomHealthInfo> healthCheckReporter) {
CustomHealthInfo customHealthInfo = new CustomHealthInfo("some string data", true);
HealthStatus healthStatus = HealthStatus.ERROR;
healthCheckReporter.report(healthStatus, customHealthInfo);
}
}

public class CustomHealthInfo {
@JsonProperty
public final String str;

@JsonProperty
public final boolean flag;

@JsonCreator
public CustomHealthInfo(@JsonProperty("str") String str, @JsonProperty("flag") boolean flag) {
this.str = str;
this.flag = flag;
}
}

Providing Compatible Version

Services on the Avid Platform support the concept of ‘Compatible Versions’ as a way to remain backward compatible with older versions of your service. When you declare that a newer version of your service is compatible with previous versions, it gains the ability to receive messages that were sent to an older version. This way, clients sending messages to an older version of your service get a valid response even if no instances of the old service are running. For example, if we declare a new version of our service as version 3 and also declare that it is compatible with versions 2 and 1, then any message sent to version 1, 2, or 3 of the service is routed to version 3 if there is no running instance of the service in the specified version. To declare your service as compatible with other versions, add a compatibleVersions field to the BusService annotation.

@BusService(type = "com.avid.acs.example.calc", realm = "global", version = 2, compatibleVersions = {1, 0}, desc = "It calculates")
@AccessRights(external = AccessPolicy.NON_AUTHENTICATED)
public interface CalculatorService {
@Operation(name = "add", desc = "Adds two numbers")
@Result("sum")
void add(@Param("num1") @ParamEx("2") long num1, @Param("num2") @ParamEx("4") long num2, OperationContext<Long> operationContext);

@Operation(name = "subtract", desc = "Subtracts two numbers")
@Result("difference")
void subtract(@Param("num1") @ParamEx("10") long num1, @Param("num2") @ParamEx("5") long num2, OperationContext<Long> operationContext);
}
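
The routing rule described in this section can be sketched as a small resolution function. The types below are hypothetical and only model the stated rule (an exact version match wins; otherwise a running instance that declares the requested version as compatible handles the message); how the platform actually selects among several compatible candidates is not specified here.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of compatible-version routing; not part of the Avid Connector API.
final class CompatibleVersionRoutingSketch {
    static final class Instance {
        final int version;
        final Set<Integer> compatibleVersions;

        Instance(int version, Integer... compatibleVersions) {
            this.version = version;
            this.compatibleVersions = new HashSet<>(Arrays.asList(compatibleVersions));
        }
    }

    // Returns the version of the instance that would handle the request, or -1 if none can.
    static int resolve(int requestedVersion, List<Instance> running) {
        for (Instance instance : running) {
            if (instance.version == requestedVersion) {
                return instance.version; // exact match takes precedence
            }
        }
        for (Instance instance : running) {
            if (instance.compatibleVersions.contains(requestedVersion)) {
                return instance.version; // fall back to a compatible newer version
            }
        }
        return -1;
    }
}
```

With only a version 3 instance running that declares compatibility with versions 2 and 1, requests addressed to versions 1, 2 and 3 all resolve to version 3.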

Unregister Service

To unregister a service on the Avid Platform, use the void unregisterService(@Nonnull String serviceId, AsyncCallback<Void> asyncCallback) method.

busAccess.unregisterService(serviceInfo.getId(), new AsyncCallback<Void>() {
@Override
public void onSuccess(Void result) {
System.out.println("Unregistered successfully");
}

@Override
public void onError(CallbackError error) {
System.out.println("Failed to unregister: " + error.getMessage());
}
});

Using the Avid Connector API for Channels

The Avid Connector API features channels, which are analogous to Java Message Service (JMS) topics. When a message is posted to a channel, it is broadcast as a one-way communication to all the subscribers listening to that channel. It is important to note that channel messages are not persisted, and posting to a channel that has no subscribers does not result in an error.

Channels are identified by their name. Subscribers interested in listening on a channel must either receive the channel name from the service that owns the channel, or use a predefined or well-known channel name.

Posting to a Channel

Channel messages are very similar to regular Avid Platform messages, but rather than have a paramSet or resultSet, they simply have a dataSet. The BusAccess API provides a method for posting to a channel:

MutableChannelMessage channelMessage = new MutableChannelMessage(channelName, subject);
channelMessage.data().put("event", "ingest_started");
channelMessage.data().put("mobId", "060a2b340101010101010f0013-000000-475d870b9a6b24fb-060e2b347f7f-2a80");
// ...
AsyncCallback<Void> asyncCallback = new AsyncCallback<Void>() {
@Override
public void onSuccess(Void result) {
System.out.println("Successfully posted message");
}

@Override
public void onError(CallbackError error) {
System.out.println("Error occurred: " + error.getMessage());
}
};

bus.postToChannel(channelMessage, asyncCallback);

NOTE: Once a channel is registered, anyone knowing the channel name can post to it. Although the poster is often the creator of the channel, this is not enforced in any way.

Subscribing to a Channel

Objects that need to subscribe to channels can do so by subscribing through the BusAccess interface. First, a channel subscriber must implement the ChannelMessageHandler interface:

public class MyChannelMessageHandler implements ChannelMessageHandler {
@Override
public void onChannelMessage(ChannelContext context) {
// Do something with the channel message
// context.getChannelMessage()
// context.getBusAccess()
}
}

A ChannelMessageHandler can be subscribed to a channel using ‘subscribeToChannel()’ in ‘BusAccess’. Note that a single subscriber may subscribe to multiple channels at once. There is enough data in each of the subscriber methods for the subscriber to identify the channel to which the message pertains.

AsyncCallback<Void> asyncCallback = new AsyncCallback<Void>() {
@Override
public void onSuccess(Void result) {
System.out.println("Successfully subscribed");
}

@Override
public void onError(CallbackError error) {
System.out.println("Error occurred: " + error.getMessage());
}
};

ChannelMessageHandler myChannelMessageHandler = new MyChannelMessageHandler();

for (String channelName : interestingChannels) {
    bus.subscribeToChannel(channelName, bindings, myChannelMessageHandler, options, asyncCallback);
}

NOTE: bindings, options and asyncCallback can be null.

Clients can also subscribe to a shared channel by shared name, channel name and bindings. In this case, when multiple instances are subscribed to the same channel with the same shared name, only one instance receives each message.

ChannelOptions options = new ChannelOptions("shared.name");
bus.subscribeToChannel(channelName, null, mySubscriber, options, asyncCallback);
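
To illustrate the "only one instance receives each message" behavior, here is a small in-memory simulation. Everything in it is hypothetical: the class is not an Avid type, and the round-robin choice of receiver is an assumption made for the sketch, since the text only states that exactly one instance per shared name gets each message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory simulation of shared subscriptions; not the Avid Connector API.
final class SharedChannelSketch {
    // shared name -> inboxes of the instances subscribed under that name
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // shared name -> number of messages delivered so far (drives round-robin selection)
    private final Map<String, Integer> delivered = new HashMap<>();

    // Registers one instance under the shared name and returns its inbox
    List<String> subscribe(String sharedName) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(sharedName, key -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    // Delivers the message to exactly one instance per shared name (assumed round-robin)
    void post(String message) {
        for (Map.Entry<String, List<List<String>>> group : groups.entrySet()) {
            int index = delivered.merge(group.getKey(), 1, Integer::sum) - 1;
            List<List<String>> inboxes = group.getValue();
            inboxes.get(index % inboxes.size()).add(message);
        }
    }
}
```

With two instances subscribed under "shared.name", three posted messages are split between the two inboxes and no message is delivered twice.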

Using Bindings

Bindings can be used to filter which messages are sent to subscribers. When subscribing to a channel, pass along a list of bindings to specify which messages are relevant to the subscriber:

List<String> myBindings = new ArrayList<String>();
myBindings.add("log.CalculatorService.*");
myBindings.add("log.*.info");
myBindings.add("log.*.warning");
bus.subscribeToChannel(loggingChannelName, myBindings, mySubscriber, null, asyncCallback);

The binding string will filter messages based on their dot-separated subject. The ‘*‘ character can be used as a wildcard to match entire words. For example, “com.*.info” matches “com.MapService.info” but “com.Map*.info” does not.
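
The matching rule can be sketched as a word-by-word comparison. The helper below is hypothetical and assumes that ‘*’ stands for exactly one dot-separated word and that binding and subject must contain the same number of words; the platform's matcher may support more than this.

```java
// Hypothetical matcher for illustration; assumes '*' matches exactly one whole word.
final class BindingMatchSketch {
    static boolean matches(String binding, String subject) {
        String[] pattern = binding.split("\\.", -1);
        String[] words = subject.split("\\.", -1);
        if (pattern.length != words.length) {
            return false;
        }
        for (int i = 0; i < pattern.length; i++) {
            // A pattern word must either be the wildcard or equal the subject word exactly
            if (!pattern[i].equals("*") && !pattern[i].equals(words[i])) {
                return false;
            }
        }
        return true;
    }
}
```

Under these assumptions, matches("com.*.info", "com.MapService.info") is true, while matches("com.Map*.info", "com.MapService.info") is false because "Map*" is compared literally.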

Unsubscribing from a Channel

To unsubscribe your subscriber from a specific channel:

AsyncCallback<Void> asyncCallback = new AsyncCallback<Void>() {
@Override
public void onSuccess(Void result) {
System.out.println("Successfully unsubscribed");
}

@Override
public void onError(CallbackError error) {
System.out.println("Error occurred: " + error.getMessage());
}
};

bus.unsubscribeFromChannel(loggingChannelName, mySubscriber, asyncCallback);

To unsubscribe your subscriber from all channels:

AsyncCallback<Void> asyncCallback = new AsyncCallback<Void>() {
@Override
public void onSuccess(Void result) {
System.out.println("Successfully unsubscribed");
}

@Override
public void onError(CallbackError error) {
System.out.println("Error occurred: " + error.getMessage());
}
};

bus.unsubscribeFromChannels(mySubscriber, asyncCallback);

Unsubscribing from Bindings

To unsubscribe your subscriber from specific bindings of a named channel:

AsyncCallback<Void> asyncCallback = new AsyncCallback<Void>() {
@Override
public void onSuccess(Void result) {
System.out.println("Successfully unsubscribed");
}

@Override
public void onError(CallbackError error) {
System.out.println("Error occurred: " + error.getMessage());
}
};

bus.unsubscribeFromBindings(loggingChannelName, listOfBindings, mySubscriber, asyncCallback);

Remote Zone and Multi-Zone Channel Communications

The default behavior of the Avid Connector API is to scope channel communications within the local zone. All the examples given above use this default behavior. If the local zone has been initialized in a multi-zone environment, however, it is possible to communicate using channels across multiple zones.

Interacting with Channel Subscribers in a Specific Remote Zone

To communicate with channel subscribers in a specific remote zone, use the bus.zone(String zoneID) object. The following examples send channel events and messages to users in a specific zone only:

AsyncCallback<Void> asyncCallback = new AsyncCallback<Void>() {
@Override
public void onSuccess(Void result) {
System.out.println("Successfully posted");
}

@Override
public void onError(CallbackError error) {
System.out.println("Error occurred: " + error.getMessage());
}
};

bus.zone("5b2123f1-3f8e-4fcb-9263-f7b98bbdab0c").postToChannel(channelMessage, asyncCallback);

In the above example, only multi-zone subscribers to “my.channel” in the remote zone with ID 5b2123f1-3f8e-4fcb-9263-f7b98bbdab0c receive these channel events and messages. If there are local scope subscribers listening on the same channel in that zone, they do not receive these messages (since they only receive messages sourced from their local zone).

Interacting with Channel Subscribers in All Zones

To communicate with channel subscribers in all connected zones, use the bus.multiZone() object. The following are examples of multi-zone channel communication:

AsyncCallback<Void> asyncCallback = new AsyncCallback<Void>() {
@Override
public void onSuccess(Void result) {
System.out.println("Successfully posted");
}

@Override
public void onError(CallbackError error) {
System.out.println("Error occurred: " + error.getMessage());
}
};

bus.multiZone().postToChannel(channelMessage, asyncCallback);

In the above examples, all multi-zone subscribers to “my.channel” across all connected zones receive the channel events and messages. The local scope subscribers in the poster’s zone also receive the messages. The local scope subscribers in remote zones, however, do not receive these messages.

Subscribing to Multi-Zone Channels

If the local zone has been initialized in a multi-zone environment, channel subscribers can listen using the multi-zone scope. This means that they can receive channel messages from posters in remote zones.

To subscribe to a channel in the multi-zone scope, use the bus.multiZone() object:

AsyncCallback<Void> asyncCallback = new AsyncCallback<Void>() {
@Override
public void onSuccess(Void result) {
System.out.println("Successfully subscribed");
}

@Override
public void onError(CallbackError error) {
System.out.println("Error occurred: " + error.getMessage());
}
};

bus.multiZone().subscribeToChannel(channelName, binding, mySubscriber, options, asyncCallback);

Local Zone Communications

Note that there are also channel methods in the bus.localZone() object. Invoking channel methods on this object is functionally equivalent to invoking the same methods on the base bus object.

Accessing Constants

The Avid Connector API provides constants, which represent core Avid Platform service types (e.g. registry, federation and attributes). To access these constants:

import com.avid.acs.bus.constants.CoreServices;

Exposing Operations for Upstream REST requests

To declare that an operation supports a REST request, apply the RestRequest annotation to the corresponding operation:

@Operation
@RestRequest(path = "my/service/{resource}", bodyParam = "myResource", resultParam = "myResult", method = RequestMethod.POST)
@Result("myResult")
int saveResource(@Param("resource") String resource, @Param("myResource") MyResource myResource) throws BusAccessException;

@Operation
@RestRequest(path = "my/service/{resource}", queryParams = { "param1", "param2" })
@Result("myResource")
MyResource getResource(@Param("resource") String resource, @Param("param1") String param1, @Param("param2") String param2) throws BusAccessException;

@Operation
@RestRequest(path = "my/service/{resource}", method = RequestMethod.DELETE)
void deleteResource(@Param("resource") String resource) throws BusAccessException;

The RestRequest annotation has the following parameters:

  • path - Required;
  • queryParams - Not required;
  • bodyParam - Not required, default *;
  • method - Not required, default GET;

IMPORTANT: A service cannot expose multiple operations and/or commands with the same HTTP method on the same endpoint.

@Operation
@RestRequest(path = "my/service/{resource}", bodyParam = "myResource", resultParam = "myResult", method = RequestMethod.POST)
@Result("myResult")
int saveResource(@Param("resource") String resource, @Param("myResource") MyResource myResource) throws BusAccessException;

@Command
@RestRequest(path = "my/service/{resource}", method = RequestMethod.POST)
void copyResource(CommandContext<MyResource> context) throws BusAccessException;

The example above is invalid because the operation and the command are both exposed using the same path and RequestMethod. During service registration, the Avid Connector API checks for conflicting REST endpoints and throws an InvalidArgumentException if it finds any.
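
The kind of check described here can be sketched as a scan for duplicate (HTTP method, path) pairs. The class below is a hypothetical illustration, not the Avid Connector API's implementation; it compares paths as plain strings, whereas a real check may normalize path templates first.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical duplicate-endpoint check; not the Avid Connector API's own validation code.
final class RestEndpointConflictSketch {
    static final class Endpoint {
        final String method;
        final String path;
        final String handler;

        Endpoint(String method, String path, String handler) {
            this.method = method;
            this.path = path;
            this.handler = handler;
        }
    }

    // Returns one description per endpoint that reuses an already seen method/path pair
    static List<String> findConflicts(List<Endpoint> endpoints) {
        Map<String, String> seen = new HashMap<>();
        List<String> conflicts = new ArrayList<>();
        for (Endpoint endpoint : endpoints) {
            String key = endpoint.method + " " + endpoint.path;
            String previous = seen.putIfAbsent(key, endpoint.handler);
            if (previous != null) {
                conflicts.add(key + ": " + previous + " vs " + endpoint.handler);
            }
        }
        return conflicts;
    }
}
```

Applied to the invalid example, saveResource and copyResource both map to POST my/service/{resource}, so the check reports one conflict; a GET on the same path does not conflict.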

For more detailed information about how REST requests are mapped and delivered to your service,
please view the upstream HTTP docs.

Metrics

The Avid Connector API exposes internal metrics such as operation duration, message size and many more. The metrics
are automatically sent to the platform. The platform can be configured to expose the metrics to different
backends such as Graphite or CloudWatch.

Metrics Example

In some cases it is not enough to know the duration of an operation, and a service developer needs to
measure specific parts of the business logic. For that reason, the Avid Connector API exposes a MetricsFacade.
The MetricsFacade is bound to a connection in order to associate metrics with a specific service. Here is a small example
that measures a database call:

// 'bus' is an already connected BusAccess instance
MetricsFacade metrics = bus.getMetrics();
Timer timer = metrics.timer("iam.mongo.query.findPrinciple");
Timer.Context watch = timer.start();
try {
    mongo.doMyQuery();
} finally {
    // stop the timer even if the query throws
    watch.stop();
}

The code above automatically exposes a new metric called “iam.mongo.query.findPrinciple”. The Avid Connector API sends the metric to the platform automatically.

The MetricsFacade currently wraps a well-known library called Dropwizard Metrics and exposes all major metric types. We discourage using the Dropwizard Metrics library directly, since it might be replaced by another library in the future.

Metrics Names

  • Prefix metric names to prevent possible clashes with core metrics (e.g. iam, asset).
  • Dots act as path separators in target systems like Graphite. Try to design your metric names so that it is easy to capture, for example, all mongo queries with a wildcard.
    • Good: iam.mongo.query.findPrinciple, iam.mongo.query.deletePrincipleByName
    • Bad: iam.mongo.query.find.principle, iam.mongo.query.delete.principle.by.name
  • Do not use unbounded values such as UUIDs in metric names. Metrics with unbounded names are expensive to aggregate and to store.
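
One way to guard against the last pitfall is a simple check before a metric is created. The helper below is a hypothetical sketch that only detects UUID-shaped substrings; it is not part of the MetricsFacade, and other unbounded values (timestamps, user names) would need their own checks.

```java
import java.util.regex.Pattern;

// Hypothetical guard for metric names; not part of the Avid Connector API.
final class MetricNameCheckSketch {
    // Matches the canonical 8-4-4-4-12 hexadecimal UUID layout anywhere in the name
    private static final Pattern UUID_LIKE = Pattern.compile(
            "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}");

    static boolean containsUuid(String metricName) {
        return UUID_LIKE.matcher(metricName).find();
    }
}
```

A name such as iam.mongo.query.findPrinciple passes this check, while a name that embeds an instance ID like 955742c3-1fdd-4edd-be71-78aeb837aaa0 is flagged.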

Metrics Tags

In most cases you do not have to worry about the metrics tag. If, however, your deployment runs multiple instances of the same service on the same host machine, you must make sure that the metrics of each instance are unique by defining the ACS_METRICS_TAG environment variable.

Here is a naming example if you have to run 3 instances of the transcode service on the same host:

  • transcode-1
  • transcode-2
  • transcode-3

Liveness probe

The Avid Connector API has built-in functionality to collect and display the liveness status of registered BusAccess instances, and to display these instances together with their registered services.
This functionality is available through endpoints exposed by an internally started HTTP server.
Liveness status functionality is disabled by default and can be enabled with an environment variable. Refer to the environment variables list.

The server instance is a singleton; it is configured and started on the first invocation of the BusAccess.connect method.
All other connections use exactly the same server instance created during the first connection.

The liveness probe is not supported and not available for deprecated methods of BusAccess (such as the deprecated connect, disconnect, registerService, unregisterService).
The liveness probe server uses only ConnectionOptions and environment variables for its configuration.

Default liveness probe implementation

The default implementation of the liveness probe server is available and works out of the box when liveness probe functionality is enabled via environment variables.
The default bind address is 0.0.0.0 and the default port is 9991.
The server exposes the following default endpoints:

  • /api/v1/liveness (GET) - returns aggregated health of all registered services and clients
  • /api/v1/instances (GET) - returns list of all registered services and clients
  • /api/v1/instances/{id}/liveness (GET) - returns health of specific instance

/api/v1/liveness Returns text/plain with status code 200 or 500 and the text OK or ERROR, indicating whether the aggregated status of all instances is healthy or not.

/api/v1/instances/{id}/liveness Returns text/plain with status code 200 or 500 and the text OK or ERROR, indicating whether the BusAccess instance is healthy or not.

  • id - instance id of the BusAccess instance

/api/v1/instances Returns application/json containing description of all registered BusAccess instances and services assigned to them

{
  "fee67ab5-2504-4e50-8ab8-f534ee86c4fb": {
    "clientId": "N/A",
    "livenessCode": "ERROR",
    "serviceType": "avid.platform.example.liveness-test",
    "serviceRealm": "global",
    "serviceVersion": 3
  },
  "c78c1576-2ba4-41af-9b11-a21434549e93": {
    "clientId": "123-456-789",
    "livenessCode": "OK",
    "serviceType": "avid.platform.example.liveness-test",
    "serviceRealm": "global",
    "serviceVersion": 3
  },
  "7a9e24ad-a71e-4f91-84d0-9ab95479ed03": {
    "clientId": "222-456-789",
    "livenessCode": "OK",
    "serviceType": "avid.platform.example.liveness-test",
    "serviceRealm": "global",
    "serviceVersion": 3
  }
}

Here clientId is the client ID used for authentication. If the client was authenticated by IP address rather than by client ID, clientId is "N/A".

A call to any other endpoint returns text/plain with status code 404 and the text "404 Not Found".
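
The endpoints above can be polled from any HTTP client. The sketch below uses only the JDK and treats status code 200 as healthy and anything else (500, 404, and so on) as unhealthy. The class LivenessClient and its methods are hypothetical helpers, and the base URL you pass in (for example http://localhost:9991, assuming the default port on the local host) depends on your deployment.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;

// Hypothetical client-side helper, not part of the Avid Connector API.
final class LivenessClient {
    private LivenessClient() {
    }

    /** Builds the aggregated liveness URL (instanceId == null) or the per-instance one. */
    public static String livenessUrl(String baseUrl, String instanceId) {
        String base = baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
        return instanceId == null
                ? base + "/api/v1/liveness"
                : base + "/api/v1/instances/" + instanceId + "/liveness";
    }

    /** The endpoints answer 200 for OK and 500 for ERROR; only 200 counts as healthy here. */
    public static boolean isHealthy(int statusCode) {
        return statusCode == 200;
    }

    /** Sends a GET request to the liveness endpoint and interprets the status code. */
    public static boolean probe(String baseUrl, String instanceId) throws IOException {
        HttpURLConnection connection =
                (HttpURLConnection) URI.create(livenessUrl(baseUrl, instanceId)).toURL().openConnection();
        connection.setRequestMethod("GET");
        connection.setConnectTimeout(2000); // timeouts are arbitrary example values
        connection.setReadTimeout(2000);
        try {
            return isHealthy(connection.getResponseCode());
        } finally {
            connection.disconnect();
        }
    }
}
```

For example, LivenessClient.probe("http://localhost:9991", null) checks the aggregated status, while passing an instance id checks a single BusAccess instance.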



Custom liveness probe implementation

Custom InstanceLivenessProvider and AggregatedLivenessProvider

The Avid Connector API lets you replace the AggregatedLivenessProvider and InstanceLivenessProvider that the server uses to report liveness statuses.

AggregatedLivenessProvider

You may implement this interface. For the Avid Connector API to use your implementation, set its instance on ConnectionOptions before the first connection to the gateway is made.

options.setAggregatedLivenessProvider(customAggregatedLivenessProvider);

The aggregated liveness provider is set only once, during server startup. All later attempts to reconfigure it are ignored.

AggregatedLivenessProvider has only one method. It accepts the list of InstanceLivenessStatus objects returned by the individual InstanceLivenessProvider instances and returns the aggregated status.
The return value is an instance of the predefined class AggregatedLivenessStatus, which holds the LivenessCode that the server returns in response to a call to the /api/v1/liveness endpoint.
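
The real AggregatedLivenessProvider, InstanceLivenessStatus, and AggregatedLivenessStatus types belong to the Avid Connector API, and their method names are not shown in this guide. The self-contained sketch below therefore uses hypothetical stand-ins (AggregationSketch, Code) purely to illustrate two aggregation policies that a custom provider might apply. The OK and ERROR values mirror the livenessCode values in the /api/v1/instances response.

```java
import java.util.List;

// Hypothetical stand-in types; the real interfaces come from the Avid Connector API.
final class AggregationSketch {
    private AggregationSketch() {
    }

    /** Stand-in for the liveness code reported per instance and in aggregate. */
    public enum Code { OK, ERROR }

    /** Strict policy: the aggregate is ERROR as soon as any single instance reports ERROR. */
    public static Code strict(List<Code> instanceCodes) {
        return instanceCodes.contains(Code.ERROR) ? Code.ERROR : Code.OK;
    }

    /** Lenient policy: the aggregate stays OK while at least one instance still reports OK. */
    public static Code lenient(List<Code> instanceCodes) {
        return instanceCodes.isEmpty() || instanceCodes.contains(Code.OK) ? Code.OK : Code.ERROR;
    }
}
```

Which policy fits depends on the deployment: a strict policy suits a process whose instances are all essential, while a lenient policy may suit redundant instances. A real implementation would wrap logic like this in the library's AggregatedLivenessProvider interface.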

InstanceLivenessProvider

You may implement this interface and provide it to the Avid Connector API through ConnectionOptions.

options.setInstanceLivenessProvider(customLivenessProvider);

An InstanceLivenessProvider instance is unique per BusAccess instance and is set for each instance when it connects to the bus.
AggregatedLivenessProvider aggregates the statuses returned by the InstanceLivenessProvider instances.
InstanceLivenessProvider has one method, which accepts no arguments and returns an InstanceLivenessStatus. That status holds the actual LivenessCode of the BusAccess instance, which is returned by /api/v1/instances/{id}/liveness.

Custom BusAccessWebServer

The Avid Connector API lets you provide your own implementation of the HTTP liveness server through the BusAccessWebServer interface.

public interface BusAccessWebServer {
    void startServer();
    void stopServer();
    void registerInstance(String instanceId, InstanceLivenessProvider instanceLivenessProvider);
    void unregisterInstance(String instanceId);
    void registerService(String instanceId, String serviceId, ServiceSignature signature);
    void unregisterService(String instanceId, String serviceId);
    void registerAggregatedLivenessProvider(AggregatedLivenessProvider aggregatedLivenessProvider);
}

For the Avid Connector API to accept it, set it on ConnectionOptions before the first instance connects (otherwise the default server is initialized):

options.setBusAccessWebServer(customLivenessServer);

In this case the implementation, including any third-party libraries it uses, is entirely up to you.
The Avid Connector API still wraps your server as a singleton: it calls startServer() on the first connect and stopServer() when no active connections are left.

It also still makes the following calls (supplying the default implementations of AggregatedLivenessProvider and InstanceLivenessProvider if no custom ones are available):

  • server.registerAggregatedLivenessProvider(AggregatedLivenessProvider aggregatedLivenessProvider) on the first call to the non-deprecated BusAccess.connect
  • server.registerInstance(String instanceId, InstanceLivenessProvider instanceLivenessProvider) on each call to the non-deprecated BusAccess.connect
  • server.unregisterInstance(String instanceId) on each call to the non-deprecated BusAccess.disconnect
  • server.registerService(String instanceId, String serviceId, ServiceSignature signature) on each call to the non-deprecated BusAccess.registerService
  • server.unregisterService(String instanceId, String serviceId) on each call to the non-deprecated BusAccess.unregisterService
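
The start-on-first-connect and stop-on-last-disconnect behaviour described above can be pictured as a reference-counted wrapper around the server. The sketch below is only an illustration of that lifecycle, not the library's actual code: SharedServerSketch and its nested Server interface are hypothetical, and Server keeps just the two lifecycle methods of BusAccessWebServer.

```java
// Hypothetical illustration of the lifecycle; not the Avid Connector API implementation.
final class SharedServerSketch {

    /** Reduced stand-in for BusAccessWebServer with only the lifecycle methods. */
    public interface Server {
        void startServer();

        void stopServer();
    }

    private final Server server;
    private int activeConnections;

    public SharedServerSketch(Server server) {
        this.server = server;
    }

    /** Called once per connect; only the first active connection starts the server. */
    public synchronized void onConnect() {
        if (activeConnections == 0) {
            server.startServer();
        }
        activeConnections++;
    }

    /** Called once per disconnect; the server stops when the last connection is gone. */
    public synchronized void onDisconnect() {
        if (activeConnections == 0) {
            return; // nothing is connected, so there is nothing to stop
        }
        activeConnections--;
        if (activeConnections == 0) {
            server.stopServer();
        }
    }
}
```

A practical consequence for custom servers is that startServer() and stopServer() may each be called more than once over the life of the process, so an implementation should tolerate a restart after a stop.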

Use this design doc if you need more information on how to implement server instance.

Old And Deprecated API

  • The Responder interface is deprecated and must be replaced by OperationContext. OperationContext provides the same methods plus getMessage and getBusAccess. All nested calls through the BusAccess interface must be performed with the BusAccess instance returned by OperationContext.getBusAccess.

Migration Guides

This migration guide is valid for migrating to Avid Connector API version >= 3.8.

Migration to Asynchronous API

  • Sending requests to service
    • query
      • all BusAccessClient.query(...) requests must be replaced with void query(@Nonnull Message msg, MessageOptions options, QueryAsyncCallback asyncCallback) throws BusAccessException
      • implement the QueryAsyncCallback interface to handle the query response
      • if your service needs a synchronous query, pass new FutureQueryAsyncCallback() as the asyncCallback argument. To wait for the result synchronously, use the FutureQueryAsyncCallback.get(...) method. NOTE: synchronous calls decrease the performance of your application; use them only when the business logic of your application justifies it
    • send
      • all BusAccessClient.send(...) requests must be replaced with void send(@Nonnull Message msg, MessageOptions options, AsyncCallback<Void> asyncCallback) throws BusAccessException
    • broadcast
      • all BusAccessClient.broadcast(...) requests must be replaced with void broadcast(@Nonnull Message msg, MessageOptions options, AsyncCallback<Void> asyncCallback) throws BusAccessException
    • if you don’t need to provide options, pass null
  • Channels communication
    • post to channel
      • all BusAccessClient.postToChannel(...) requests must be replaced with void postToChannel(@Nonnull ChannelMessage msg, AsyncCallback<Void> asyncCallback) throws BusAccessException;
    • subscribe to channel
      • all BusAccessHost.subscribeToChannel(...) operations must be replaced with void subscribeToChannel(@Nonnull String channelName, @Nullable List<String> bindings, @Nonnull ChannelMessageHandler handler, @Nullable ChannelOptions options, AsyncCallback<Void> asyncCallback) throws BusAccessException
    • unsubscribe from channels
      • all BusAccessHost.unsubscribeFromChannels(...) operations must be replaced with void unsubscribeFromChannels(@Nonnull ChannelMessageHandler handler, AsyncCallback<Void> asyncCallback) throws BusAccessException;
    • unsubscribe from channel
      • all BusAccessHost.unsubscribeFromChannel(...) operations must be replaced with void unsubscribeFromChannel(@Nonnull String channelName, @Nonnull ChannelMessageHandler handler, AsyncCallback<Void> asyncCallback) throws BusAccessException;
  • Other methods which must be replaced with asynchronous versions
    • getDefaultVersion;
    • getEarliestVersion;
    • getLatestVersion;
    • isServiceAvailable;
    • registerService;
    • unregisterService;
  • if your service needs to process a callback synchronously, pass new FutureAsyncCallback() as the asyncCallback argument. To wait for the result synchronously, use the FutureQueryAsyncCallback.get(...) method. NOTE: synchronous calls decrease the performance of your application; use them only when the business logic of your application justifies it
  • if you don’t need to handle the callback, pass null
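
To show the general idea behind a future-backed callback, the sketch below bridges an asynchronous callback to a blocking get(...) using the JDK's CompletableFuture. Everything in it is a hypothetical stand-in: Callback is not the Avid AsyncCallback interface (whose method names are not shown in this guide), queryAsync merely simulates an asynchronous reply, and FutureCallback only mimics the role that FutureQueryAsyncCallback and FutureAsyncCallback play.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-ins; none of these types belong to the Avid Connector API.
final class FutureCallbackSketch {
    private FutureCallbackSketch() {
    }

    /** Stand-in for an asynchronous callback with a success and an error path. */
    public interface Callback<T> {
        void onSuccess(T result);

        void onError(Throwable error);
    }

    /** Callback that completes a future, so that a caller can block on get(...). */
    public static final class FutureCallback<T> implements Callback<T> {
        private final CompletableFuture<T> future = new CompletableFuture<>();

        @Override
        public void onSuccess(T result) {
            future.complete(result);
        }

        @Override
        public void onError(Throwable error) {
            future.completeExceptionally(error);
        }

        /** Blocks the calling thread until a result arrives or the timeout expires. */
        public T get(long timeout, TimeUnit unit) {
            try {
                return future.get(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            } catch (ExecutionException | TimeoutException e) {
                throw new IllegalStateException("No result available", e);
            }
        }
    }

    /** Simulated asynchronous operation that replies on another thread. */
    public static void queryAsync(String request, Callback<String> callback) {
        CompletableFuture.runAsync(() -> callback.onSuccess("echo:" + request));
    }

    /** Synchronous wrapper: sends the request, then blocks for up to five seconds. */
    public static String querySync(String request) {
        FutureCallback<String> callback = new FutureCallback<>();
        queryAsync(request, callback);
        return callback.get(5, TimeUnit.SECONDS);
    }
}
```

The blocking get(...) ties up the calling thread for the whole round trip, which is why the notes above recommend synchronous waits only when the business logic requires them.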

Service operations migration to OperationContext

Your service operation might be declared similarly to the following examples:

int add(@Param("num1") long num1, @Param("num2") long num2);
int add(@Param("num1") long num1, @Param("num2") long num2, Message message);
void add(@Param("num1") long num1, @Param("num2") long num2, Responder<Long> responder, Message message);
void add(@Param("num1") long num1, @Param("num2") long num2, Responder<Long> responder);

In all these cases it should be migrated to the new signature:

void add(@Param("num1") long num1, @Param("num2") long num2, OperationContext<Long> context);

NOTE: Follow these rules:

  • to access the incoming message, use context.getMessage(); you no longer need a Message argument for this
  • to get the BusAccess instance and perform nested calls, use context.getBusAccess()
  • to reply with a result of the given type, use context.respond(T result), or context.respond(T result, MessageContext context) if you need to reply with a result and a custom context
  • to reply with an error, use error(BusError busError), or error(Set<BusError> busErrors) if you need to reply with a set of errors
  • do not use a BusAccess instance that you created yourself for nested calls; always use the BusAccess from the context

Channel subscription migration to ChannelMessageHandler

To receive incoming messages, use ChannelMessageHandler instead of the ChannelSubscriber interface. Example implementation:

public class MyChannelMessageHandler implements ChannelMessageHandler {
    @Override
    public void onChannelMessage(ChannelContext context) {
        // Do something with the channel message
        // context.getChannelMessage()
        // context.getBusAccess()
    }
}

NOTE: Follow these rules:

  • to access the incoming message, use context.getChannelMessage()
  • to get the BusAccess instance and perform nested calls, use context.getBusAccess()
  • do not use a BusAccess instance that you created yourself for nested calls; always use the BusAccess from the context

Flink Integration

To consume messages from the Avid Platform with Flink, use com.avid.acs.bus.flink.AvidConnectorSource. Creating an instance requires the following parameters:

  • host - Secure Gateway host;
  • port - Secure Gateway encrypted port (NOTE: the Secure Gateway also supports unencrypted communication, but this is not the default behaviour and must be turned on manually. The Apache Flink connector doesn’t support connecting to an unencrypted port);
  • channel - Avid Platform channel name; the connector subscribes to this channel for incoming messages;
  • bindings - list of bindings that the connector uses when subscribing to the given channel, to filter incoming messages;

The constructor signature is:

public AvidConnectorSource(String host, int port, String channel, List<String> bindings)

To create an instance:

new AvidConnectorSource("gateway-host", 9900, "my.log.channel", Arrays.asList("warn.*", "error.*"));

To send messages to the Avid Platform with Flink, use com.avid.acs.bus.flink.AvidConnectorSink. Creating an instance requires the following parameters:

  • host - Secure Gateway host;
  • port - Secure Gateway encrypted port (NOTE: the Secure Gateway also supports unencrypted communication, but this is not the default behaviour and must be turned on manually. The Apache Flink connector doesn’t support connecting to an unencrypted port);
  • channel - Avid Platform channel name; the connector posts messages to this channel;
  • subject - message subject that the connector uses to route the message over RabbitMQ, so that consumers can filter on it and subscribe only to messages of interest (e.g. a service may post messages with the subjects “warn.something” and “error.something”, and a consumer may use “warn.*” as a binding to receive only the messages whose subject matches it);

The constructor signature is:

public AvidConnectorSink(String host, int port, String channel, String subject)

To create an instance:

new AvidConnectorSink("gateway-host", 9900, "my.log.channel", "warn.something");
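
To illustrate how a binding such as "warn.*" selects subjects, the sketch below implements the matching rules of a RabbitMQ topic exchange, where * stands for exactly one dot-separated word and # for zero or more words. It assumes that channel bindings follow these standard RabbitMQ rules. The class BindingMatchSketch is hypothetical and not part of the Avid Connector API.

```java
// Hypothetical helper that mirrors RabbitMQ topic-exchange matching; not part of the Avid Connector API.
final class BindingMatchSketch {
    private BindingMatchSketch() {
    }

    /** Returns true if the subject (routing key) is selected by the binding pattern. */
    public static boolean matches(String binding, String subject) {
        return match(binding.split("\\."), 0, subject.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // The pattern is used up; it matches only if the subject is used up too.
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point.
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // the pattern still expects a word, but the subject has ended
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }
}
```

Under these rules, matches("warn.*", "warn.something") is true, while "warn.*" does not select "error.something" or the three-word subject "warn.a.b".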

Here is an example application that uses the Flink connector. It takes six command-line arguments:

  • gatewayHost - Secure Gateway host;
  • gatewayPort - Secure Gateway port;
  • sourceChannelToListen - name of the channel to listen to for messages;
  • sourceChannelBindings - bindings used to filter incoming messages from the source channel, given as a comma-separated list without spaces;
  • targetChannel - name of the channel to which outgoing messages are published;
  • targetSubject - subject for the outgoing messages;

The example receives messages from the source channel with the given bindings and checks whether each message has a zone.id field in its JSON body. If it does, the example publishes this zone id to the target channel with the given subject.

package org.avid.quickstart;

import com.avid.acs.bus.channel.ChannelMessage;
import com.avid.acs.bus.channel.MutableChannelMessage;
import com.avid.acs.bus.flink.AvidConnectorSink;
import com.avid.acs.bus.flink.AvidConnectorSource;
import com.avid.acs.bus.message.data.Data;
import com.avid.acs.bus.message.data.JsonData;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Date;
import java.util.List;

public class AvidConnectorConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(AvidConnectorConsumer.class);

    public static void main(String[] args) throws Exception {

        // All six arguments are required
        if (args.length != 6) {
            System.out.println("Arguments: gatewayHost gatewayPort sourceChannelToListen sourceChannelBindings targetChannel targetSubject");
            System.out.println("Bindings should be provided as comma-separated list without spaces, i.e. bus.updates,bus.zone.updates");
            System.out.println("Usage: flink run -c org.avid.quickstart.AvidConnectorConsumer quickstart.jar gatewayHost gatewayPort sourceChannelToListen sourceChannelBindings targetChannel targetSubject");
            System.exit(1);
        }

        String gatewayHost = args[0];
        int gatewayPort = Integer.parseInt(args[1]);
        String channel = args[2];
        List<String> bindings = Arrays.asList(args[3].split(","));
        String targetChannel = args[4];
        String targetSubject = args[5];

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Subscribe to the source channel with the given bindings
        DataStream<ChannelMessage> data = env.addSource(new AvidConnectorSource(gatewayHost, gatewayPort, channel, bindings));

        // Build an outgoing message that carries the zone id of each incoming message
        data = data.map((MapFunction<ChannelMessage, ChannelMessage>) value -> {
            ChannelMessage message = new MutableChannelMessage(targetChannel, targetSubject);
            Data zone = value.data().get("zone");
            if (zone instanceof JsonData) {
                String id = ((JsonData) zone).get().get("id").asText();
                message.data().put("stamp", new Date().getTime());
                message.data().put("zone", id);
                LOG.info("Zone is: {}", id);
            }

            return message;
        });
        data.print();

        // Publish the outgoing messages to the target channel
        data.addSink(new AvidConnectorSink(gatewayHost, gatewayPort, targetChannel, targetSubject));

        // execute program
        env.execute("Avid Connector API channel messages consumer");
    }
}