Amazon SNS Java Messaging Library — Technical Guide
Architecture Overview
The library provides an asynchronous, batched messaging client for Amazon SNS, supporting both AWS SDK v1 and v2. It is organized as a multi-module Maven project:
| Module | Artifact | Purpose |
|---|---|---|
amazon-sns-java-messaging-lib-template |
(internal) | SDK-agnostic core: batching, queuing, threading, metrics |
amazon-sns-java-messaging-lib-v1 |
amazon-sns-java-messaging-lib-v1 |
AWS SDK v1 implementation (AmazonSNS client) |
amazon-sns-java-messaging-lib-v2 |
amazon-sns-java-messaging-lib-v2 |
AWS SDK v2 implementation (SnsClient) |
Core Components
┌────────────────────────────────────────────────────────────┐
│ AmazonSnsTemplate<E> │
├────────────────────────────────────────────────────────────┤
│ ┌───────────────────────┐ ┌─────────────────────────┐ │
│ │ AmazonSnsProducer<E> │ │ AmazonSnsConsumer<R,O> │ │
│ │ (AbstractProducer) │ │ (AbstractConsumer) │ │
│ │ │ │ │ │
│ │ - BlockingQueue │ │ - ScheduledExecutor │ │
│ │ - PendingRequests │ │ - Batching Logic │ │
│ └──────────┬────────────┘ └───────────┬─────────────┘ │
│ │ │ │
│ send(E) publishBatch(...) │
└─────────────┼─────────────────────────────┼────────────────┘
│ │
▼ ▼
┌───────────────────────────────────────────┐
│ Amazon SNS (v1/v2) │
└───────────────────────────────────────────┘
AmazonSnsTemplate— Main entry point. Created via a fluent builder (AmazonSnsTemplate.builder(snsClient, topicProperty)).AmazonSnsProducer— Accepts messages into aBlockingQueue, tracks pending futures, and returnsListenableFutureresults.AmazonSnsConsumer— Scheduled drainer that pulls messages from the queue atlingerintervals, batches them (respecting count and 256KB size limits), and publishes via the SDK’spublishBatchAPI.
Batching Behavior
Messages are accumulated in a BlockingQueue and drained periodically by a scheduled executor.
- Linger: Time (ms) to wait before flushing the batch. Resets on each new message arrival.
- Max batch size: Maximum number of messages per
publishBatchcall. - 256KB limit: Each batch request must not exceed the SNS payload limit. Messages exceeding 256KB individually throw
MaximumAllowedMessageException. - Memory: The buffer stores up to
maximumPoolSize × maxBatchSizemessages internally (backed by theBlockingQueue).
For FIFO topics, messages are published synchronously on a single-threaded executor to preserve ordering. For standard topics, publishing is asynchronous via a multi-threaded executor.
Message Flow
- User calls
template.send(RequestEntry<E>) - Producer serializes the message payload to JSON (via Jackson
ObjectMapper) - Producer enqueues the serialized entry into a
BlockingQueueand registers aListenableFuture - Consumer’s scheduled task drains the queue at
lingerintervals, building aPublishBatchRequest - Consumer calls
publishBatch()on the SNS client - On success: individual
ResponseSuccessEntryresults are matched back to futures by message ID - On failure:
ResponseFailEntryobjects complete the corresponding futures with error details
Dependencies
Template Module (shared)
<dependencies>
<dependency>org.slf4j:slf4j-api:2.0.6</dependency>
<dependency>org.apache.commons:commons-collections4:4.5.0</dependency>
<dependency>org.apache.commons:commons-lang3:3.20.0</dependency>
<dependency>com.fasterxml.jackson.core:jackson-databind:2.16.1</dependency>
<dependency>io.micrometer:micrometer-core:1.16.3</dependency>
<dependency>org.projectlombok:lombok:1.18.42 (provided)</dependency>
</dependencies>
AWS SDK v1 Module
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sns</artifactId>
<version>1.12.661</version>
</dependency>
AWS SDK v2 Module
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sns</artifactId>
<version>2.20.162</version>
</dependency>
Test Dependencies
<dependency>org.junit.jupiter:junit-jupiter:5.10.2 (test)</dependency>
<dependency>org.mockito:mockito-core:4.11.0 (test)</dependency>
<dependency>org.awaitility:awaitility:4.2.2 (test)</dependency>
<dependency>org.assertj:assertj-core:3.24.2 (test)</dependency>
<dependency>org.testcontainers:localstack:1.20.4 (test)</dependency>
Configuration Reference
TopicProperty
| Property | Type | Default | Description |
|---|---|---|---|
fifo |
boolean |
false |
Whether the SNS topic is FIFO |
topicArn |
String |
— | The SNS topic ARN |
maximumPoolSize |
int |
— | Max threads for the producer pool |
maxBatchSize |
int |
— | Max messages per batch request |
linger |
long (ms) |
— | Time to wait before flushing a batch |
Note: The in-memory buffer size = maximumPoolSize × maxBatchSize. Large values consume proportionally more memory.
Usage Examples
1. Setup with Builder (Recommended)
// For AWS SDK v1 — AmazonSNS client
// For AWS SDK v2 — SnsClient
TopicProperty topicProperty = TopicProperty.builder()
.fifo(false)
.linger(100L)
.maxBatchSize(10)
.maximumPoolSize(5)
.topicArn("arn:aws:sns:us-east-2:000000000000:topic")
.build();
AmazonSnsTemplate<MyMessage> template = AmazonSnsTemplate.builder(snsClient, topicProperty)
.meterRegistry(new SimpleMeterRegistry())
.topicRequests(new RingBufferBlockingQueue<>(1024))
.build();
2. Sending a Standard Message
template.send(
RequestEntry.<MyMessage>builder()
.withValue(new MyMessage("hello"))
.withMessageHeaders(Map.of("source", "app-1"))
.build()
);
3. Sending a FIFO Message
template.send(
RequestEntry.<MyMessage>builder()
.withValue(new MyMessage("ordered-msg"))
.withGroupId("my-group-id")
.withDeduplicationId(UUID.randomUUID().toString())
.build()
);
4. Async Callbacks
template.send(requestEntry)
.addCallback(
success -> log.info("Sent: {}", success.getMessageId()),
failure -> log.error("Failed: {} [{}]", failure.getMessage(), failure.getCode())
);
5. Await Completion and Shutdown
template.send(requestEntry);
template.await().thenRun(template::shutdown).join();
6. Custom ObjectMapper and BlockingQueue
AmazonSnsTemplate<MyMessage> template = new AmazonSnsTemplate<>(
amazonSNS,
topicProperty,
new LinkedBlockingQueue<>(100),
new ObjectMapper()
);
Metrics (Micrometer)
The library integrates with Micrometer and records the following metrics when a MeterRegistry is provided via the builder’s .meterRegistry(registry).
SNS Publish Metrics
Tags: topic = <topicArn>
| Metric Name | Type | Description | Config |
|---|---|---|---|
sns.publish.attempts |
Counter |
Total number of SNS PublishBatch calls attempted | — |
sns.publish.success |
Counter |
Individual SNS messages acknowledged as successful | — |
sns.publish.failure |
Counter |
Individual SNS messages that failed | Dynamic tags: error_code, error_type |
sns.publish.duration |
Timer |
End-to-end latency of SNS PublishBatch calls | Percentiles: 0.5, 0.95, 0.99 |
sns.publish.batch.size |
DistributionSummary |
Number of entries per SNS PublishBatch request | — |
sns.publish.inflight |
Gauge |
PublishBatches currently in progress | Backed by AtomicInteger |
The sns.publish.failure counter is created dynamically with additional error_code (AWS error code string) and error_type (amazon_service_exception or unknown) tags.
Blocking Queue Metrics
Tags: name = <queueName>
| Metric Name | Type | Description | Config |
|---|---|---|---|
blocking.queue.puts.total |
Counter |
Total number of successful put operations | — |
blocking.queue.puts.failed |
Counter |
Total number of put operations that threw an exception | — |
blocking.queue.put.duration |
Timer |
Latency of put operations (including wait time when queue is full) | Percentile histogram |
blocking.queue.takes.total |
Counter |
Total number of successful take operations | — |
blocking.queue.takes.failed |
Counter |
Total number of take operations that threw an exception | — |
blocking.queue.take.duration |
Timer |
Latency of take operations (including wait time when queue is empty) | Percentile histogram |
blocking.queue.size |
Gauge |
Current number of elements in the queue | Calls delegate.size() |
Executor Metrics
Tags: name = <executorName>
| Metric Name | Type | Description | Config |
|---|---|---|---|
executor.active |
Gauge |
Number of tasks currently being executed by pool threads | Backed by AtomicInteger |
executor.tasks.succeeded |
Counter |
Total number of tasks that completed without throwing an exception | — |
executor.tasks.failed |
Counter |
Total number of tasks that completed by throwing an exception | — |
executor.task.duration |
Timer |
Wall-clock duration of each task execution | — |
Threading Model
- Standard topics: Uses
AmazonSnsThreadPoolExecutorwithSynchronousQueue, zero core threads, andBlockingSubmissionPolicy(30s blocking timeout). Threads are created on demand. - FIFO topics: Single-threaded executor to guarantee message ordering.
- Consumer scheduler: A
ScheduledExecutorServicewith a single daemon thread drains the queue at eachlingerinterval.
Exception Handling
| Exception | Condition |
|---|---|
MaximumAllowedMessageException |
A single message exceeds the 256KB SNS payload limit |
SDK exceptions (AmazonServiceException / AwsServiceException) |
Service-side errors during publishBatch |
SDK exceptions (AmazonClientException / AwsClientException) |
Client-side errors (network, serialization) |
Failed messages are delivered to the failure callback with:
messageId— the original request IDcode— the error codemessage— error descriptionsenderFault— whether the error is a client or server fault
Testing
# Run unit tests
mvn test
# Run integration tests (requires Docker for LocalStack)
mvn verify -P integration-test
The integration tests use Testcontainers with localstack/localstack:3.4.0 to spin up real SNS and SQS services. Messages are verified by subscribing an SQS queue to the SNS topic and polling for delivery.