Setup Local AWS SQS Service with Elastic MQ and Spring Cloud


Introduction

Amazon Simple Queue Service (SQS) is a service on Amazon Web Services that provides a scalable message queuing service for loosely-coupled and reliable communication among distributed software components and microservices. In this article, we will setup a local Elastic MQ service, which will simulate a SQS service and use Spring Cloud to push messages to that queue. With a minimal amount of configuration, this service can be deployed to a EC2 instance and integrate with an actual SQS service.

We will be focusing on the standard SQS queue. The other option, a FIFO Queue is limited to 300TPS(300 transactions per second) and is not a valid use-case for many high-throughput applications.

Characteristics of a Standard SQS Queue


Fully-managed, high throughput messaging queue. Standard queues have nearly-unlimited transactions per second (TPS).

Designed for concurrency with multiple message producers and consumers. Multiple parts of your system can send or receive messages at the same time.

Add Dependencies to Gradle

First let's add the dependencies. We will be using the standard dependency here. Add the following dependencies to your build.gradle.

build.gradle


dependencies {
    compile('org.elasticmq:elasticmq-rest-sqs_2.11:0.10.1')
    compile group: 'org.springframework.cloud', name: 'spring-cloud-aws-messaging', version: '1.1.3.RELEASE'
    compile group: 'com.amazonaws', name: 'aws-java-sdk-sqs', version: '1.11.49'
    
    testCompile('org.elasticmq:elasticmq-rest-sqs_2.11:0.10.1')
}    



Set up our Properties

Next let's set up our properties to configure our local ElasticMQ SQS service and Spring Cloud.

sqs-services.yaml

awsConfig:
  accessKey: x
  secretKey: x
  sqsQueueName: sqs-queue-name

queueBuffer:
  maxBatchOpenMs: 200  #Maximum amount of time, in milliseconds, that an outgoing call waits for other calls of the same type to batch with
  maxBatchSize: 10  #The maximum number of messages that will be batched together in a single batch request
  maxInflightOutboundBatches: 5  #The maximum number of active receive batches that can be processed at the same time

elasticMqLocalSqsUri:
  scheme: http
  host: localhost
  path:
  port: 9324

awsSqsUri:
  scheme: https
  host: sqs.us-east-1.amazonaws.com
  path: /1234567890/
  port: 80

echoSqsMessagesLocal: false

Let's also add some flags to our application properties.

sqs-services.yaml

#Local SQS Config
aws.local.sqs.localElasticMq.enable=true
aws.local.sqs.localElasticMq.startServer=false



Create POJOs to capture our Configuration Properties

Next, since we are working with a lot of properties, let's create a way to deal with them in a manageable way. We will create POJOs(Plain old Java Objects) that will contain various properties from our configuration. Here is the first POJO to capture Queue Buffer Mapped Properties:

QueueBufferMappedProperties.java

public class QueueBufferMappedProperties {

    /*
        The maximum amount of time, in milliseconds, that an outgoing call waits for other calls of the same type to batch with.
        The higher the setting, the fewer batches are required to perform the same amount of work.
        Of course, the higher the setting, the more the first call in a batch has to spend waiting.
        If this parameter is set to zero, submitted requests do not wait for other requests, effectively disabling batching.
        The default value of this setting is 200 milliseconds.
     */
    private Long maxBatchOpenMs;

    /*
        The maximum number of messages that will be batched together in a single batch request.
        The higher the setting, the fewer batches will be required to carry out the same number of requests.
        The default value of this setting is 10 requests per batch, which is also the maximum batch size currently allowed by Amazon SQS.
     */
    private Integer maxBatchSize;

    /*
        The maximum number of active outbound batches that can be processed at the same time.
        The higher the setting, the faster outbound batches can be sent (subject to other limits, such as CPU or bandwidth).
        The higher the setting, the more threads are consumed by the AmazonSQSBufferedAsyncClient. The default value is 5 batches.
     */
    private Integer maxInflightOutboundBatches;

    public Long getMaxBatchOpenMs() {
        return maxBatchOpenMs;
    }

    public Integer getMaxBatchSize() {
        return maxBatchSize;
    }

    public Integer getMaxInflightOutboundBatches() {
        return maxInflightOutboundBatches;
    }

    public void setMaxBatchOpenMs(Long maxBatchOpenMs) {
        this.maxBatchOpenMs = maxBatchOpenMs;
    }

    public void setMaxBatchSize(Integer maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    public void setMaxInflightOutboundBatches(Integer maxInflightOutboundBatches) {
        this.maxInflightOutboundBatches = maxInflightOutboundBatches;
    }
}

Next, here is a POJO to capture the URI of our local Elastic MQ Service:

LocalServiceUri.java

public class LocalServiceUri {

    private String scheme;
    private String host;
    private String path;
    private String port;

    public String getScheme() {
        return scheme;
    }

    public String getHost() {
        return host;
    }

    public String getPath() {
        return path;
    }

    public String getPort() {
        return port;
    }

    public void setScheme(String scheme) {
        this.scheme = scheme;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public void setPath(String path) {
        this.path = path;
    }

    public void setPort(String port) {
        this.port = port;
    }
}

Next, here is a POJO to capture various AWS config properties:

AwsMappedProperties.java

public class AwsMappedProperties {

    private String accessKey;
    private String secretKey;
    private String sqsQueueName;

    public String getAccessKey() {
        return accessKey;
    }

    public String getSecretKey() {
        return secretKey;
    }

    public String getSqsQueueName() {
        return sqsQueueName;
    }

    public void setAccessKey(String accessKey) {
        this.accessKey = accessKey;
    }

    public void setSecretKey(String secretKey) {
        this.secretKey = secretKey;
    }

    public void setSqsQueueName(String sqsQueueName) {
        this.sqsQueueName = sqsQueueName;
    }

}



Enable Configuration Properties

Next up, let's configure a Spring Configuration class which will automatically read in parameters from our configuration YAML into our POJO objects.

AwsMappedProperties.java

@Configuration
@EnableConfigurationProperties
@ConfigurationProperties(locations = "classpath:sqs-services.yaml")
public class SqsConfigMappingProperties {

    private LocalServiceUri elasticMqLocalSqsUri;
    private LocalServiceUri awsSqsUri;
    private AwsMappedProperties awsConfig;
    private QueueBufferMappedProperties queueBuffer;

    private Boolean echoSqsMessagesLocal;

    public LocalServiceUri getElasticMqLocalSqsUri() {
        return elasticMqLocalSqsUri;
    }

    public LocalServiceUri getAwsSqsUri() {
        return awsSqsUri;
    }

    public AwsMappedProperties getAwsConfig() {
        return awsConfig;
    }

    public QueueBufferMappedProperties getQueueBuffer() {
        return queueBuffer;
    }

    public Boolean getEchoSqsMessagesLocal() {
        return echoSqsMessagesLocal;
    }

    public void setElasticMqLocalSqsUri(LocalServiceUri elasticMqLocalSqsUri) {
        this.elasticMqLocalSqsUri = elasticMqLocalSqsUri;
    }

    public void setAwsSqsUri(LocalServiceUri awsSqsUri) {
        this.awsSqsUri = awsSqsUri;
    }

    public void setAwsConfig(AwsMappedProperties awsConfig) {
        this.awsConfig = awsConfig;
    }

    public void setQueueBuffer(QueueBufferMappedProperties queueBuffer) {
        this.queueBuffer = queueBuffer;
    }

    public void setEchoSqsMessagesLocal(Boolean echoSqsMessagesLocal) {
        this.echoSqsMessagesLocal = echoSqsMessagesLocal;
    }
}



Create AWS Spring Config

We will set up our SQS queue such that it will be able to use ElasticMQ locally and AWS SQS when deployed.

AwsMappedProperties.java

@Configuration
@EnableConfigurationProperties(SqsConfigMappingProperties.class)
public class AwsConfig {

    @Value("${aws.local.sqs.localElasticMq.enable}")
    Boolean enableLocalElasticMq;

    @Value("${aws.local.sqs.localElasticMq.startServer}")
    Boolean startLocalElasticMq;

    @Autowired
    SqsConfigMappingProperties sqsConfigMappingProperties;

    @Bean
    public UriComponents elasticMqLocalSqsUri() {

        LocalServiceUri elasticMqLocalSqsUri = sqsConfigMappingProperties.getElasticMqLocalSqsUri();

        return UriComponentsBuilder.newInstance()
                .scheme(elasticMqLocalSqsUri.getScheme())
                .host(elasticMqLocalSqsUri.getHost())
                .port(elasticMqLocalSqsUri.getPort())
                .build()
                .encode();
    }

    @Bean
    public SQSRestServer sqsRestServer(UriComponents elasticMqLocalSqsUri) {
        SQSRestServer sqsRestServer = SQSRestServerBuilder
                .withPort(Integer.valueOf(elasticMqLocalSqsUri.getPort()))
                .withInterface(elasticMqLocalSqsUri.getHost())
                .start();

        return sqsRestServer;
    }

    @Lazy
    @Bean(name = "amazonSqsLocal")
    @DependsOn("sqsRestServer")
    @ConditionalOnExpression("${aws.local.sqs.localElasticMq.enable}")
    public AmazonSQSAsync amazonSqsLocal(AWSCredentials amazonAWSCredentials) {

        AmazonSQSAsyncClient awsSQSAsyncClient = new AmazonSQSAsyncClient(amazonAWSCredentials);
        awsSQSAsyncClient.setEndpoint(createURI(sqsConfigMappingProperties.getElasticMqLocalSqsUri()));
        awsSQSAsyncClient.createQueue(sqsConfigMappingProperties.getAwsConfig().getSqsQueueName());

        QueueBufferMappedProperties queueBufferMappedProperties = sqsConfigMappingProperties.getQueueBuffer();
        QueueBufferConfig config = new QueueBufferConfig()
                .withMaxBatchOpenMs(queueBufferMappedProperties.getMaxBatchOpenMs())
                .withMaxBatchSize(queueBufferMappedProperties.getMaxBatchSize())
                .withMaxInflightOutboundBatches(queueBufferMappedProperties.getMaxInflightOutboundBatches());

        AmazonSQSBufferedAsyncClient amazonSQSBufferedAsyncClient = new AmazonSQSBufferedAsyncClient(awsSQSAsyncClient,config);



        return amazonSQSBufferedAsyncClient;
    }

    @Lazy
    @Bean(name = "amazonSqs")
    @ConditionalOnExpression("!${aws.local.sqs.localElasticMq.enable}")
    public AmazonSQSAsync amazonSqs(AWSCredentials amazonAWSCredentials) {

        AmazonSQSAsyncClient awsSQSAsyncClient = new AmazonSQSAsyncClient(amazonAWSCredentials);
        awsSQSAsyncClient.setEndpoint(createURI(sqsConfigMappingProperties.getAwsSqsUri()));
        awsSQSAsyncClient.createQueue(sqsConfigMappingProperties.getAwsConfig().getSqsQueueName());

        QueueBufferMappedProperties queueBufferMappedProperties = sqsConfigMappingProperties.getQueueBuffer();
        QueueBufferConfig config = new QueueBufferConfig()
                .withMaxBatchOpenMs(queueBufferMappedProperties.getMaxBatchOpenMs())
                .withMaxBatchSize(queueBufferMappedProperties.getMaxBatchSize())
                .withMaxInflightOutboundBatches(queueBufferMappedProperties.getMaxInflightOutboundBatches());

        AmazonSQSBufferedAsyncClient amazonSQSBufferedAsyncClient = new AmazonSQSBufferedAsyncClient(awsSQSAsyncClient,config);

        return amazonSQSBufferedAsyncClient;
    }

    @Bean
    public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSqs, AmazonSQSAsync amazonSqsLocal, SQSRestServer sqsRestServer) {
        QueueMessagingTemplate queueMessagingTemplate;

        if(enableLocalElasticMq)
            queueMessagingTemplate = new QueueMessagingTemplate(amazonSqsLocal);
        else
            queueMessagingTemplate = new QueueMessagingTemplate(amazonSqs);

        queueMessagingTemplate.setDefaultDestinationName(sqsConfigMappingProperties.getAwsConfig().getSqsQueueName());

        if(!startLocalElasticMq)
            sqsRestServer.stopAndWait();

        return queueMessagingTemplate;
    }

    @Bean
    public AWSCredentials amazonAWSCredentials() {
        if ("local".equals(ApplicationInfo.getEnvironment())) {
            return new BasicAWSCredentials(sqsConfigMappingProperties.getAwsConfig().getAccessKey(),
                    sqsConfigMappingProperties.getAwsConfig().getSecretKey());
        }

        return new DefaultAWSCredentialsProviderChain().getCredentials();
    }

    private static String createURI(LocalServiceUri localServiceUri) {

        return UriComponentsBuilder.newInstance()
                .scheme(localServiceUri.getScheme())
                .host(localServiceUri.getHost())
                .port(localServiceUri.getPort())
                .path(localServiceUri.getPath())
                .build()
                .encode().toUriString();
    }
}



SQS Service Implementation



SqsService.java

public interface SqsService {
    void sendSqsMessage(DomainObject domainObject);
}



SqsService.java

@Service
@EnableConfigurationProperties(SqsConfigMappingProperties.class)
public class SqsServiceImpl implements SqsService {

    @Autowired
    SqsConfigMappingProperties sqsConfigMappingProperties;

    @Autowired
    QueueMessagingTemplate queueMessagingTemplate;

    @Override
    public void sendSqsMessage(DomainObject domainObject) {
        queueMessagingTemplate.convertAndSend(sqsConfigMappingProperties.getAwsConfig().getSqsQueueName(), pricingChanges);
    }
}



Using the SQS Service in a Controller

Using the SQS Service in a controller to make a REST API call to put a serialized JSON message in the queue.

DomainObjectIngestController.java

@RestController
@RequestMapping("/domainObject")
public class DomainObjectIngestController {

    @Autowired
    SqsService sqsService;

    /**
     *
     * @return version information about the application.
     */
    @RequestMapping(value = "", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity postDomainObject(@RequestBody DomainObject input) {

        sqsService.sendSqsMessage(input);
        return new ResponseEntity<>(HttpStatus.NO_CONTENT);
    }

}



Testing your Service



SqsServiceImplTest.java

@Profile("local")
public class SqsServiceImplTest {

    private static final String QUEUE_NAME = "sqs-queue-name";
    private static final int SQS_PORT = 9324;
    private static final String SQS_HOSTNAME = "localhost";

    private static final String EXPECTED_SELLING_PRICE = "100";

    private SQSRestServer sqsRestServer;
    private SqsServiceImpl classUnderTest;
    private QueueMessagingTemplate queueMessagingTemplate;
    private SqsConfigMappingProperties sqsMappingProperties;

    @Before
    public void setUp() {
        //Run local ElasticMQ SQS queue
        try {
            sqsRestServer = SQSRestServerBuilder
                    .withPort(SQS_PORT)
                    .withInterface(SQS_HOSTNAME)
                    .start();
        } catch (BindFailedException e) {
            e.printStackTrace();
        }

        AmazonSQSAsyncClient awsSQSAsyncClient = new AmazonSQSAsyncClient(new BasicAWSCredentials("x", "x"));
        awsSQSAsyncClient.setEndpoint("http://" + SQS_HOSTNAME + ":" + SQS_PORT);
        awsSQSAsyncClient.createQueue(QUEUE_NAME);

        SqsConfigMappingProperties sqsMappingProperties = new SqsConfigMappingProperties();
        sqsMappingProperties.setAwsConfig(buildAwsConfig());

        queueMessagingTemplate = new QueueMessagingTemplate(awsSQSAsyncClient);
        queueMessagingTemplate.setDefaultDestinationName(QUEUE_NAME);
        classUnderTest = new SqsServiceImpl();

        ReflectionTestUtils.setField(classUnderTest, "sqsConfigMappingProperties", sqsMappingProperties, SqsConfigMappingProperties.class);
        ReflectionTestUtils.setField(classUnderTest, "queueMessagingTemplate", queueMessagingTemplate, QueueMessagingTemplate.class);
    }

    @After
    public void tearDown() throws Exception {
        if(sqsRestServer != null)
            sqsRestServer.stopAndWait();
    }

    @Test
    public void givenValidPriceChange_whenSendSqsMsg_theVerifyReceivedMsg() throws Exception {

        classUnderTest.sendSqsMessage(buildPricingChange());
        PricingChange actualResponse = queueMessagingTemplate.receiveAndConvert(QUEUE_NAME,PricingChange.class);
        assertEquals(EXPECTED_SELLING_PRICE, actualResponse.getSellingPrice().getAmount());
    }

    public AwsMappedProperties buildAwsConfig() {

        AwsMappedProperties awsConfig = new AwsMappedProperties();
        awsConfig.setSqsQueueName(QUEUE_NAME);

        return awsConfig;
    }

}

Popular posts from this blog

How to set up a SQL Server 2008 Local Database

Change Port on a Spring Boot Application when using intelliJ

Spring Boot Internationalization with Default Locale for Message Strings