Overview

All communication over the Avid Platform must be performed in a non-blocking, asynchronous way. The AMQP protocol is used for this purpose. The Avid Connector API provides an abstraction layer for developing micro-services for the Avid Platform, so service developers can concentrate on the business logic of their services.

The following diagram shows, in simplified form, where the Avid Connector API sits in the Avid Platform environment:

All messages sent over the AMQP protocol are in JSON format. Here are examples of Avid Platform messages:

1
2
3
4
5
6
7
8
9
10
{
    "serviceType": "avid.acs.calculator",
    "serviceRealm": "global.test",
    "serviceVersion": 3,
    "op": "add",
    "paramSet": {
        "num1": 5,
        "num2": 3
    }
}
1
2
3
4
5
6
7
{
    "channel": "avid.acs.calculator",
    "subject": "notification.cal.done",
    "paramSet": {
        "result": 5
    }
}

Communication between the Avid Connector API and the Secure Gateway uses WebSockets with a Google Protobuf payload.

Background on Event Driven Architecture

Avid Connector API provides both RPC and event driven communication patterns

Event-driven architecture
Remote procedure call

Common Patterns

The following patterns can be used with Avid Connector API

Query

A client should use the query operation to send a request to a target service and receive a response back. When sending the request, a timeout for waiting for the response is provided; by default it is 10 seconds. The client receives a response on success, a timeout if the response was not received within the requested time, or an error if an error occurred.

Examples

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
QueryAsyncCallback asyncCallback = new QueryAsyncCallback() {
@Override
public void onSuccess(Message result) {
if (result.errors().hasErrors()) {
System.out.println("Operation failed: " + result.errors());
else
System.out.println("Received Response: " + result.results().asJson().toString());
}

@Override
public void onError(CallbackError error) {
System.out.println("Received error: " + error.getMessage());
System.out.println("Received error type: " + error.getType());
}

@Override
public void onTimeout() {
System.out.println("Timeout Occurred");
}
};

MutableMessage request = new MutableMessage("avid.acs.calculator", "global", 3, "add");
request.parameters().put("num1", 40);
request.parameters().put("num2", 2);

MessageOptions options = new MessageOptions(5000);
bus.query(request, options, asyncCallback);

Node.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Request message: invokes the "add" operation of the calculator service.
var m = {
    serviceType: 'avid.acs.calculator',
    serviceRealm: 'global',
    serviceVersion: 3,
    op: 'add',
    'paramSet' : {
        'num1': 40,
        'num2': 2
    }
};

// Query options; the timeout is presumably in milliseconds -- confirm against the API.
var opt = {
    timeout: 5000
};

// Callbacks, in order: reply handler, timeout handler, error handler.
access.advancedQuery(m, opt, function(reply) {
    // A delivered reply may still carry an error set.
    if (reply.errorSet) {
        console.error("Operation failed: ", JSON.stringify(reply.errorSet));
    } else {
        console.log("The meaning of life is ", JSON.stringify(reply.resultSet));
    }
}, function() {
    console.log("Timeout Occurred");
}, function(err) {
    if (err) {
        // console.error is the correct API; console.err does not exist and would throw a TypeError.
        console.error("Received error: " + err.errorMessage);
        console.error("Received error type: " + err.errorType);
    }
});

.NET

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Build the "add" request for the calculator service with its two operands.
var msg = new Message("avid.acs.calculator", "global.test", 3, "add");
msg.Parameters.Add("num1", 40);
msg.Parameters.Add("num2", 2);


// Query timeout; presumably milliseconds -- confirm against the MessageOptions API.
var options = new MessageOptions { Timeout = 5000 };
bus.Query(msg, new AnonAsyncQueryResult() {
    onSuccess = (Message result) => {
        // A delivered response may still carry errors.
        if (result.HasErrors) {
            Console.WriteLine("Operation failed: " + result.Errors);
        } else {
            // Read the results from the lambda parameter `result`; no `message` variable exists here.
            Console.WriteLine("Received Response: " + result.Results);
        }
    },
    onError = (OperationError error) => {
        Console.WriteLine("Received error: " + error.Message);
        Console.WriteLine("Received error type: " + error.Type);
    },
    onTimeout = () => {
        Console.WriteLine("Timeout Occurred");
    }
}, options);

C++

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Callback object that prints the outcome of an asynchronous calculator query.
class CalculatorResult
    : public acs::ReferenceCounter<CalculatorResult>
    , public acs::AsyncQueryResult
{
public:
    using pointer = boost::intrusive_ptr<CalculatorResult>;

    // Reference counting is forwarded to the intrusive_ptr hooks.
    virtual void ACSBUS_CALL addRef() const { intrusive_ptr_add_ref(this); }

    virtual void ACSBUS_CALL release() const { intrusive_ptr_release(this); }

    // Called when a response was delivered; the response may still carry errors.
    virtual void ACSBUS_CALL onSuccess(acs::BusMessage* response) {
        if (!response->getErrors()->size()) {
            std::cout << "Received response: " << ACSVariant_cast<int>(response->getResults()->at(0)->value()) << std::endl;
            return;
        }
        // Only the first reported error is printed.
        std::cout << "Operation failed: " << response->getErrors()->at(0) << std::endl;
    }

    // Called when the query could not be completed.
    virtual void ACSBUS_CALL onError(const acs::OperationError* error) {
        std::cout << "Received error: " << error->message() << std::endl;
        std::cout << "Received error type: " << error->type() << std::endl;
    }

    // Called when no response was received in time.
    virtual void ACSBUS_CALL onTimeout() {
        std::cout << "Timeout occurred" << std::endl;
    }
};

// ...

// Build the "add" request with its two operands.
acs::BusMessageSP request = acs::CreateMessage("avid.acs.calculator", "global.test", 3, "add");
request->getParameters()->addParameter("num1", acs::Variant(40));
request->getParameters()->addParameter("num2", acs::Variant(2));

// Options created with 5000; presumably a timeout in milliseconds -- confirm against the API.
acs::MessageOptionsSP queryOptions = acs::CreateMessageOptions(5000);
CalculatorResult::pointer resultHandler(new CalculatorResult());
bus->query(request.get(), resultHandler.get(), queryOptions.get());

Send

A client should use the send operation to send a request to a target service without receiving a response. This operation can be used when the client doesn’t care about the operation’s response.

Examples

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Callback that reports whether the message was published.
// AsyncCallback is parameterized with Void so that onSuccess(Void) actually overrides the interface method.
AsyncCallback<Void> asyncCallback = new AsyncCallback<Void>() {
    @Override
    public void onSuccess(Void result) {
        System.out.println("Message is published");
    }

    @Override
    public void onError(CallbackError error) {
        System.out.println("Received error: " + error.getMessage());
        System.out.println("Received error type: " + error.getType());
    }
};

// Build the "add" request for the calculator service with its two operands.
MutableMessage request = new MutableMessage("avid.acs.calculator", "global", 3, "add");
request.parameters().put("num1", 40);
request.parameters().put("num2", 2);

// No message options are supplied (null).
bus.send(request, null, asyncCallback);

Node.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Request message: invokes the "add" operation of the calculator service.
var m = {
    serviceType: 'avid.acs.calculator',
    serviceRealm: 'global',
    serviceVersion: 3,
    op: 'add',
    'paramSet' : {
        'num1': 40,
        'num2': 2
    }
};

// Completion callback: receives an error object on failure, otherwise the message was published.
var callback = function(err) {
    if (err) {
        // console.error is the correct API; console.err does not exist and would throw a TypeError.
        console.error("Received error: " + err.errorMessage);
        console.error("Received error type: " + err.errorType);
    } else {
        console.log("Message is published");
    }
};
// No options are supplied (null).
access.send(m, null, callback);

.NET

1
2
3
4
5
6
7
8
9
10
11
12
13
// Build the "add" request for the calculator service with its two operands.
var msg = new Message("avid.acs.calculator", "global.test", 3, "add");
msg.Parameters.Add("num1", 40);
msg.Parameters.Add("num2", 2);

bus.Send(msg, new AnonAsyncOpResult() {
    onSuccess = (object o) => {
        Console.WriteLine("Message is published");
    },  // members of an object initializer must be comma-separated
    onError = (OperationError error) => {
        Console.WriteLine("Received error: " + error.Message);
        Console.WriteLine("Received error type: " + error.Type);
    }
});

C++

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Callback object that prints whether a sent message was published.
class CalculatorPublished
    : public acs::ReferenceCounter<CalculatorPublished>
    , public acs::AsyncOpResult
{
public:
    using pointer = boost::intrusive_ptr<CalculatorPublished>;

    // Reference counting is forwarded to the intrusive_ptr hooks.
    virtual void ACSBUS_CALL addRef() const { intrusive_ptr_add_ref(this); }

    virtual void ACSBUS_CALL release() const { intrusive_ptr_release(this); }

    // Called once the message has been published.
    virtual void ACSBUS_CALL onSuccess() {
        std::cout << "Message is published" << std::endl;
    }

    // Called when publishing failed; prints the error message and its type.
    virtual void ACSBUS_CALL onError(const acs::OperationError* error) {
        std::cout << "Received error: " << error->message() << std::endl;
        std::cout << "Received error type: " << error->type() << std::endl;
    }
};

// ...

// Build the "add" request with its two operands.
acs::BusMessageSP msg = acs::CreateMessage("avid.acs.calculator", "global.test", 3, "add");
msg->getParameters()->addParameter("num1", acs::Variant(40));
msg->getParameters()->addParameter("num2", acs::Variant(2));

// Send without waiting for a service response; only the publish outcome is reported.
CalculatorPublished::pointer publishHandler(new CalculatorPublished());
bus->send(msg.get(), publishHandler.get());

Broadcast

A client should use the broadcast operation to send a request to all available instances of the same service without receiving a response. This operation can be used when the client doesn’t care about the operation’s response.

Examples

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Callback that reports whether the broadcast message was published.
// AsyncCallback is parameterized with Void so that onSuccess(Void) actually overrides the interface method.
AsyncCallback<Void> asyncCallback = new AsyncCallback<Void>() {
    @Override
    public void onSuccess(Void result) {
        System.out.println("Message is published");
    }

    @Override
    public void onError(CallbackError error) {
        System.out.println("Received error: " + error.getMessage());
        System.out.println("Received error type: " + error.getType());
    }
};

// Build the "add" request for the calculator service with its two operands.
MutableMessage request = new MutableMessage("avid.acs.calculator", "global", 3, "add");
request.parameters().put("num1", 40);
request.parameters().put("num2", 2);

// No message options are supplied (null).
bus.broadcast(request, null, asyncCallback);

Node.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Request message: invokes the "add" operation of the calculator service.
var m = {
    serviceType: 'avid.acs.calculator',
    serviceRealm: 'global',
    serviceVersion: 3,
    op: 'add',
    'paramSet' : {
        'num1': 40,
        'num2': 2
    }
};

// Completion callback: receives an error object on failure, otherwise the message was published.
var callback = function(err) {
    if (err) {
        // console.error is the correct API; console.err does not exist and would throw a TypeError.
        console.error("Received error: " + err.errorMessage);
        console.error("Received error type: " + err.errorType);
    } else {
        console.log("Message is published");
    }
};
// No options are supplied (null).
access.broadcast(m, null, callback);

.NET

1
2
3
4
5
6
7
8
9
10
11
12
13
// Build the "add" request for the calculator service with its two operands.
var msg = new Message("avid.acs.calculator", "global.test", 3, "add");
msg.Parameters.Add("num1", 40);
msg.Parameters.Add("num2", 2);

bus.Broadcast(msg, new AnonAsyncOpResult() {
    onSuccess = (object o) => {
        Console.WriteLine("Message is published");
    },  // members of an object initializer must be comma-separated
    onError = (OperationError error) => {
        Console.WriteLine("Received error: " + error.Message);
        Console.WriteLine("Received error type: " + error.Type);
    }
});

C++

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Callback object that prints whether a broadcast message was published.
class CalculatorPublished
    : public acs::ReferenceCounter<CalculatorPublished>
    , public acs::AsyncOpResult
{
public:
    using pointer = boost::intrusive_ptr<CalculatorPublished>;

    // Reference counting is forwarded to the intrusive_ptr hooks.
    virtual void ACSBUS_CALL addRef() const { intrusive_ptr_add_ref(this); }

    virtual void ACSBUS_CALL release() const { intrusive_ptr_release(this); }

    // Called once the message has been published.
    virtual void ACSBUS_CALL onSuccess() {
        std::cout << "Message is published" << std::endl;
    }

    // Called when publishing failed; prints the error message and its type.
    virtual void ACSBUS_CALL onError(const acs::OperationError* error) {
        std::cout << "Received error: " << error->message() << std::endl;
        std::cout << "Received error type: " << error->type() << std::endl;
    }
};

// ...

// Build the "add" request with its two operands.
acs::BusMessageSP msg = acs::CreateMessage("avid.acs.calculator", "global.test", 3, "add");
msg->getParameters()->addParameter("num1", acs::Variant(40));
msg->getParameters()->addParameter("num2", acs::Variant(2));

// Broadcast the request; only the publish outcome is reported back.
CalculatorPublished::pointer publishHandler(new CalculatorPublished());
bus->broadcast(msg.get(), publishHandler.get());

Publish | Subscribe

For this pattern, the Avid Connector API uses the following identities to identify the destination:

  • channel name: the name of the channel to which the message should be published, e.g. “platform.updates”;
  • subject: the subject of the message, which can be used to filter incoming messages when wildcards are used. Possible wildcards:
    • * (star) - substitutes for exactly one word, e.g. with the subject bus.*.updates you will receive messages with subjects like bus.platform.updates, but not subjects like bus.platform.core.updates (two words);
    • # (hash) - substitutes for zero or more words, e.g. with the subject bus.# you will receive messages with subjects like bus.platform, bus.platform.updates, bus.platform.core.updates, bus.thirdparty.updates, etc.;
  • shared name (optional): the name of the shared channel, if a shared channel is used.

A subscriber may subscribe to receive messages from a given channel by a specific subject. The subject may contain the wildcards “* (star)” and “# (hash)”. When a message is published, all subscribers that are subscribed to the matching channel and subject receive it. To receive all messages from a given channel, “# (hash)” must be used as the subject.

If multiple subscribers want to receive messages in a round-robin way (one at a time), a shared name must be provided. In this case, when a message is published, only one subscriber among the subscribers with the same shared name receives the message at a time. A use case for this pattern: when multiple instances of the same service are running and the application must perform database updates upon receiving notifications, only one instance at a time should receive the message and write to the database.

Examples

Java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Handler invoked for every message delivered on the subscribed channel.
public class MyChannelMessageHandler implements ChannelMessageHandler {
    @Override
    public void onChannelMessage(ChannelContext context) {
        System.out.println("Message received: " + context.getMessage().results().asJson().toString());
    }
}

// Callback that reports whether the subscription itself succeeded.
AsyncCallback<Void> asyncCallback = new AsyncCallback<Void>() {
    @Override
    public void onSuccess(Void result) {
        System.out.println("Successfully subscribed");
    }

    @Override
    public void onError(CallbackError error) {
        System.out.println("Received error: " + error.getMessage());
        System.out.println("Received error type: " + error.getType());
    }
};

ChannelMessageHandler myChannelMessageHandler = new MyChannelMessageHandler();

// Subject bindings; "*" substitutes for exactly one word of the subject.
List<String> myBindings = new ArrayList<String>();
myBindings.add("log.CalculatorService.*");
myBindings.add("log.*.info");
myBindings.add("log.*.warning");
bus.subscribeToChannel("my.log.channel", myBindings, myChannelMessageHandler, null, asyncCallback);


// To subscribe for the shared channel
ChannelOptions options = new ChannelOptions("shared.name");
bus.subscribeToChannel("my.log.channel", myBindings, myChannelMessageHandler, options, asyncCallback);

Node.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Channel to subscribe to and the subject bindings used to filter its messages.
const channel = 'my.log.channel';
const bindings = ['log.CalculatorService.*', 'log.*.info', 'log.*.warning'];

// Completion callback for the subscribe operation itself.
const callback = function(err, subscriberId) {
    if (err) {
        // console.error is the correct API; console.err does not exist and would throw a TypeError.
        console.error("Received error: " + err.errorMessage);
        console.error("Received error type: " + err.errorType);
    } else {
        console.log("Successfully subscribed");
    }
};

// Handler invoked for every message delivered on the channel.
const subscriber = function (channelContext) {
    console.log('Received channel message: ', JSON.stringify(channelContext.getChannelMessage()));
};

access.subscribeToChannel(channel, bindings, subscriber, callback);


// To subscribe for the shared channel
// ('#' matches every subject; the handler is the `subscriber` function defined above)
access.subscribeToSharedChannel(channel, '#', 'shared.name', subscriber, callback);

.NET

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Subject bindings used to filter the channel's messages.
var bindings = new List<String>();
bindings.Add("log.CalculatorService.*");
bindings.Add("log.*.info");
bindings.Add("log.*.warning");


// Subscriber invoked for every message delivered on the channel.
private class MySubscriber: IChannelMessageSubscriber
{
    public void OnChannelMessage(IChannelContext channelContext)
    {
        Console.WriteLine("Received channel message: " + channelContext.ChannelMessage.Data);
    }
}


bus.SubscribeToChannel("my.log.channel", bindings, new MySubscriber(), new AnonAsyncOpResult() {
    onSuccess = (object o) => {
        Console.WriteLine("Successfully subscribed");
    },  // members of an object initializer must be comma-separated
    onError = (OperationError error) => {
        Console.WriteLine("Received error: " + error.Message);
        Console.WriteLine("Received error type: " + error.Type);
    }
});

C++

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Subscriber invoked for every message delivered on the subscribed channel.
class MySubscriber : public acs::ChannelMessageSubscriber
{
public:
    // Prints the received channel message to standard output.
    // Writing only to a local string stream would build the text and then
    // silently discard it, so nothing would ever be displayed.
    virtual void ACSBUS_CALL onChannelMessage(const acs::ChannelMessageContext& channelContext) override
    {
        std::cout << "Received channel message: " << channelContext.getChannelMessage() << std::endl;
    }
};

// Generic callback object that prints the outcome of an asynchronous bus operation.
// The operation name passed to the constructor is echoed in the success message.
class OperationResult
    : public acs::ReferenceCounter<OperationResult>
    , public acs::AsyncOpResult
{
public:
    using pointer = boost::intrusive_ptr<OperationResult>;

    OperationResult(const std::string& op) : m_op(op) {}

    // Reference counting is forwarded to the intrusive_ptr hooks.
    virtual void ACSBUS_CALL addRef() const { intrusive_ptr_add_ref(this); }

    virtual void ACSBUS_CALL release() const { intrusive_ptr_release(this); }

    // Called when the operation completed.
    virtual void ACSBUS_CALL onSuccess()
    {
        std::cout << "Successfully " << m_op << std::endl;
    }

    // Called when the operation failed; prints the error message and its type.
    virtual void ACSBUS_CALL onError(const acs::OperationError* error)
    {
        std::cout << "Received error: " << error->message() << std::endl;
        std::cout << "Received error type: " << error->type() << std::endl;
    }

private:
    // Text appended to "Successfully " in the success message.
    const std::string m_op;
};

// ...


acs::ChannelBindingsSP bindings = acs::CreateChannelBindings();
bindings->addBinding("log.CalculatorService.*");
bindings->addBinding("log.*.info");
bindings->addBinding("log.*.warning");

OperationResult::pointer opResult(new OperationResult("subscribed"));
MySubscriber subscriber;
bus->subscribeToChannel("my.log.channel", bindings.get(), &subscriber, opResult.get());