Written by Michał Amerek
Pragmatic Programmer
Published May 20, 2019

Messaging Patterns implemented with SQS and SNS

In the first two parts of this series about messaging (1, 2) I took an inventory of the concepts. Now I am going to combine them and show how they can be used with SNS and SQS.

I work on the backend of the Schibsted Publishing Platform – Creation Suite, which is a content management system for a number of different newspapers. It contains tools and functions that help journalists do their jobs. The system consists of several dozen microservices that communicate asynchronously using AWS.

I will focus here on the patterns themselves, with only occasional references to the system I work on.

DATATYPE

SNS implements the Publish-Subscribe pattern, in which messages are published to so-called “topics”. In the simplest form, every subscriber receives every message published to a topic. We can therefore treat each topic as carrying messages of one specific type, which makes it a logical Datatype Channel. In this solution we create as many topics as there are message types and subscribe, for example, a different number of SQS queues to each of them.

FILTERING

At the same time, SNS fills some additional roles: it stores messages internally, routes them to subscribers using “fanout”, and also lets you filter messages. With filtering, a message is delivered only to those subscribers whose filter policy matches the message attributes. Thus a channel dedicated to a specific type of message can be conceptually extended and differentiated by metadata. This lets us publish various events and commands and take the selection logic out of the actual consumer of the message.

MESSAGE TYPE

As M. Fowler vividly put it in “The Many Meanings of Event-Driven Architecture”, passive-aggressive commands are a trap:

“A simple example of this trap is when an event is used as a passive-aggressive command. This happens when the source system expects the recipient to carry out an action, and ought to use a command message to show that intention, but styles the message as an event instead.”

Therefore it is good practice, apart from proper naming, to separate the channels and dedicate them to events and commands respectively, if only because the two carry different intentions and therefore different sets of metadata. Note that you cannot subscribe the same SQS queue to the same SNS topic more than once with different filtering policies.

EVENT

It is enough to introduce an eventType field in the metadata that tells us what has just happened in the system, and to let any number of consumers (zero or more) register their interest in certain types of changes. The message producer is not aware of them; it just announces that the event occurred. In the case of a sample publishing platform, we could have events such as articleDraft, articleScheduled, articlePublished, articleUpdated or articleRemoved, and introduce an SNS filtering policy for one SQS subscriber as simple as:

{
  "eventType": [
    "articlePublished",
    "articleUpdated",
    "articleRemoved"
  ]
}

or for some other SQS subscriber:

{
  "eventType": [
    "articleDraft",
    "articleScheduled"
  ]
}

(one event is sent on each change; any number of subscribers register interest in any number of event types; SNS does the rest)
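
To make the matching rule concrete, here is a minimal sketch in plain Java, with no AWS SDK involved. It only models exact string matching: a message passes when every key of the policy is present in its attributes with one of the listed values. The class names EventFilterPolicy and EventFilterDemo are hypothetical and exist only for this illustration; the real evaluation happens inside SNS.

```java
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of an SNS filter policy (exact string matching only).
final class EventFilterPolicy {
    // attribute name -> accepted values
    private final Map<String, List<String>> policy;

    EventFilterPolicy(Map<String, List<String>> policy) {
        this.policy = policy;
    }

    // True only if every policy key is present in the message attributes
    // and the attribute value is one of the accepted values for that key.
    boolean accepts(Map<String, String> messageAttributes) {
        for (Map.Entry<String, List<String>> rule : policy.entrySet()) {
            String actual = messageAttributes.get(rule.getKey());
            if (actual == null || !rule.getValue().contains(actual)) {
                return false;
            }
        }
        return true;
    }
}

final class EventFilterDemo {
    public static void main(String[] args) {
        EventFilterPolicy publicationEvents = new EventFilterPolicy(Map.of(
            "eventType", List.of("articlePublished", "articleUpdated", "articleRemoved")));
        EventFilterPolicy draftEvents = new EventFilterPolicy(Map.of(
            "eventType", List.of("articleDraft", "articleScheduled")));

        Map<String, String> event = Map.of("eventType", "articleUpdated");
        System.out.println(publicationEvents.accepts(event)); // true
        System.out.println(draftEvents.accepts(event));       // false
    }
}
```

A message without the eventType attribute is rejected by both policies in this model, which is why the producer has to set the attribute on every event it publishes.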

COMMAND

Commands are always dedicated to at least ONE receiver, which brings the Point-To-Point Channel to mind. However, we could also use the advantages of a Publish-Subscribe Channel and SNS filtering, and:

  • first of all be able to subscribe specialised consumers by commandName using one and the same command channel
  • second, be able to register interest in one and the same command by a different number of consumers taking care of different interesting concerns around the same message
  • third, be able to define the target list of parallel receivers (Recipient List) for one and the same command, sending it with a message and subscribe receivers to the channel by their names

That’s how some of the components, taking care of content enriching (for example AdEnricher), could subscribe to the enrich command:

{
  "recipientList": [
    "AdEnricher"
  ],
  "commandName": [
    "enrich"
  ]
}

and the other one (for example TagEnricher):

{
  "recipientList": [
    "TagEnricher"
  ],
  "commandName": [
    "enrich"
  ]
}

(one enrich command can be sent to parallel or dedicated receivers, see the Notice in the SENDER section; each receiver subscribes with its own name in recipientList; SNS does the rest)
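
The Recipient List variant can be sketched the same way. This is a plain-Java model, not SNS itself, and it assumes that recipientList is sent as a list attribute: a policy key then matches when at least one value carried by the message is listed in the policy, and all policy keys have to match. CommandFilter and CommandFilterDemo are hypothetical names used only here.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of filtering on multi-valued attributes.
final class CommandFilter {
    // True when, for every policy key, the message carries that attribute
    // and at least one of its values appears in the policy's accepted values.
    static boolean matches(Map<String, List<String>> policy,
                           Map<String, List<String>> attributes) {
        for (Map.Entry<String, List<String>> rule : policy.entrySet()) {
            List<String> actual = attributes.get(rule.getKey());
            if (actual == null || Collections.disjoint(actual, rule.getValue())) {
                return false;
            }
        }
        return true;
    }
}

final class CommandFilterDemo {
    public static void main(String[] args) {
        Map<String, List<String>> adEnricher = Map.of(
            "recipientList", List.of("AdEnricher"),
            "commandName", List.of("enrich"));
        Map<String, List<String>> tagEnricher = Map.of(
            "recipientList", List.of("TagEnricher"),
            "commandName", List.of("enrich"));

        // One command addressed to both receivers in parallel.
        Map<String, List<String>> toBoth = Map.of(
            "recipientList", List.of("AdEnricher", "TagEnricher"),
            "commandName", List.of("enrich"));
        // The same command dedicated to a single receiver.
        Map<String, List<String>> toTagsOnly = Map.of(
            "recipientList", List.of("TagEnricher"),
            "commandName", List.of("enrich"));

        System.out.println(CommandFilter.matches(adEnricher, toBoth));      // true
        System.out.println(CommandFilter.matches(tagEnricher, toBoth));     // true
        System.out.println(CommandFilter.matches(adEnricher, toTagsOnly));  // false
        System.out.println(CommandFilter.matches(tagEnricher, toTagsOnly)); // true
    }
}
```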

Here you can read more about SNS filtering policy.

CONSUMER

One way to receive messages sent to SNS is to subscribe an SQS queue to the topic (here you can find more about different SNS scenarios). For this we need to implement a Polling Consumer running in an endless loop.

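import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;

public class PollingConsumer implements Runnable {
    private final AmazonSQS sqs;
    private final String queueUrl;

    public PollingConsumer(final AmazonSQS sqs, final String queueUrl) {
        this.sqs = sqs;
        this.queueUrl = queueUrl;
    }

    @Override
    public void run() {
        while (true) {
            ReceiveMessageResult receiveMessageResult = this.sqs.receiveMessage(
                new ReceiveMessageRequest()
                        .withQueueUrl(this.queueUrl)
                        .withAttributeNames("All")          // system attributes
                        .withMessageAttributeNames("All")   // custom attributes such as eventType
                        .withMaxNumberOfMessages(5)         // up to 5 messages per poll
                        .withWaitTimeSeconds(2));           // long polling, wait up to 2 seconds
            for (Message receivedMessage : receiveMessageResult.getMessages()) {
                // handle message
                // After successful handling, delete the message so that it is not delivered again.
                this.sqs.deleteMessage(this.queueUrl, receivedMessage.getReceiptHandle());
            }
        }
    }
}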

DISPATCHER

Our application may be interested in different types and numbers of events or commands. The SNS filtering policy ensures that it receives only such messages. Still, one and the same application may have to behave differently depending on the message. This is where the Message Dispatcher comes into play: based on the message type, it delegates the message to the proper handler.

This is how we could get the eventType value (assuming raw message delivery is enabled on the subscription, so that SNS message attributes arrive as SQS message attributes):

message.getMessageAttributes().get("eventType").getStringValue()

or the commandName:

message.getMessageAttributes().get("commandName").getStringValue()
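
A Message Dispatcher on top of that value can be as small as a map from message type to handler. The sketch below is plain Java and deliberately independent of the AWS SDK: it takes the already extracted type (eventType or commandName) and the message body as strings. The class name MessageDispatcher, the fallback handler and the sample handlers are assumptions made for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical dispatcher: routes a message body to the handler registered for its type.
final class MessageDispatcher {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();
    private final Consumer<String> fallback;

    // The fallback handles message types nobody registered for.
    MessageDispatcher(Consumer<String> fallback) {
        this.fallback = fallback;
    }

    MessageDispatcher register(String messageType, Consumer<String> handler) {
        handlers.put(messageType, handler);
        return this;
    }

    void dispatch(String messageType, String body) {
        handlers.getOrDefault(messageType, fallback).accept(body);
    }
}

final class MessageDispatcherDemo {
    public static void main(String[] args) {
        MessageDispatcher dispatcher =
            new MessageDispatcher(body -> System.out.println("ignored: " + body))
                .register("articlePublished", body -> System.out.println("index: " + body))
                .register("articleRemoved", body -> System.out.println("unindex: " + body));

        dispatcher.dispatch("articlePublished", "{\"id\":1}"); // prints index: {"id":1}
        dispatcher.dispatch("articleDraft", "{\"id\":2}");     // prints ignored: {"id":2}
    }
}
```

Inside the polling loop, the first argument would be the attribute value read from the received message and the second its body.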

SENDER

Even if we succeed in moving the filtering logic out of the consumer (with an SNS filtering policy), the consumer can still act as a publisher. If routing is involved, the question is who knows the next receiver: the sender of the message or the consumer itself. If it is the sender, and we are dealing with a (sequential) pipeline or a command to be executed, it may be enough to send metadata with the message that contains the endpoint address to which the message, or the success or failure of its processing, should be sent (Return Address). This is a request-reply style that we sometimes use in our publishing platform as a feedback channel.

message.getMessageAttributes().get("returnAddress").getStringValue()

Sending is simple, and it is easy to introduce an abstraction that lets us send messages to any output channel regardless of whether it is SQS or SNS. All you need is the endpoint address: the queueUrl (https://sqs …) or the topicArn (arn:aws:sns …), respectively. So, for feedback, you could read the return address and use it as the endpoint to send the message to.

For SQS we will use:

final SendMessageRequest message = new SendMessageRequest(queueUrl, payload);
message.addMessageAttributesEntry(key, new MessageAttributeValue().withStringValue(value).withDataType("String"));
// more metadata if needed
sqs.sendMessage(message);

For SNS:

final PublishRequest message = new PublishRequest(topicArn, payload);
// MessageAttributeValue here is com.amazonaws.services.sns.model.MessageAttributeValue, not the SQS class
message.addMessageAttributesEntry(key, new MessageAttributeValue().withStringValue(value).withDataType("String"));
// more metadata if needed
sns.publish(message);

The key above can be any metadata key (eventType, commandName, recipientList or returnAddress from the examples given).
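
The abstraction over both services could look roughly like the sketch below. It is plain Java: the OutputChannel interface stands in for the two SDK calls shown above, and choosing the channel by the address prefix (https://sqs or arn:aws:sns) is an assumption of this example that would need adjusting for other address formats. All names and the sample addresses are hypothetical.

```java
import java.util.Map;

// Hypothetical stand-in for "send payload with attributes to this endpoint".
// A real implementation would wrap sqs.sendMessage(...) or sns.publish(...).
interface OutputChannel {
    void send(String endpoint, String payload, Map<String, String> attributes);
}

// Picks SQS or SNS based on the shape of the endpoint address.
final class MessageSender {
    private final OutputChannel sqsChannel;
    private final OutputChannel snsChannel;

    MessageSender(OutputChannel sqsChannel, OutputChannel snsChannel) {
        this.sqsChannel = sqsChannel;
        this.snsChannel = snsChannel;
    }

    void send(String endpoint, String payload, Map<String, String> attributes) {
        if (endpoint.startsWith("arn:aws:sns")) {
            snsChannel.send(endpoint, payload, attributes);   // topicArn
        } else if (endpoint.startsWith("https://sqs")) {
            sqsChannel.send(endpoint, payload, attributes);   // queueUrl
        } else {
            throw new IllegalArgumentException("Unsupported endpoint: " + endpoint);
        }
    }
}

final class MessageSenderDemo {
    public static void main(String[] args) {
        MessageSender sender = new MessageSender(
            (endpoint, payload, attrs) -> System.out.println("SQS <- " + payload),
            (endpoint, payload, attrs) -> System.out.println("SNS <- " + payload));

        // Sample addresses, made up for this demo.
        String returnAddress = "https://sqs.eu-west-1.amazonaws.com/123456789012/feedback";
        sender.send(returnAddress, "done", Map.of("eventType", "articleEnriched"));
        sender.send("arn:aws:sns:eu-west-1:123456789012:article-events", "published",
            Map.of("eventType", "articlePublished"));
    }
}
```

With this in place, a consumer that finds a returnAddress attribute on a command can reply without knowing whether the address points to a queue or a topic.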

Notice:

For the previously mentioned recipient list, or any other metadata that is a list, you should use .withDataType("String.Array") and send a JSON array as the value, for example ["AdEnricher","TagEnricher"] for the parallel receivers of the sample enrich command.
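
Building that value by hand is error-prone, so a small helper may be useful. The following is a minimal sketch with a hypothetical class name; it escapes only backslashes and double quotes, so for values that may contain control characters a proper JSON library is the safer choice.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that renders a list of strings as a JSON array,
// suitable as the string value of a String.Array attribute.
final class StringArrayAttribute {
    static String toJsonArray(List<String> values) {
        return values.stream()
            // escape backslashes first, then double quotes, and wrap in quotes
            .map(v -> "\"" + v.replace("\\", "\\\\").replace("\"", "\\\"") + "\"")
            .collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] args) {
        // prints ["AdEnricher","TagEnricher"]
        System.out.println(toJsonArray(List.of("AdEnricher", "TagEnricher")));
    }
}
```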

SUMMARY

Following the messaging concepts described previously, and based on our publishing platform experience, I would say in summary:

  • a message is something you communicate to other services; try to identify the type of message you are signalling and make it explicit (for example in metadata)
  • use metadata for good reasons (I have mentioned some of them in this article); AWS limits the number of message attributes
  • if possible, use dedicated channels, at least separate ones for commands and events
  • consider publishing to an SNS topic (instead of a Point-To-Point SQS queue) and enjoy all the benefits of a Publish-Subscribe channel
  • make use of filtering policies and other SNS – SQS features
