Written by Patryk Kurczyna
Kotlin/Java Developer
Published December 12, 2023

Integration Testing Deep Dive Part II

 

1. Introduction

In the first part of this article we implemented many integration tests covering the most common external services that most applications use, including databases and REST APIs.

We also looked at the exact setup that is needed to create an efficient integration tests suite for your application, using Gradle, Spock and Spring Boot.

This part will continue the exploration of various external services that are often integrated in common backend microservices, such as: Kafka (consumers and producers), AWS SQS, Storage (Google Cloud) and SMTP protocol for sending emails.

Finally, we will attempt to organise the entire configuration into what can be described as a comprehensive integration testing framework, leveraging the capabilities offered by Spock Extensions.

2. Kafka Consumer testing

Nowadays, many applications consume messages from Kafka or another message bus. Setting up and maintaining your own Kafka cluster just for integration tests is cumbersome, so Testcontainers comes to the rescue once again, this time with its Kafka module.

Let’s implement a Kafka consumer in the application that will read messages from the given topic and handle them. We can later create test scenarios to ensure the handling logic is correct. 

Kafka consumer

To write a Kafka consumer, an additional Spring dependency is required. Let’s add the Testcontainers Kafka module as well.

implementation("org.springframework.kafka:spring-kafka:$springKafkaVersion")

itestImplementation("org.testcontainers:kafka:$testcontainersVersion")

Assuming the topic contains user events for registering or deleting user entries, the UserRepository from the previous article needs a new delete method.

private companion object {
    ...
    const val DELETE_USER_QUERY = """DELETE FROM users WHERE id = :id"""
}

override fun delete(id: Long) {
    jdbcTemplate.update(DELETE_USER_QUERY, mapOf("id" to id))
}

Now, a model class is required to represent the events.

data class UserEvent(
    val id: Long,
    val operation: Operation,
    val name: String
) {
    fun toUser() = User(id, name)
}

enum class Operation {
    REGISTER, DELETE
}

Finally, thanks to the spring-kafka library, we have a simple yet powerful way of creating the Kafka consumer.

@Component
class UserListener(private val repository: UserRepository) {

    @KafkaListener(topics = ["\${topics.users}"])
    fun handleUserEvent(event: UserEvent) {
        when(event.operation) {
            Operation.REGISTER -> repository.addOrUpdate(event.toUser())
            Operation.DELETE -> repository.delete(event.id)
        }
    }
}

The @KafkaListener annotation ensures that the Kafka consumer is started in a separate thread and handles UserEvent messages from the topic configured under the topics.users property. Make sure this placeholder resolves to the correct value in the application.yml config file:

topics:
  users: public.user.events

Last but not least is the Kafka configuration. Spring provides many properties for Kafka, but we will only use the few that are needed here.

spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAPSERVERS}
    consumer:
      group-id: spring-it
      auto-offset-reset: earliest
      enable-auto-commit: false
      key-deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        spring.json.trusted.packages: "pl.kurczyna.springit"

The following are the most important:

  • kafka.bootstrap-servers -> in short, the address of the Kafka cluster; here the application takes it from the KAFKA_BOOTSTRAPSERVERS environment variable
  • kafka.consumer.group-id -> the consumer group id; it’s usually the application name
  • kafka.consumer.key-deserializer and kafka.consumer.value-deserializer -> key and value deserializers for our events. In this case, keys are simple Strings and values are in JSON format

Adding this configuration lets Spring auto-configure Kafka, so the consumer (@KafkaListener) can work properly.
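To make the mapping more tangible, here is a minimal Kotlin sketch of the raw Kafka client settings that the YAML above roughly translates to. The consumerProps helper is a hypothetical name used only for illustration; in the application itself Spring Boot builds this configuration for you.

```kotlin
// Raw Kafka consumer settings corresponding to the spring.kafka.* YAML above.
// consumerProps is a hypothetical helper, not part of the article's code base.
fun consumerProps(env: Map<String, String> = System.getenv()): Map<String, Any> {
    // Fail fast when the environment variable is missing.
    val bootstrapServers = requireNotNull(env["KAFKA_BOOTSTRAPSERVERS"]) {
        "KAFKA_BOOTSTRAPSERVERS must be set"
    }
    return mapOf(
        "bootstrap.servers" to bootstrapServers,
        "group.id" to "spring-it",
        "auto.offset.reset" to "earliest",
        "enable.auto.commit" to false,
        "key.deserializer" to "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer" to "org.springframework.kafka.support.serializer.JsonDeserializer",
        // Entries under spring.kafka.consumer.properties are passed through unchanged.
        "spring.json.trusted.packages" to "pl.kurczyna.springit",
    )
}

fun main() {
    val props = consumerProps(mapOf("KAFKA_BOOTSTRAPSERVERS" to "localhost:9092"))
    props.forEach { (key, value) -> println("$key = $value") }
}
```

Passing the environment as a parameter keeps the sketch runnable without a real KAFKA_BOOTSTRAPSERVERS variable.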

Integration Test

Testing scenarios

Two testing scenarios can be outlined for the Kafka user consumer:

  1. User registration
    • Prepare registration event
    • Send the event to Kafka (using utility Kafka producer)
    • Verify that the user has eventually been stored in the DB

The word “eventually” is key here. The Kafka consumer operates asynchronously, so there is no precise way to determine when it has finished handling an event.

  2. User deletion
    • Insert the user to the db
    • Prepare deletion event
    • Send the event to Kafka (using utility Kafka producer)
    • Verify that the user has eventually been deleted from the DB

Test setup

Similar to WireMock, the aim is to decouple the Testcontainers Kafka logic from the tests. Hence, we will create a KafkaMock utility class to manage everything related to Kafka:

  • Defining the docker container to be used
  • Starting and stopping docker container
  • Exposing the bootstrapServers url that Kafka is running on

class KafkaMock {

    private static final Logger log = LoggerFactory.getLogger(KafkaMock.class)
    private static final String CONFLUENT_PLATFORM_VERSION = '7.4.0'
    private static final DockerImageName KAFKA_IMAGE =
            DockerImageName
                    .parse('confluentinc/cp-kafka')
                    .withTag(CONFLUENT_PLATFORM_VERSION)
    private static KafkaContainer container

    static String bootstrapServers

    static void start() {
        log.info("Starting Kafka Container Mock")
        container = new KafkaContainer(KAFKA_IMAGE)
        container.start()
        bootstrapServers = container.getBootstrapServers()
    }

    static void stop() {
        log.info("Stopping Kafka Mock")
        container.stop()
    }
}

Now, we have to manage our mock in the IntegrationTestBase:

def setupSpec() {
    ...
    KafkaMock.start() // Start Kafka container
}

def cleanupSpec() {
    ...
    KafkaMock.stop() // Stop Kafka container
}

The TestPropertySourceUtils call also has to be updated to expose the kafka.bootstrapServers property, which can then be used in application-itest.yml:

...
String[] properties = ["wiremock.stripePort=$stripePort", "kafka.bootstrapServers=${KafkaMock.bootstrapServers}"]
...

spring:
  kafka:
    bootstrap-servers: ${kafka.bootstrapServers}
    consumer:
      group-id: spring-it-itest
    producer:
      retries: 1
      acks: all
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      key-serializer: "org.apache.kafka.common.serialization.StringSerializer"

As you can see, our application now uses an embedded Kafka container (in the integration test scope) that we spin up using Testcontainers.

Two other interesting properties in the config above are:

  • consumer.group-id -> it’s a good practice to override this value for integration tests
  • producer section -> to send test Kafka events, we need a simple Kafka producer. You can use Spring’s KafkaTemplate utility for it, which Spring auto-configures when the necessary properties are provided.

Then, it can be autowired in the IntegrationTestBase class so that you can use it in the tests.

@Autowired
KafkaTemplate<String, UserEvent> kafkaUsersProducer

Test implementation

class ITestUserEventsKafka extends IntegrationTestBase {

    @Value("\${topics.users}")
    String usersTopic

    def "should store user in db when registration event comes"() {
        given: 'There is a user event'
        Long id = nextLong()
        String name = 'John Lundstram'
        UserEvent event = new UserEvent(id, Operation.REGISTER, name)

        when: 'The event is sent to Kafka'
        kafkaUsersProducer.send(usersTopic, id.toString(), event)

        then: 'Within 5 seconds the user is registered in the db. Note that we test asynchronous code'
        new PollingConditions(timeout: 5).eventually {
            User inDb = dbTestClient.getUserById(id)
            inDb.id == id
            inDb.name == name
        }
    }

    def "should delete user from db when delete event comes"() {
        given: 'There is already one user in the DB'
        Long id = nextLong()
        String name = 'John Lundstram'
        dbTestClient.insertUser(id: id, name: name)

        and: 'There is a delete user event'
        UserEvent event = new UserEvent(id, Operation.DELETE, name)

        when: 'The event is sent to Kafka'
        kafkaUsersProducer.send(usersTopic, id.toString(), event)

        then: 'Within 5 seconds the user is deleted from the db. Note that we test asynchronous code'
        new PollingConditions(timeout: 5).eventually {
            User inDb = dbTestClient.getUserById(id)
            inDb == null
        }
    }
}

Let’s examine the details of this test.

First, the value of the topics.users property is injected into the class property usersTopic. This is needed to determine the target topic when sending the event.

Secondly, KafkaTemplate is used to send the event:

kafkaUsersProducer.send(usersTopic, id.toString(), event)

by providing the topic name, event key and event value.

Lastly, we verify that the user is inserted (or deleted) using the DbTestClient you already know.

However, the most crucial aspect is performing this verification within a PollingConditions.eventually() block. This is the mechanism that allows testing asynchronous behaviour: Spock re-runs the verification block until it succeeds or the timeout elapses. In this scenario, 5 seconds is a pretty safe bet; the logic is relatively simple, so it is reasonable to expect event handling to finish within that time.
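The polling idea itself is small enough to sketch in plain Kotlin. The eventually function below is a hypothetical, simplified stand-in and not Spock's implementation: it returns a Boolean instead of failing the test, and the background thread merely imitates a listener that writes to the database some time after the event is sent.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.thread

// Re-runs `condition` until it returns true or `timeoutMs` elapses.
fun eventually(timeoutMs: Long = 5_000, intervalMs: Long = 100, condition: () -> Boolean): Boolean {
    val deadline = System.nanoTime() + timeoutMs * 1_000_000
    while (true) {
        if (condition()) return true
        if (System.nanoTime() >= deadline) return false
        Thread.sleep(intervalMs)
    }
}

fun main() {
    // In-memory stand-in for the users table.
    val users = ConcurrentHashMap<Long, String>()

    // Stand-in for the Kafka listener thread: the write lands after "send" has returned.
    thread {
        Thread.sleep(300)
        users[42L] = "John Lundstram"
    }

    // An immediate check would most likely still see no user at this point.
    println("immediately: ${users[42L]}")
    // Polling gives the asynchronous handler time to finish.
    println("eventually: ${eventually { users[42L] == "John Lundstram" }}")
}
```

The design choice is the same as in the Spock test: the assertion is not weakened, it is only retried within a bounded time window.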

3. Kafka Producer testing

Just as it consumes events, the application may also produce them, and we want to test that they are sent properly. This can be done quite easily by enhancing our setup a bit.

Kafka producer

Let’s imagine a broadcast event needs to be published every time a user is registered. We can implement a UserBroadcast service that publishes the relevant events to Kafka.

@Component
class UserBroadcast(
    @Value("\${topics.broadcast}") private val broadcastTopic: String,
    private val kafkaUsersProducer: KafkaTemplate<String, BroadcastEvent>
) {

    fun broadcastUserRegistration(user: User) {
        kafkaUsersProducer.send(broadcastTopic, user.id.toString(), BroadcastEvent.fromUserEvent(user))
    }
}

As you remember, KafkaTemplate needs additional producer configuration, so let’s add it to the application.yml file:

spring:
  kafka:
    producer:
      retries: 1
      acks: all
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      key-serializer: "org.apache.kafka.common.serialization.StringSerializer"

topics:
  broadcast: public.user.broadcast

The config is exactly the same as the one defined in the itest profile, so there is no need to override it anymore; you can remove the producer section from application-itest.yml.

Let’s also add BroadcastEvent to our model:

data class BroadcastEvent(
    val userId: Long,
    val timestamp: Instant
) {
    companion object {
        fun fromUserEvent(user: User) = BroadcastEvent(user.id, Instant.now())
    }
}

Now, you can inject the UserBroadcast component into the UserController and publish the broadcast event every time a user is registered.

class UserController(private val repository: UserRepository, private val userBroadcast: UserBroadcast) {
...

    @PostMapping
    fun addOrUpdate(@RequestBody user: User): ResponseEntity<Unit> {
        repository.addOrUpdate(user)
        userBroadcast.broadcastUserRegistration(user)
        return ResponseEntity.status(HttpStatus.CREATED).build()
    }
...
}

Integration Test

Testing scenario

Our testing scenario is relatively simple, but verifying it requires a small workaround. Let’s enhance the testing scenario from the controller test (ITestUsers) we defined before:

Add new user:

  • Call the POST /api/users endpoint
  • Verify the response status
  • Verify that the user is added to the DB
  • Verify that the broadcast event has been published

Test setup

But how do we verify that Kafka events are published? There are many ways to do it, but my approach is to intercept the events produced by our application, store them in memory, and then verify that they were sent properly. To do that, you can create another Kafka consumer, in the itest scope, that imitates the consumer of our UserBroadcast events.

@Component
@Profile('itest')
class KafkaTestConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTestConsumer.class)

    @KafkaListener(topics = ['\${topics.broadcast}'])
    static void receiveBroadcastMessage(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info('Received broadcast kafka message: {}', consumerRecord.value())
        KafkaMock.recordMessage(consumerRecord.topic(), consumerRecord.value())
    }

    static <T> List<T> getTopicMessages(String topic) {
        KafkaMock.messages[topic].collect { it as T }
    }
}

@Profile('itest') means that the component will only be created in the itest scope and will not affect our application in production. As you know, @KafkaListener will run the consumer in a separate thread and every time we get the event, we will record it in our KafkaMock: KafkaMock.recordMessage(consumerRecord.topic(), consumerRecord.value())

The getTopicMessages method is exposed for test purposes, to get all messages produced to the topic in question.

KafkaMock will store those messages in a ConcurrentHashMap in memory.

class KafkaMock {
...
   static ConcurrentHashMap<String, List<Object>> messages = new ConcurrentHashMap<>()
...

    static void recordMessage(String topic, Object message) {
        def topicMessages = messages.getOrDefault(topic, [])
        topicMessages.add(message)
        messages[topic] = topicMessages
    }
}
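With a single listener thread the recording above is normally sufficient. If several consumer threads could record messages at the same time, the separate getOrDefault and put steps, together with a plain list, might lose messages. A more defensive variant can be sketched in Kotlin as follows; RecordedMessages is a hypothetical name and this is an assumption about how one could harden the mock, not part of the original setup.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import kotlin.concurrent.thread

// Hypothetical thread-safe message store, analogous to KafkaMock.messages.
object RecordedMessages {
    private val messages = ConcurrentHashMap<String, MutableList<Any>>()

    // computeIfAbsent creates the per-topic list atomically, so two threads
    // recording the first message of a topic cannot replace each other's list.
    fun record(topic: String, message: Any) {
        messages.computeIfAbsent(topic) { CopyOnWriteArrayList<Any>() }.add(message)
    }

    // Returns a snapshot, so a test can iterate while new messages keep arriving.
    fun forTopic(topic: String): List<Any> = messages[topic]?.toList() ?: emptyList()
}

fun main() {
    // Four writers record 1,000 messages each to the same topic concurrently.
    val writers = (1..4).map { writer ->
        thread {
            repeat(1_000) { i -> RecordedMessages.record("public.user.broadcast", "w$writer-$i") }
        }
    }
    writers.forEach { it.join() }
    println(RecordedMessages.forTopic("public.user.broadcast").size)
}
```

Running the sketch should print 4000, because no message is dropped even though all writers start with an empty map.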

Now, let’s adjust the user registration test case.

Test implementation

def "should add new user"() {
    ...
    and: 'There is a new user in the DB'
    User inDb = dbTestClient.getUserById(id)
    inDb == user

    and: "Broadcast event for user with id $id has been sent"
    new PollingConditions(timeout: 5).eventually {
        def messages = KafkaTestConsumer.<BroadcastEvent>getTopicMessages(broadcastTopic).findAll {
            it.userId == id
        }
        assert messages.size() == 1
    }
}

As a result, another and section was added, in which a PollingConditions.eventually block is defined. In this verification step, we check that the test consumer received exactly one broadcast message on the broadcastTopic for the user in question (identified by its id).

4. Storage (Google Cloud)

Another common pattern in contemporary microservices is integration with cloud services such as Amazon Web Services (AWS) or Google Cloud.

One of the most widely adopted services is cloud storage. This chapter focuses on Google Cloud Storage; however, testing AWS S3 is very similar.

Pictures Controller and Service

Let’s imagine the system you are building needs to support uploading pictures. To accomplish this, you might implement the following controller and service, which upload pictures to Google Cloud Storage.

@RestController
@RequestMapping("/api/pictures")
class PicturesController(private val picturesService: PicturesService) {

    @PostMapping
    fun uploadPicture(
        @RequestPart("picture", required = true)
        picture: MultipartFile
    ): ResponseEntity<Unit> {
        picturesService.uploadPicture(picture)
        return ResponseEntity.ok().build()
    }
}

interface PicturesService {
    fun uploadPicture(picture: MultipartFile)
}

@Service
class DefaultPicturesService(
    @Value("\${gcs.bucket}") private val bucketName: String,
    val storage: Storage
) : PicturesService {
    override fun uploadPicture(picture: MultipartFile) {
        val blobId = BlobId.of(bucketName, picture.originalFilename)
        val blobInfo = BlobInfo.newBuilder(blobId).apply {
            setContentType(picture.contentType)
        }.build()
        val content = picture.bytes

        storage.writer(blobInfo).use { writer ->
            writer.write(ByteBuffer.wrap(content, 0, content.size))
        }
    }
}

The setup requires declaring additional dependencies:

dependencyManagement { // Google Cloud dependency management for Spring
    imports {
        mavenBom("com.google.cloud:spring-cloud-gcp-dependencies:$springCloudGcpVersion")
    }
}

...

implementation("com.google.cloud:spring-cloud-gcp-starter-storage") // GCS dependency

And a GCS bucket configuration in the application.yml file:

gcs:
  bucket: gcs://pictures

Integration Test

Testing scenario

Assume the following testing scenario:

  • There is a picture to be uploaded to GCS
  • The upload endpoint is called, with the file to upload as a multipart body
  • Response is successful
  • The file is uploaded to GCS bucket

Test setup

The last step is the most essential one, so you need a way of setting up a mocked Google Cloud Storage bucket for the test and verifying its content afterwards.

Unfortunately, Testcontainers does not provide a dedicated module for Google Cloud Storage, so you can use a GenericContainer and set up a mocked GCS yourself. Fortunately, there are open-source tools available for the task, one of which is Fake GCS Server. It provides an emulator for the Google Cloud Storage API that can be used together with the Java SDK.

Let’s use it in practice.

First, create a GcsMock that runs Fake GCS Server in a GenericContainer:

class GcsMock {

    private static def container
    static def gcsPort

    static void start() {
        container = new GenericContainer<>("fsouza/fake-gcs-server:latest")
                .withExposedPorts(4443)
                .withCreateContainerCmdModifier {
                    it.withEntrypoint(
                            "/bin/fake-gcs-server",
                            "-scheme", "http")
                }
        container.start()
        gcsPort = container.getFirstMappedPort()
        updateExternalUrlWithContainerUrl(container.getHost(), gcsPort)
    }

    static void stop() {
        container.stop()
    }

    static String[] propertiesToRegister() {
        return ["gcs.port=$gcsPort"]
    }

    private static def updateExternalUrlWithContainerUrl(String host, Integer port) throws Exception {
        def fakeGcsExternalUrl = "http://" + host + ":" + port
        def modifyExternalUrlRequestUri = fakeGcsExternalUrl + "/_internal/config"
        def updateExternalUrlJson = new JSONObject()
        updateExternalUrlJson.put("externalUrl", fakeGcsExternalUrl)

        def request = HttpRequest.newBuilder()
                .uri(URI.create(modifyExternalUrlRequestUri))
                .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .PUT(HttpRequest.BodyPublishers.ofString(updateExternalUrlJson.toString()))
                .build()
        def response = HttpClient.newBuilder().build()
                .send(request, HttpResponse.BodyHandlers.discarding())

        if (response.statusCode() != HttpStatus.OK.value()) {
            throw new RuntimeException("Error updating fake-gcs-server with external URL, response status code ${response.statusCode()} != 200")
        }
    }
}

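The start and stop methods are already familiar: they manage the testcontainer lifecycle. After the container starts, the mock also tells Fake GCS Server its externally reachable URL (host and mapped port) through the /_internal/config endpoint, so that URLs the server hands back to clients point at the mapped port. You should also expose the port the GcsMock is running on as an application property. All this should be done in the setupSpec and cleanupSpec methods of the IntegrationTestBase: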

def setupSpec() {
    stripeServer = new WireMockServer(stripePort)
    stripeServer.start()
    KafkaMock.start()
    GcsMock.start()
}

def cleanupSpec() {
   stripeServer.stop()
   KafkaMock.stop()
   GcsMock.stop()
}

...

String[] properties = [
    "wiremock.stripePort=$stripePort",
    "kafka.bootstrapServers=${KafkaMock.bootstrapServers}",
    "gcs.port=${GcsMock.gcsPort}"
]

With the GCS container in place, the application needs to “know” where to find it, so the Storage Spring bean has to be configured accordingly:

@TestConfiguration
class GcsITestConfiguration {

    @Value("\${gcs.port}")
    private String gcsContainerPort

    @Bean
    @Primary
    Storage gcsStorage() {
        return StorageOptions.newBuilder()
                .setHost("http://localhost:$gcsContainerPort")
                .setProjectId("itest-project")
                .setCredentials(NoCredentials.getInstance())
                .build()
                .getService()
    }
}

The gcsContainerPort is injected from the gcs.port application property, which points to the GCS testcontainer. As you can see, the host and credentials of the Storage service are overridden accordingly. The @Primary annotation instructs Spring to prefer this @Bean definition over any other Storage bean. Moreover, the @TestConfiguration annotation means that this configuration class is only applied in the test scope.

There is one more important step to complete the setup: importing the test configuration class for all the tests, by adding the relevant @Import annotation in the IntegrationTestBase:

@Import(GcsITestConfiguration)
@ContextConfiguration(initializers = PropertyInitializer)
abstract class IntegrationTestBase extends Specification {

Now, you’re almost ready to implement the integration test scenario defined before, but let’s add another utility component, that will allow interaction with the Google Cloud Storage inside the tests.

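It reads the bucket name from the gcs.bucket property, which is overridden in the application-itest.yml file:

gcs:
  bucket: itest-pictures
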
@Component
class StorageTestClient {
    @Autowired
    Storage storageClient

    @Value("\${gcs.bucket}")
    private String bucketName

    def createBucket() {
        def bucketInfo = BucketInfo.newBuilder(bucketName).build()
        storageClient.create(bucketInfo)
    }

    def deleteBucket() {
        storageClient.list(bucketName).streamAll().forEach {it -> storageClient.delete(it.blobId)}
        storageClient.delete(bucketName)
    }

    def getAllBlobs() {
        return storageClient.list(bucketName).values
    }
}

It provides methods for creating and deleting buckets, but also for querying all objects (blobs) in the bucket, which are going to be used later.

The last step is to create the relevant bucket before each test and clean it up afterwards.

@Autowired
StorageTestClient storageClient

...

def setup() {
    dbTestClient = new DbTestClient(template)
    stripeMock = new StripeMock(stripeServer)
    storageClient.createBucket()
}

def cleanup() {
    stripeServer.resetAll()
    storageClient.deleteBucket()
}

Test implementation

class ITestPictures extends IntegrationTestBase {

    def "should upload picture to GCS"() {
        given: 'There is a picture to be uploaded'
        def picture = new ClassPathResource("pictures/yosemite.png")

        when: 'Upload endpoint is called'
        def body = buildMultipartBody(picture)
        def result = restTemplate.postForEntity('/api/pictures', body, Unit.class)

        then: 'Response is successful'
        result.statusCode == OK

        and: 'Picture is uploaded to GCS'
        def blobs = storageClient.getAllBlobs()
        blobs.size() == 1
        blobs[0].blobId != null
        blobs[0].name == 'yosemite.png'
    }

    private def buildMultipartBody(Resource picture) {
        def body = new LinkedMultiValueMap<>()
        body.add("picture", new HttpEntity<>(picture))
        return body
    }
}

The end result is clean and concise. Using a multipart request body to send the actual file with the HTTP request allows for verifying whether or not it has been uploaded successfully to the Google Cloud Storage bucket.

Please bear in mind that the file pictures/yosemite.png has to be on the classpath, for instance in the src/itest/resources directory.

5. Sending Emails

Another widely adopted feature is sending email over the Simple Mail Transfer Protocol (SMTP). This chapter shows how it can be tested effectively.

Email sender

Below is an example implementation of an email sending component in Spring Boot.

data class Email(
    val sender: String,
    val recipient: String,
    val subject: String,
    val content: String
)

interface EmailService {
    fun sendEmail(email: Email)
}

@Service
class SmtpEmailService(private val javaMailSender: JavaMailSender) : EmailService {
    override fun sendEmail(email: Email) {
        val mimeMessage = javaMailSender.createMimeMessage()
        MimeMessageHelper(mimeMessage, true).apply {
            setText(email.content, true)
            setTo(email.recipient)
            setFrom(email.sender)
            setSubject(email.subject)
        }
        javaMailSender.send(mimeMessage)
    }
}

The Email class represents the email object with the essential data. The service above uses Spring’s JavaMailSender to send emails via SMTP. It has to be configured properly, and Spring offers a convenient way of doing that: specifying the required configuration properties in the application.yml file.

spring:
  mail:
    host: smtp.gmail.com
    port: 587
    username: <username>
    password: <password>
    properties:
      mail.smtp.auth: true
      mail.smtp.starttls.enable: true

We need to declare additional dependencies for the application as well:

implementation("org.springframework.boot:spring-boot-starter-mail")
implementation("com.sun.mail:jakarta.mail:$jakartaVersion")

Integration Test

Testing scenario

An actual testing scenario could be straightforward:

  • There is an email to be sent
  • SmtpEmailService’s sendEmail method is called
  • The email is delivered to the particular recipient, identified by the email address
  • The delivered email contains all the requisite data

Test setup

To be able to perform the verification from the last bullet point of the testing scenario you have to inspect the data (emails) being sent over SMTP. One of the tools that allows that is GreenMail. It’s an open source, intuitive and easy-to-use test suite of email servers for testing purposes.

Let’s add it to our test suite:

itestImplementation("com.icegreen:greenmail:$greenmailVersion")
itestImplementation("com.icegreen:greenmail-junit5:$greenmailVersion")

Then create and manage the GreenMail server in the IntegrationTestBase:

static int greenMailPort = findAvailableTcpPort()
static String greenMailUser = UUID.randomUUID().toString()
static String greenMailPassword = UUID.randomUUID().toString()
static GreenMail greenMail

def setupSpec() {
    ...
    greenMail = new GreenMail(ServerSetupTest.SMTP.port(greenMailPort))
        .withConfiguration(
            GreenMailConfiguration.aConfig()
                .withUser(greenMailUser, greenMailPassword)
        )
}

def setup() {
    ...
    greenMail.start()
}

def cleanup() {
    ...
    greenMail.stop()
}

...

static class PropertyInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
    @Override
    void initialize(ConfigurableApplicationContext applicationContext) {
        String[] properties = [
            ...                    
            "greenmail.port=${greenMailPort}",
            "greenmail.user=${greenMailUser}",
            "greenmail.password=${greenMailPassword}"
        ]
...

Configuration in the application-itest.yml file:

spring:
  mail:
    host: localhost
    port: ${greenmail.port}
    username: ${greenmail.user}
    password: ${greenmail.password}
    properties:
      mail.smtp.auth: true
      mail.smtp.starttls.enable: true

The code above shows how the GreenMail object is created and how its lifecycle is managed. Once it is running, you can adjust the application’s SMTP configuration in the itest profile so that GreenMail is used instead of an actual SMTP server.

Test implementation

class ITestEmailService extends IntegrationTestBase {
    @Autowired
    EmailService emailService

    def "should send e-mail"() {
        given: 'There is an email to be sent'
        def email = new Email('anakin@skywalker.com', 'luke@skywalker.com', 'Luke!', 'I am your father')

        when: 'The email is sent'
        emailService.sendEmail(email)

        then: 'Greenmail receives one email message'
        def messages = greenMail.getReceivedMessages()
        messages.size() == 1
        with(messages.first()) {
            getHeader(it, 'From') == email.sender
            getHeader(it, 'To') == email.recipient
            getHeader(it, 'Subject') == email.subject
            def body = GreenMailUtil.getBody(it)
            body.contains(email.content)
        }
    }

    private static def getHeader(MimeMessage message, String header) {
        message.allHeaders.find { it.name == header }.value
    }
}

In the verification block of the above test, the GreenMail method getReceivedMessages is used. It returns an array of MimeMessage objects representing the emails intercepted by GreenMail. To assert that the email body contains the desired data, the utility method GreenMailUtil.getBody() is invoked. The From, To and Subject headers are verified with the small getHeader helper.

6. AWS SQS queues

As an example of AWS services, let’s take a closer look at the Simple Queue Service (SQS). It serves diverse purposes and is a common integration point for many applications.

Email sending events

AWS SES enables clients to track email sending at a granular level by setting up email sending events. They can be published to various destinations, one of which is an SQS queue (via SNS).

Assume that your application needs to consume and handle those events.

Let’s write a simple consumer code.

SQSEmailEventsReceiver

First, a few more dependencies are required:

implementation(platform("io.awspring.cloud:spring-cloud-aws-dependencies:$awsCloudVersion"))
implementation("io.awspring.cloud:spring-cloud-aws-starter-sqs")

Conveniently, nothing more is needed to implement an SQS message receiver in Spring Boot:

data class EmailEvent(
    val id: Long,
    val type: String
)

@Service
class SqsEmailEventReceiver {
    private companion object {
        val log: Logger = LoggerFactory.getLogger(SqsEmailEventReceiver::class.java)
    }

    @SqsListener(value = ["\${sqs.queue-name}"])
    fun handle(event: EmailEvent) {
        log.info("Handling email event: $event finished with success")
    }
}

The EmailEvent class represents an event object from SES. In real life it is usually much more complex, but this simple representation is enough for example purposes.

SqsEmailEventReceiver provides one method handle, decorated with a @SqsListener annotation which instructs Spring Boot to spawn a separate thread with a consumer of the queue passed as a parameter.

To keep it simple, the handling logic just logs the received event.

The sqs.queue-name property has to be defined in the application.yml file:

sqs:
  queue-name: email-events-queue

Integration Test

Testing scenario

Assume the following testing scenario:

  • Email event is sent to the SQS queue
  • The event is processed
  • Eventually the event is successfully consumed (it’s not visible in the queue anymore)
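What “not visible in the queue anymore” means can be made concrete with the two approximate counters SQS exposes as queue attributes: messages waiting to be received, and messages received but not yet deleted (in flight). Below is a minimal Kotlin sketch of interpreting them; QueueDepth and queueDepth are hypothetical names, and the attribute map stands in for the result of a GetQueueAttributes call.

```kotlin
// Attribute names as returned by SQS GetQueueAttributes; the counts are approximate by design.
const val VISIBLE = "ApproximateNumberOfMessages"
const val IN_FLIGHT = "ApproximateNumberOfMessagesNotVisible"

// Hypothetical helper type: a queue is drained when nothing is waiting and nothing is in flight.
data class QueueDepth(val visible: Int, val inFlight: Int) {
    val drained: Boolean get() = visible == 0 && inFlight == 0
}

// Missing or malformed attribute values are treated as zero in this sketch.
fun queueDepth(attributes: Map<String, String>): QueueDepth =
    QueueDepth(
        visible = attributes[VISIBLE]?.toIntOrNull() ?: 0,
        inFlight = attributes[IN_FLIGHT]?.toIntOrNull() ?: 0,
    )

fun main() {
    // The event has been received by the listener but not yet deleted: still being processed.
    println(queueDepth(mapOf(VISIBLE to "0", IN_FLIGHT to "1")).drained)  // false
    // Nothing waiting and nothing in flight: the event has been consumed.
    println(queueDepth(mapOf(VISIBLE to "0", IN_FLIGHT to "0")).drained)  // true
}
```

Because the counters are approximate, a check like this belongs inside a polling block rather than a one-off assertion.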

Test setup

When dealing with AWS services, it is advisable to use the Testcontainers LocalStack module. LocalStack is a fully functional local AWS cloud stack with built-in support for SQS, among others.

Let’s add a dependency on the localstack module:

itestImplementation("org.testcontainers:localstack:$localstackVersion")

Now we’re ready to spin up the LocalStack Docker container with SQS support:

private static DockerImageName localstackImage = DockerImageName.parse("localstack/localstack")
public static LocalStackContainer localstack = new LocalStackContainer(localstackImage).withServices(SQS)

...

def setupSpec() {
    ...
    localstack.start()
}

def cleanupSpec() {
    ...
    localstack.stop()
}

As in most of the previous examples, the application needs to know that it should connect to the mocked SQS server in the itest scope. To do that, let’s override SqsAsyncClient bean definition to point to the LocalStack container. Add the following @TestConfiguration class to IntegrationTestBase:

@TestConfiguration
static class AwsITestConfiguration {

    @Bean
    @Primary
    SqsAsyncClient amazonSQSAsync() {
        return SqsAsyncClient.builder()
            .region(Region.of(localstack.getRegion()))
            .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(localstack.accessKey, localstack.secretKey)))
            .endpointOverride(localstack.getEndpointOverride(SQS))
            .build()
    }
}

What is new in the example above are the AWS credentials and region that need to be specified. Luckily, the LocalStack container exposes suitable methods to retrieve them.

Last but not least, we need a utility component to facilitate interaction with the SQS queue. We can introduce SqsTestClient for that purpose:

@Component
class SqsTestClient {
    @Autowired
    private SqsAsyncClient sqsClient

    @Value("\${sqs.queue-name}") private String queueName

    def createQueue() {
        sqsClient.createQueue(CreateQueueRequest.builder().queueName(queueName).build() as CreateQueueRequest)
    }

    def sendEvent(String body) {
        sqsClient.sendMessage(getMessageRequest(body))
    }

    def processingEventsSize() {
        parseInt(getQueueAttributes().get("ApproximateNumberOfMessagesNotVisible"))
    }

    def isQueueEmpty() {
        def attributes = getQueueAttributes()
        return parseInt(attributes.get("ApproximateNumberOfMessagesNotVisible")) +
            parseInt(attributes.get("ApproximateNumberOfMessages")) +
            parseInt(attributes.get("ApproximateNumberOfMessagesDelayed")) == 0
    }

    private SendMessageRequest getMessageRequest(String body) {
        SendMessageRequest.builder().queueUrl(queueName).messageBody(body).build() as SendMessageRequest
    }

    private Map<String, String> getQueueAttributes() {
        def request = GetQueueAttributesRequest
            .builder()
            .queueUrl(queueName)
            .attributeNames(ALL)
            .build() as GetQueueAttributesRequest

        sqsClient.getQueueAttributes(request).get().attributesAsStrings()
    }
}

The presented class uses the SqsAsyncClient component to communicate with SQS (remember that it is configured to point to the LocalStack container). The AWS SDK makes it easy to implement methods that are convenient to use in tests, such as:

  • createQueue
  • isQueueEmpty
  • processingEventsSize
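To make the meaning of the three queue counters more tangible, here is a minimal plain-Java sketch of the same checks, working on an attribute map like the one returned by getQueueAttributes. The class name QueueAttributeChecks and the helper count are hypothetical and introduced only for illustration; the attribute names are the ones used in SqsTestClient above.

```java
import java.util.Map;

// Hypothetical helper mirroring the checks in SqsTestClient, without any AWS dependency.
final class QueueAttributeChecks {
    private QueueAttributeChecks() { }

    // Messages that a consumer has received but not yet deleted ("in flight").
    static int inFlight(Map<String, String> attributes) {
        return count(attributes, "ApproximateNumberOfMessagesNotVisible");
    }

    // The queue is considered empty only when nothing is visible, in flight or delayed.
    static boolean isEmpty(Map<String, String> attributes) {
        return count(attributes, "ApproximateNumberOfMessages")
                + count(attributes, "ApproximateNumberOfMessagesNotVisible")
                + count(attributes, "ApproximateNumberOfMessagesDelayed") == 0;
    }

    // Assumption of this sketch: a missing attribute is treated as zero.
    private static int count(Map<String, String> attributes, String name) {
        return Integer.parseInt(attributes.getOrDefault(name, "0"));
    }

    public static void main(String[] args) {
        Map<String, String> processing = Map.of(
                "ApproximateNumberOfMessages", "0",
                "ApproximateNumberOfMessagesNotVisible", "1",
                "ApproximateNumberOfMessagesDelayed", "0");
        System.out.println("in flight: " + inFlight(processing));
        System.out.println("empty: " + isEmpty(processing));
    }
}
```

Keep in mind that these SQS counters are approximate, which is one more reason to poll them in the test instead of asserting on them immediately.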

sqs.queue-name also has to be defined in the application-itest.yml configuration file:

sqs:
  queue-name: itest-email-events-queue

Moreover, SqsTestClient needs to be injected in the IntegrationTestBase:

@Autowired
SqsTestClient sqsTestClient

That is all the setup necessary for employing the integration test for the SQS queue consumer.

Test implementation

class ITestSqsEmailEventReceiver extends IntegrationTestBase {

    def "should consume SQS event"() {
        given: 'There is an email event'
        String event = /{
            "id": 13,
            "type": "Marked as spam"
        }/

        when: 'An event is sent to the queue'
        sqsTestClient.sendEvent(event)

        then: 'Event is being processed'
        new PollingConditions(timeout: 5).eventually {
            sqsTestClient.processingEventsSize() == 1
        }

        and: 'Event is successfully consumed'
        new PollingConditions(timeout: 5).eventually {
            sqsTestClient.isQueueEmpty()
        }
    }
}

As always, the test itself is straightforward once all the necessary pieces are in place. After the event is sent to the queue, we assert that it is successfully consumed by verifying that the queue is eventually empty, which means that no exception was thrown during processing. Of course, if the handling logic were more sophisticated, you would be able to verify other conditions related to the database or application state.
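Both assertions in the test rely on polling. The following plain-Java sketch shows the general idea behind such an "eventually" check: re-evaluate a condition until it holds or a timeout expires. It is not Spock's PollingConditions implementation; the Eventually class and its parameters are hypothetical, and unlike Spock it returns false instead of failing the test on timeout.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper illustrating the "eventually" idea.
final class Eventually {
    private Eventually() { }

    // Re-checks the condition every delayMillis until it is true or timeoutMillis has elapsed.
    static boolean eventually(BooleanSupplier condition, long timeoutMillis, long delayMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up early.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        int[] polls = {0};
        // The condition becomes true on the third evaluation.
        boolean satisfied = eventually(() -> ++polls[0] >= 3, 1_000, 10);
        System.out.println(satisfied + " after " + polls[0] + " polls");
    }
}
```

One consequence of polling is worth noting: a short-lived state, such as exactly one message being in flight, may already be over by the time of the first check if the handler is very fast, whereas a final state such as an empty queue stays observable.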

7. Optimisations

In the preceding chapters of this article, we have examined numerous patterns that could form a robust foundation for integration testing. Nevertheless, there remains a lot of room for improvement. As you probably noticed, we did not manage to avoid some logical duplication and boilerplate code. Among the repetitive tasks, the lifecycle management of our mocks stands out, as it is handled in a similar manner for most of them. Furthermore, the overall execution time of the integration test suite reveals potential for optimisation:

BUILD SUCCESSFUL in 2m
6 actionable tasks: 3 executed, 3 up-to-date

real    2m0.707s
user    0m1.280s
sys      0m0.257s

A total of 12 test cases across 8 test classes took about two minutes to run. That leaves plenty of room for improvement.

Let us explore one of the potential ways to address the aforementioned challenges.

Spock Extensions

Spock comes with a powerful extension mechanism, which allows you to hook into a specification’s lifecycle to enrich or alter its behaviour. While there exists a range of built-in extensions with valuable functionalities, this chapter will focus on Spock’s custom extensions.

Spock offers two distinct types of custom extensions: global extensions and annotation-driven local extensions – we will make use of both of them.

For a comprehensive understanding of Spock extensions, the documentation provides further insights. All you need to know for now is that we need them to run custom code for our mocks at specific moments during the integration test suite run.

Objectives

There are several objectives that we will try to achieve:

  • Improve tests execution time
  • Make mock services easily pluggable
  • Mitigate redundancy and boilerplate code
  • Consolidate mock configuration within a user-friendly framework
  • Decouple tests logic from the configuration

Implementation

To achieve the aforementioned objectives, it would be a good idea to create a common interface (Groovy trait) for all the mocks in our suite.

trait Mock {
    abstract void start()

    abstract void stop()

    void cleanup() {

    }

    /**
     * Inlined properties that should be registered in the application context
     * format: ["propertyA=valueA", "propertyB=valueB"]
     */
    String[] propertiesToRegister() {
        return []
    }
}

Each mock within the suite has to implement this interface. The start, stop and cleanup methods are simple lifecycle management hooks. propertiesToRegister defines the test properties that each mock exposes and that need to be added to the application context. It is noteworthy that both the cleanup and propertiesToRegister methods have empty default implementations.

Let’s examine the MailMock as an illustrative example of the implementation for the Mock interface.

class MailMock implements Mock {

    static int greenMailPort = findAvailableTcpPort()
    static String greenMailUser = UUID.randomUUID().toString()
    static String greenMailPassword = UUID.randomUUID().toString()
    static GreenMail greenMail

    @Override
    void start() {
        greenMail = new GreenMail(ServerSetupTest.SMTP.port(greenMailPort))
                .withConfiguration(
                        GreenMailConfiguration.aConfig()
                                .withUser(greenMailUser, greenMailPassword)
                )
        greenMail.start()
    }

    @Override
    void stop() {
        greenMail.stop()
    }

    @Override
    String[] propertiesToRegister() {
        return [
                "greenmail.port=${greenMailPort}",
                "greenmail.user=${greenMailUser}",
                "greenmail.password=${greenMailPassword}"
        ]
    }

    static MimeMessage[] getReceivedMessages() {
        return greenMail.getReceivedMessages()
    }
}

Other examples can be found in this package.

The next step is to create a utility class that gathers all mocks and can be used to manage their lifecycle collectively.

class MockEnvironment {

    private static boolean started

    private static List<Mock> mocks

    static init(List<Mock> mocks) {
        this.mocks = mocks
    }

    static start() {
        if (!started) {
            started = true
            mocks.each { it.start() }
        }
    }

    static stop() {
        if (started) {
            mocks.each { it.stop() }
        }
    }

    static cleanup() {
        mocks.each { it.cleanup() }
    }

    /**
     * Could be added to application context like this:
     * @Override
     * void initialize(ConfigurableApplicationContext applicationContext) {
     *      TestPropertySourceUtils.addInlinedPropertiesToEnvironment(
     *          applicationContext,
     *          MockEnvironment.propertiesToRegister()
     *      )
     * }
     */
    static String[] propertiesToRegister() {
        mocks.collect {it.propertiesToRegister() }.flatten()
    }
}

MockEnvironment lets you register the list of mocks to be managed. The started flag ensures that the mocks are started only once, no matter how many times start() is called.
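The lifecycle contract described above can be sketched in plain Java as follows. This is an illustration under assumptions, not the article's Groovy code: it uses instance state instead of static fields, an interface with default methods in place of the Groovy trait, and the names MockLifecycleSketch, Environment and CountingMock are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java rendering of the Mock trait and MockEnvironment.
final class MockLifecycleSketch {
    private MockLifecycleSketch() { }

    interface Mock {
        void start();

        void stop();

        // Empty defaults, so a mock only overrides what it needs.
        default void cleanup() { }

        default List<String> propertiesToRegister() {
            return List.of();
        }
    }

    static final class Environment {
        private final List<Mock> mocks;
        private boolean started;

        Environment(List<Mock> mocks) {
            this.mocks = List.copyOf(mocks);
        }

        // Idempotent: repeated calls start each mock only once.
        void start() {
            if (started) {
                return;
            }
            started = true;
            mocks.forEach(Mock::start);
        }

        // Sketch choice: clear the flag so that stop is idempotent as well.
        void stop() {
            if (!started) {
                return;
            }
            mocks.forEach(Mock::stop);
            started = false;
        }

        // Flattens the "key=value" entries of all mocks into a single list.
        List<String> propertiesToRegister() {
            List<String> all = new ArrayList<>();
            for (Mock mock : mocks) {
                all.addAll(mock.propertiesToRegister());
            }
            return all;
        }
    }

    // Test double that counts lifecycle calls.
    static final class CountingMock implements Mock {
        int starts;
        int stops;

        @Override
        public void start() {
            starts++;
        }

        @Override
        public void stop() {
            stops++;
        }

        @Override
        public List<String> propertiesToRegister() {
            return List.of("demo.port=1234");
        }
    }

    public static void main(String[] args) {
        CountingMock mock = new CountingMock();
        Environment environment = new Environment(List.of(mock));
        environment.start();
        environment.start();
        System.out.println("starts: " + mock.starts + ", properties: " + environment.propertiesToRegister());
    }
}
```

The guard in start() is what later allows every specification to request the mocks without paying the startup cost again.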

Annotation driven local extension

Let’s create a Spock extension dedicated to initiating all mocks and registering essential properties prior to the first test execution.

To make mocks easily pluggable we can take advantage of a custom annotation.

@Retention(RetentionPolicy.RUNTIME)
@Target([ElementType.TYPE, ElementType.FIELD, ElementType.METHOD])
@ExtensionAnnotation(MocksExtension.class)
@interface Mocks {
    Service[] services()
}

enum Service {
    GCS(new GcsMock()),
    KAFKA(new KafkaMock()),
    MAIL(new MailMock()),
    SQS(new SqsMock()),
    STRIPE(new StripeMock())

    public final Mock service

    private Service(Mock service) {
        this.service = service
    }
}

The Service enumeration represents each of the mocks, while the Mocks annotation, designed with flexibility in mind, accepts a single parameter, services, which configures the services to be used.
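To illustrate how an annotation parameter can select mock instances, here is a simplified plain-Java sketch that reads the annotation via reflection. It is an assumption-laden illustration: Spock itself hands the annotation instance to the extension, so no manual reflection is needed there, and the names MocksAnnotationSketch, ExampleSpec and selectedMockNames are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: an enum constant carries a mock, an annotation selects constants.
final class MocksAnnotationSketch {
    private MocksAnnotationSketch() { }

    interface Mock {
        String name();
    }

    enum Service {
        MAIL(() -> "mail-mock"),
        SQS(() -> "sqs-mock");

        final Mock mock;

        Service(Mock mock) {
            this.mock = mock;
        }
    }

    // RUNTIME retention is required so the annotation is still visible when tests run.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Mocks {
        Service[] services();
    }

    @Mocks(services = {Service.MAIL, Service.SQS})
    static class ExampleSpec { }

    // Resolves the mocks selected on the given class, in declaration order.
    static List<String> selectedMockNames(Class<?> specClass) {
        List<String> names = new ArrayList<>();
        Mocks mocks = specClass.getAnnotation(Mocks.class);
        if (mocks != null) {
            for (Service service : mocks.services()) {
                names.add(service.mock.name());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(selectedMockNames(ExampleSpec.class));
    }
}
```

Because each enum constant holds a single mock instance, every specification that selects the same service shares the same mock.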

Finally, it’s time to implement the actual extension:

class MocksExtension implements IAnnotationDrivenExtension<Mocks> {
    @Override
    void visitSpecAnnotation(Mocks annotation, SpecInfo spec) {
        MockEnvironment.init(annotation.services().collect { it.service })
        spec.addSharedInitializerInterceptor(new IMethodInterceptor() {
            @Override
            void intercept(IMethodInvocation invocation) throws Throwable {
                // Start the mocks (only once), then let Spock run the shared initializer
                MockEnvironment.start()
                invocation.proceed()
            }
        })
        spec.addCleanupInterceptor(new IMethodInterceptor() {
            @Override
            void intercept(IMethodInvocation invocation) throws Throwable {
                // Let the specification's own cleanup run first, then clean up the mocks
                invocation.proceed()
                MockEnvironment.cleanup()
            }
        })
    }
}

The extension relies on two hooks: addSharedInitializerInterceptor, which wraps the shared initializer of the annotated specification (the mocks are started before it runs), and addCleanupInterceptor, which wraps the cleanup method that runs after each feature (the mocks are cleaned up once it has finished). Each interceptor has to call invocation.proceed() so that the intercepted method still gets executed. You can read more about Spock interceptors in the documentation.

Furthermore, the list of mocks to manage is passed to the MockEnvironment.init() method, with the services parameter taken from the Mocks annotation.
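A Spock interceptor wraps the intercepted method and calls invocation.proceed() to hand control over to it, so the position of the custom code relative to proceed() decides whether it runs before or after. The plain-Java sketch below imitates that pattern without Spock; the Invocation and Interceptor interfaces are simplified stand-ins for IMethodInvocation and IMethodInterceptor, and the whole class is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical imitation of the "around" interceptor pattern, without Spock.
final class InterceptorOrderSketch {
    private InterceptorOrderSketch() { }

    interface Invocation {
        void proceed();
    }

    interface Interceptor {
        void intercept(Invocation invocation);
    }

    // Records the order in which the individual steps are executed.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Custom code before proceed(): runs ahead of the intercepted method.
        Interceptor beforeSharedInitializer = invocation -> {
            log.add("start mocks");
            invocation.proceed();
        };

        // Custom code after proceed(): runs once the intercepted method has finished.
        Interceptor afterCleanup = invocation -> {
            invocation.proceed();
            log.add("clean up mocks");
        };

        beforeSharedInitializer.intercept(() -> log.add("shared initializer"));
        log.add("feature method");
        afterCleanup.intercept(() -> log.add("cleanup method"));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

If an interceptor in this sketch skipped proceed(), the wrapped step would never be logged, which is why the call should not be left out.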

That is how the extension can be used in practice:

@Mocks(services = [GCS, KAFKA, MAIL, SQS, STRIPE])
class IntegrationTestBase extends Specification {
...

This single annotation is all that is required to manage the lifecycle of all the services that we need for the tests.

Global extension

Last but not least, the test infrastructure should be cleaned up once all tests have finished. Annotation-driven extensions lack a dedicated hook that is executed after all tests, so we will use a global extension for this purpose.

class MockEnvironmentGlobalExtension implements IGlobalExtension {
    @Override
    void stop() {
        MockEnvironment.stop()
    }
}

The stop() method precisely fulfils the requirement, ensuring all mocks are stopped upon the conclusion of the entire test suite.

To activate the global extension, we need to add its fully qualified class name to the META-INF/services/org.spockframework.runtime.extension.IGlobalExtension file:

pl.kurczyna.springit.extensions.MockEnvironmentGlobalExtension

Conclusion

Having both extensions in place, the final form of the IntegrationTestBase class is notably more concise:

@SpringBootTest(webEnvironment = RANDOM_PORT)
@ActiveProfiles('itest')
@Import(GcsITestConfiguration)
@ContextConfiguration(initializers = PropertyInitializer)
@Mocks(services = [GCS, KAFKA, MAIL, SQS, STRIPE])
class IntegrationTestBase extends Specification {

    @LocalServerPort
    int appPort

    @Autowired
    TestRestTemplate restTemplate

    @Autowired
    KafkaTemplate<String, UserEvent> kafkaUsersProducer

    @Autowired
    StorageTestClient storageClient

    @Autowired
    SqsTestClient sqsTestClient

    @Autowired
    DbTestClient dbTestClient

    def setup() {
        sqsTestClient.createQueue()
    }

    static class PropertyInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
        @Override
        void initialize(ConfigurableApplicationContext applicationContext) {
            TestPropertySourceUtils.addInlinedPropertiesToEnvironment(
                    applicationContext,
                    MockEnvironment.propertiesToRegister()
            )
        }
    }

    @TestConfiguration
    static class AwsITestConfiguration {

        @Bean
        @Primary
        SqsAsyncClient amazonSQSAsync() {
            return SqsAsyncClient.builder()
                    .region(Region.of(SqsMock.getRegion()))
                    .credentialsProvider(SqsMock.getCredentials())
                    .endpointOverride(SqsMock.getEndpointOverride())
                    .build()
        }
    }
}

Additionally, because the mocks are started only once, before the first test is executed, the total execution time dropped significantly as well, from about two minutes to under 40 seconds:

BUILD SUCCESSFUL in 39s
6 actionable tasks: 3 executed, 3 up-to-date

real  0m39.873s
user  0m1.228s
sys   0m0.223s

Ultimately, we have effectively realised all predefined objectives. The Spock extensions have proven to be a robust mechanism for systematically organising and consolidating the integration tests setup.

8. Afterword

This article concludes the comprehensive guide to integration testing of JVM backend services, with a predominant focus on Spring Boot, Kotlin, and Spock. In this part in particular, we looked at the more complex integration patterns, including Kafka, email sending, Google Cloud Storage and AWS Simple Queue Service.

Finally, we wrapped up with the chapter that introduced you to the Spock Extensions, effectively encapsulating the test infrastructure, streamlining the setup, and significantly improving test execution time.

I firmly believe that by adopting the approach outlined in this and the preceding part of the article, one can develop a robust and highly efficient integration testing framework. This framework can be easily extended to incorporate many more external services beyond those covered here.
