Written by Jan Gurda
EM Payments Gateway Kraków
Published September 10, 2015

Mocking Amazon SQS with ElasticMQ

This article presents our experience with using ElasticMq as a substitution for Amazon Web Service’s Simple Queue Service (SQS) for black box testing of a single microservice application.

These kinds of tests are usually called service tests or module tests and are an extremely important part of application development. Some people even drop the concept of test pyramid and tend to have a comparable number of service tests as unit tests (read more here ). I like this idea.

Higher level tests give us more confidence that a particular component works correctly. A microservice architecture requires special focus on writing good and reliable module tests. What’s more these tests should be relatively fast, executed with every CI build, and their role is to make sure that the service responds appropriately to a given input and adheres to the contract with downstream services. Because this kind of test focuses on a single service, (which is actually an independent application in the world of microservices) the developer has to simulate surrounding downstream applications.

The pictures below present a schematic view of real services vs. single service test. This kind of component aggregation is a very common pattern in our system. The table presents the relation between real and mock components used in both scenarios.

 

Figure 1: Real services connectivity

Figure 1: Real services connectivity

Figure 2: Service test component diagram

Figure 2: Service test component diagram

 

table

What’s worth mentioning is that the whole service test, together with all simulated downstream systems should be executed inside a single JVM process. It speeds up the execution and reduces the possibility of a resource conflict on Continuous Integration server.

Simple Queue Service in use

In our project we use REST for synchronous communication between services.

There is a bunch of very good tools which can simulate HTTP endpoint. Two products we evaluated are Mock Server (http://www.mock-server.com/) and Wire Mock (http://wiremock.org/). There’s also a very useful SaaS solution – Mocky (http://www.mocky.io/). I prefer Wire Mock and I believe that currently it’s the most flexible library, but Mock Server developers also keep providing new features.

Synchronous HTTP communication however cannot be applied everywhere. For some use cases we realized that asynchronous queue communication fits better. Because we heavily utilize the goodies of Amazon Web Services, we decided to go with Amazon Simple Queue Service (SQS) as our queue solution. Amazon’s marketing says SQS is:

“… fast, reliable, scalable, fully managed message queuing service. SQS makes it simple and cost-effective to decouple the components of a cloud application. You can use SQS to transmit any volume of data, at any level of throughput, without losing messages or requiring other services to be always available.”

After some time spent on using SQS we came to the conclusion that it’s a really reliable, easy to use, and easy to maintain solution at a reasonable price.

With the very first service utilizing SQS we faced a tough question: How to construct and run service tests? Using real SQS queues for this purpose has some significant drawbacks. First of all – cost. Our applications’ builds run a bunch of service tests and Amazon charges us not only for messages sent/received but also for data transfer (see the pricing here ). Usually the SQS cost is a very small part of the AWS total bill, but for startups or private initiatives every cent counts.

Another problem we experienced was with interference between our tests. One test published a message and the other one could delete it because the Continuous Integration server may run few builds simultaneously. That was not acceptable since our test had become non-deterministic. A potential solution to this problem was to create a separate queue for each test, but this just sounds ridiculous.

Quite fast we found ElasticMQ (https://github.com/adamw/elasticmq) and realized that it is compatible with the SQS protocol and works well with the official AWS Java API, which made it suitable to simulate Amazon SQS for our service tests.

The first version of tests using ElasticMQ were quite ugly. In every test class we had a code which starts ElasticMQ and creates necessary queues. There was a lot of code duplication. We wanted to extract some setup code and make ElasticMQ start and setup in more declarative way. Junit Rules (https://github.com/junit-team/junit/wiki/Rules) helped us achieve our target of simple tests code.

In order to present our way of running service tests using ElasticMQ I will go through a sample application created especially for this article. Whole source code listed in this article is available here.

For simplicity in some places I use Lombok annotations (https://projectlombok.org/) to avoid boilerplate code in listings.

SQS queue abstraction

We started our refactoring from covering AmazonSQSClient class with our own abstraction:

 

package pl.schibsted.spid.elasticmq.util;

import java.util.List;

import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.PurgeQueueRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import lombok.AllArgsConstructor;

@AllArgsConstructor
public class SqsQueue {

    private final AmazonSQSClient client;

    private final String queueUrl;

    public void send(Message toSend) {
        SendMessageRequest sendMessageRequest = new SendMessageRequest(queueUrl, toSend.getBody());
        sendMessageRequest.setMessageAttributes(toSend.getMessageAttributes());
        client.sendMessage(sendMessageRequest);
    }

    public List read(int maxMessages) {
        ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl);
        request.setMaxNumberOfMessages(maxMessages);
        ReceiveMessageResult receiveMessage = client.receiveMessage(request);
        return receiveMessage.getMessages();
    }

    public void purge() {
        client.purgeQueue(new PurgeQueueRequest(queueUrl));
    }

}

SqsQueue class uses Amazon AWS Java API and its responsibility is to wrap send/receive/purge requests construction and invocation of AmazonSQSClient. It exposes following operations:

  • send – puts given message to queue identified by queueUrl
  • read – tries to retrieve up to “maxMessages” from queue identified by queueUrl
  • purge – removes all messages from queue identified by queueUrl

Rule configuration

To set up ElasticMQ we need a list of queues that must be created on startup and TCP port which will be occupied by ElasticMQ. Because of that, we created configuration class. It’s just simple POJO:

package pl.schibsted.spid.elasticmq.util;

import java.util.Set;

import lombok.Builder;
import lombok.Getter;
import lombok.Singular;

@Builder
@Getter
public class SqsRuleConfiguration {

    private int port;

    @Singular
    private Set queues;
}

Test rule

The rule itself is also not very complicated, it simply starts ElasticMQ server on given port, creates queues specified in configuration and creates set of SqsQueues:

package pl.schibsted.spid.elasticmq.util;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQSClient;
import org.elasticmq.NodeAddress;
import org.elasticmq.rest.sqs.SQSRestServer;
import org.elasticmq.rest.sqs.SQSRestServerBuilder;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SqsRule implements TestRule {

    private static final Logger LOGGER = LoggerFactory.getLogger(SqsRule.class);

    // Reference to ElasticMQ server
    private static SQSRestServer server;
    private Map<String, SqsQueue> queues = new ConcurrentHashMap<>();
    private SqsRuleConfiguration configuration;

    public SqsRule(SqsRuleConfiguration configuration) {
        this.configuration = configuration;
    }

    @Override
    public Statement apply(Statement childStatement, Description description) {
        return new Statement() {
            @Override
            public void evaluate() throws Throwable {
                try {
                    setup();
                    childStatement.evaluate();
                } finally {
                    shutdown();
                }
            }
        };
    }

    private synchronized void setup() {
        // Start ElasticMQ in embedded mode.
        server = SQSRestServerBuilder.withPort(configuration.getPort())
                .withServerAddress(new NodeAddress("http", "localhost", configuration.getPort(), "")).start();
        LOGGER.info("SQS server started on port " + configuration.getPort());
        for (String queueName : configuration.getQueues()) {
            // Use standard ElasticMQ credentials ("x", "x")
            AmazonSQSClient amazonSQSClient = new AmazonSQSClient(new BasicAWSCredentials("x", "x"));
            // ElasticMQ is running on the same machine as integration test
            String endpoint = "http://localhost:" + configuration.getPort();
            amazonSQSClient.setEndpoint(endpoint);
            // Create queue with given name
            amazonSQSClient.createQueue(queueName);
            // Queue URL in ElasticMQ is http://host:port/queue/{queue_name}
            queues.put(queueName, new SqsQueue(amazonSQSClient, endpoint + "/queue/" + queueName));
        }
    }

    private synchronized void shutdown() {
        server.stopAndWait();
    }

    public SqsQueue getQueue(String queueName) {
        return queues.get(queueName);
    }

    public void purgeAllQueues() {
        // Cleans
        for (String queueName : queues.keySet()) {
            queues.get(queueName).purge();
        }
    }
}

Rule in action

Now I’d like to show how the presented rule could be used to verify the behavior of a very simple Dropwizard application (for tutorial see this link). The application exposes HTTP endpoint called “ping”. When the request is received it transforms given argument to uppercase and puts the message in a queue. The application behaves correctly when after invocation of “ping” endpoint:

  • HTTP result status is 204 (No content)
  • There is only one message available in queue
  • Body of that message is a request string transformed to uppercase.

Let’s start from application configuration. It simply contains URL of a queue where message will be sent to and Amazon credentials.

package pl.schibsted.spid.elasticmq.server;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.dropwizard.Configuration;
import org.hibernate.validator.constraints.NotEmpty;

public class ElasticMqRuleSampleApplicationConfiguration extends Configuration {

    @NotEmpty
    private String queueUrl;
    @NotEmpty
    private String awsAccessKey;
    @NotEmpty
    private String awsSecretKey;

    @JsonProperty
    public String getQueueUrl() {
        return queueUrl;
    }

    @JsonProperty
    public String getAwsAccessKey() {
        return awsAccessKey;
    }

    @JsonProperty
    public String getAwsSecretKey() {
        return awsSecretKey;
    }
}

Test configuration (test.yml file) points to “sample-queue” and provides AWS credentials (“x”, “x” for ElasticMQ):

queueUrl: http://localhost:8888/queue/sample-queue
awsAccessKey: x
awsSecretKey: x

REST resource itself exposes only one method: “ping”. It consumes JSON and does not return anything (what translates to HTTP 204 status). In endpoint constructor we instantiate new AmazonSQSClient with given credentials. Please remember that resource classes are used by multiple threads concurrently and should be thread safe.

package pl.schibsted.spid.elasticmq.resources;

import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQSClient;
import pl.schibsted.spid.elasticmq.server.ElasticMqRuleSampleApplicationConfiguration;

@Path("/ping")
@Consumes(MediaType.APPLICATION_JSON)
public class PingResource {

    private final AmazonSQSClient client;
    private final String queueUrl;

    public PingResource(ElasticMqRuleSampleApplicationConfiguration configuration) {
        // Instantiate AmazonSQSClient
        this.client = new AmazonSQSClient(new BasicAWSCredentials(configuration.getAwsAccessKey(), configuration.getAwsSecretKey()));
        this.queueUrl = configuration.getQueueUrl();
    }

    @POST
    public void ping(@NotNull String pingBody) {
        String toSend = processPingBody(pingBody);
        // sendMessage method is thread safe.
        // Send message with given body to queue.
        client.sendMessage(queueUrl, toSend);
    }

    private String processPingBody(String pingBody) {
        // Simulate very complicated message processing.
        return pingBody.toUpperCase();
    }
}

Main application class simply glues it all together:

package pl.schibsted.spid.elasticmq.server;

import io.dropwizard.Application;
import io.dropwizard.setup.Environment;
import pl.schibsted.spid.elasticmq.resources.PingResource;

public class ElasticMqRuleSampleApplication extends Application {

    public static void main(String[] args) throws Exception {
        new ElasticMqRuleSampleApplication().run(args);
    }

    @Override
    public void run(ElasticMqRuleSampleApplicationConfiguration configuration, Environment environment) throws Exception {
        PingResource resource = new PingResource(configuration);
        environment.jersey().register(resource);
    }

}

Service test verifying “ping” resource behavior looks as follows:

package pl.schibsted.spid.elasticmq.resources;

import java.util.List;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

import com.amazonaws.services.sqs.model.Message;
import io.dropwizard.testing.junit.DropwizardAppRule;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.Test;
import pl.schibsted.spid.elasticmq.server.ElasticMqRuleSampleApplication;
import pl.schibsted.spid.elasticmq.server.ElasticMqRuleSampleApplicationConfiguration;
import pl.schibsted.spid.elasticmq.util.SqsRule;
import pl.schibsted.spid.elasticmq.util.SqsRuleConfiguration;

import static org.junit.Assert.assertEquals;

public class ITestPingResource {

    @ClassRule
    public static DropwizardAppRule app =
            new DropwizardAppRule<>(ElasticMqRuleSampleApplication.class,
                    ITestPingResource.class.getClassLoader().getResource("test.yml").getPath());

    @ClassRule
    public static SqsRule sqs = new SqsRule(SqsRuleConfiguration.builder()
            .queue("sample-queue").port(8888).build());

    private Client client = ClientBuilder.newClient();

    @After
    public void tearDown() {
        sqs.purgeAllQueues();
    }

    @Test
    public void shouldPublishProcessedRequestPayload() throws Exception {
        // given
        String toSend = "abcdefgh";
        // when
        Response response = client
                .target("http://127.0.0.1:" + app.getLocalPort() + "/ping")
                .request().post(Entity.json(toSend));
        // then
        assertEquals(Status.NO_CONTENT.getStatusCode(), response.getStatus());
        List messagesFromQueue = sqs.getQueue("sample-queue").read(10);
        assertEquals(1, messagesFromQueue.size());
        assertEquals("ABCDEFGH", messagesFromQueue.get(0).getBody());
    }
}

Annotations ClassRule setup Dropwizard (using test.yml as configuration file) and ElasticMQ. SqsQueuesRule constructor accepts configuration: “sample-queue” to create and port 8888 – compare with test.yml file. I use basic Jersey client to reach “ping” endpoint and parse HTTP response. Three assertions in “then” block verify our assumptions we defined in previous sections.

I hope this article gave you at least general overview of how Amazon Simple Queue Service can be simulated in service tests. The advantages of presented approach are: simplicity, decoupling from third party service (needed especially when CI server does not allow Internet access) and cost control. Please feel free to use it in your projects.

Written by Jan Gurda
EM Payments Gateway Kraków
Published September 10, 2015