Version: 1.0.0-beta

Introduction

Easy RMQ is a Rust AMQP library that provides a connection pool, a publisher, a subscriber, and dependency injection support. It simplifies working with RabbitMQ in Rust applications through a clean, type-safe API, with retries, concurrency control, and tracing available out of the box.

Quick Overview

🚀 Why Easy RMQ?

  • Simple API: Easy-to-use publisher and subscriber patterns
  • Auto Setup: Automatic creation of exchanges, queues, and bindings
  • Connection Pooling: Built-in pool using deadpool for efficiency
  • Advanced Features: Retry, concurrency, single active consumer
  • Distributed Tracing: Built-in trace ID generation with OTel support
  • Type Safe: Strong error handling with thiserror
  • Async First: Full async/await support with Tokio

📦 What's Included?

  • ✅ Connection pool management
  • ✅ Publisher with multiple exchange types
  • ✅ Subscriber with worker registry
  • ✅ Retry mechanism with DLQ
  • ✅ Single active consumer support
  • ✅ Prefetch and concurrency control
  • ✅ Middleware support
  • ✅ Distributed tracing
  • ✅ Dependency injection

Version

This is the beta release of Easy RMQ 1.0.0.

Need Latest Updates?

For up-to-date documentation with the latest features, see the latest development version (1.0.0 unreleased) in the version dropdown in the top-right corner.

Stable Beta

This is a beta release. The APIs are considered stable, but they may still change before the final 1.0.0 release.

Get Started in 3 Steps

1. Start RabbitMQ

docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management

2. Add Dependency

[dependencies]
easy_rmq = "1.0.0-beta"

3. Send & Receive Messages

Publisher:

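use easy_rmq::AmqpClient;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a client for the local RabbitMQ broker.
    let client = AmqpClient::new("amqp://guest:guest@localhost:5672".to_string(), 10)?;

    // Publish a text message.
    client.publisher()
        .publish_text("order.created", "Hello, AMQP!")
        .await?;
    Ok(())
}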

Subscriber:

use easy_rmq::{AmqpClient, SubscriberRegistry, WorkerBuilder};
use lapin::ExchangeKind;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = AmqpClient::new("amqp://guest:guest@localhost:5672".to_string(), 10)?;
    let pool = client.channel_pool();

    // Register a worker bound to the "order.events.v1" exchange
    // and the "order.process" queue.
    let worker = SubscriberRegistry::new()
        .register({
            let pool = pool.clone();
            move |_count| {
                WorkerBuilder::new(ExchangeKind::Direct)
                    .pool(pool)
                    .with_exchange("order.events.v1")
                    .queue("order.process")
                    .build(handle_order_event)
            }
        });

    worker.run().await?;
    Ok(())
}

// Receives the message body as raw bytes.
fn handle_order_event(data: Vec<u8>) -> easy_rmq::Result<()> {
    let msg = String::from_utf8_lossy(&data);
    println!("📦 Order: {}", msg);
    Ok(())
}

Key Features

Connection Pool

Efficiently manages AMQP connections using deadpool for optimal resource utilization.
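
To illustrate the idea behind pooling, here is a minimal, std-only sketch of a pool that hands out items and takes them back when the guard is dropped. It is not deadpool's or Easy RMQ's implementation: `Pool`, `Pooled`, `get`, and `idle_count` are hypothetical names, and a real pool such as deadpool creates objects on demand and waits asynchronously when the pool is exhausted, whereas this sketch simply returns `None`.

```rust
use std::ops::Deref;
use std::sync::{Arc, Mutex};

/// A fixed set of reusable items (hypothetical type for this sketch).
pub struct Pool<T> {
    idle: Arc<Mutex<Vec<T>>>,
}

/// A checked-out item that returns itself to the pool when dropped.
pub struct Pooled<T> {
    item: Option<T>,
    idle: Arc<Mutex<Vec<T>>>,
}

impl<T> Pool<T> {
    pub fn new(items: Vec<T>) -> Self {
        Pool { idle: Arc::new(Mutex::new(items)) }
    }

    /// Checks out an idle item, or returns `None` if all are in use.
    pub fn get(&self) -> Option<Pooled<T>> {
        let item = self.idle.lock().unwrap().pop()?;
        Some(Pooled { item: Some(item), idle: Arc::clone(&self.idle) })
    }

    /// Number of items currently available for checkout.
    pub fn idle_count(&self) -> usize {
        self.idle.lock().unwrap().len()
    }
}

impl<T> Deref for Pooled<T> {
    type Target = T;

    fn deref(&self) -> &T {
        self.item.as_ref().expect("item is present until drop")
    }
}

impl<T> Drop for Pooled<T> {
    fn drop(&mut self) {
        // Hand the item back so that a later `get` can reuse it.
        if let Some(item) = self.item.take() {
            self.idle.lock().unwrap().push(item);
        }
    }
}
```

The point of the guard type is that callers cannot forget to return a connection: releasing it is tied to scope, so reuse happens automatically.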

Auto Setup

Automatically creates exchanges, queues, and bindings, so no manual broker setup is needed.

Worker Registry

Register and manage multiple workers with a clean, consistent pattern.

Retry Mechanism

Automatic retry with configurable delays and dead letter queue for failed messages.
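
The control flow of retry with a dead letter queue can be sketched in plain Rust as follows. This is a conceptual model only, not Easy RMQ's API: `Outcome`, `process_with_retry`, `max_retries`, and `delay` are hypothetical names, and in a real deployment the delay and dead-lettering would typically be handled through the broker rather than by sleeping in the consumer.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Final outcome of handling one message (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
pub enum Outcome {
    /// The handler succeeded on the given attempt (1-based).
    Acked { attempts: u32 },
    /// Every attempt failed; the message would go to the dead letter queue.
    DeadLettered { attempts: u32, last_error: String },
}

/// Calls `handler` up to `max_retries + 1` times, sleeping `delay`
/// between failed attempts.
pub fn process_with_retry<F>(
    payload: &[u8],
    max_retries: u32,
    delay: Duration,
    mut handler: F,
) -> Outcome
where
    F: FnMut(&[u8]) -> Result<(), String>,
{
    let mut attempts = 0;
    loop {
        attempts += 1;
        match handler(payload) {
            Ok(()) => return Outcome::Acked { attempts },
            // Retries are exhausted: give up and report the last error.
            Err(e) if attempts > max_retries => {
                return Outcome::DeadLettered { attempts, last_error: e };
            }
            // Retries remain: wait, then try again.
            Err(_) => sleep(delay),
        }
    }
}
```

With `max_retries = 2`, a handler that fails twice and then succeeds yields `Acked { attempts: 3 }`, while one that always fails yields `DeadLettered` after three attempts.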

Single Active Consumer

Ensures that only one consumer per queue processes messages at a time, for workloads with strict ordering requirements.

Concurrency Control

Configurable worker concurrency with async/blocking spawn options.
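
As a rough picture of what a concurrency setting controls, the following std-only sketch fans messages out to a fixed number of worker threads. It uses OS threads and a channel instead of Tokio tasks, and `run_workers` and its parameters are hypothetical names rather than part of Easy RMQ.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Runs `handler` over `messages` using `concurrency` worker threads.
/// Returns how many messages each worker handled.
pub fn run_workers<F>(messages: Vec<Vec<u8>>, concurrency: usize, handler: F) -> Vec<usize>
where
    F: Fn(&[u8]) + Send + Sync + 'static,
{
    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    let rx = Arc::new(Mutex::new(rx));
    let handler = Arc::new(handler);

    let workers: Vec<_> = (0..concurrency)
        .map(|_| {
            let rx = Arc::clone(&rx);
            let handler = Arc::clone(&handler);
            thread::spawn(move || {
                let mut handled = 0usize;
                loop {
                    // Hold the lock only while receiving, not while handling.
                    let next = rx.lock().unwrap().recv();
                    match next {
                        Ok(msg) => {
                            (*handler)(msg.as_slice());
                            handled += 1;
                        }
                        // The sender was dropped and the channel is drained.
                        Err(_) => break,
                    }
                }
                handled
            })
        })
        .collect();

    for msg in messages {
        tx.send(msg).unwrap();
    }
    // Closing the channel lets idle workers exit their loops.
    drop(tx);

    workers.into_iter().map(|w| w.join().unwrap()).collect()
}
```

How the messages are split between workers depends on scheduling, so only the total is deterministic. That is also why strict ordering needs a single consumer rather than a higher concurrency value.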

Middleware

Custom middleware for logging, metrics, distributed tracing, and more.
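
The middleware pattern itself can be sketched without any library: each middleware takes a handler and returns a new handler that adds behavior around it. The `Handler` alias and the `with_logging` and `reject_empty` functions below are assumptions made for illustration; Easy RMQ's actual middleware trait or signature may look different.

```rust
use std::sync::{Arc, Mutex};

/// Handler signature used in this sketch (an assumption, not the crate's type).
pub type Handler = Box<dyn Fn(&[u8]) -> Result<(), String> + Send + Sync>;

/// Wraps `next` so that each call is recorded in `log` before and after it runs.
pub fn with_logging(log: Arc<Mutex<Vec<String>>>, next: Handler) -> Handler {
    Box::new(move |payload: &[u8]| {
        log.lock().unwrap().push(format!("received {} bytes", payload.len()));
        let result = next(payload);
        let status = if result.is_ok() { "ok" } else { "error" };
        log.lock().unwrap().push(format!("handler finished: {status}"));
        result
    })
}

/// Wraps `next` so that empty payloads are rejected before reaching it.
pub fn reject_empty(next: Handler) -> Handler {
    Box::new(move |payload: &[u8]| {
        if payload.is_empty() {
            return Err("empty payload".to_string());
        }
        next(payload)
    })
}
```

Middlewares compose from the outside in: `with_logging(log, reject_empty(handler))` logs every message, including those that the validation layer rejects.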

Distributed Tracing

Built-in trace ID generation with OpenTelemetry support for tracking message flows.
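
For orientation, OpenTelemetry commonly propagates trace context in the W3C Trace Context `traceparent` format. The sketch below formats and parses such a value using only the standard library. Whether Easy RMQ carries trace IDs in exactly this form is an assumption, and `format_traceparent` and `parse_trace_id` are hypothetical helpers, not functions from the crate.

```rust
/// Formats a W3C Trace Context `traceparent` value:
/// version "00", 32 hex digits of trace ID, 16 hex digits of span ID, 2 hex digits of flags.
pub fn format_traceparent(trace_id: u128, span_id: u64, sampled: bool) -> String {
    let flags: u8 = if sampled { 0x01 } else { 0x00 };
    format!("00-{trace_id:032x}-{span_id:016x}-{flags:02x}")
}

/// Extracts the trace ID from a `traceparent` value, if it is well formed.
/// This check is lenient: it also accepts uppercase hex digits.
pub fn parse_trace_id(header: &str) -> Option<u128> {
    let mut parts = header.split('-');
    let version = parts.next()?;
    let trace_id = parts.next()?;
    let span_id = parts.next()?;
    let flags = parts.next()?;
    if parts.next().is_some() {
        return None; // Unexpected extra fields for version "00".
    }
    if version != "00" || trace_id.len() != 32 || span_id.len() != 16 || flags.len() != 2 {
        return None;
    }
    if !trace_id.bytes().all(|b| b.is_ascii_hexdigit()) {
        return None;
    }
    let id = u128::from_str_radix(trace_id, 16).ok()?;
    // An all-zero trace ID is invalid according to the specification.
    if id == 0 { None } else { Some(id) }
}
```

A publisher would attach such a value to the message headers, and a subscriber would parse it so that its spans join the same trace.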

Type Safety

Error types are defined with thiserror, so failures surface as typed errors that callers can match on rather than as opaque strings.
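
To show what a typed error buys you, here is a hand-written equivalent of the kind of error type that thiserror's derive macro generates. The `QueueError` enum, its variants, and `decode_payload` are hypothetical and exist only for this sketch; the crate's real error type may have different variants.

```rust
use std::error::Error;
use std::fmt;
use std::str::Utf8Error;

/// Hypothetical error type for this sketch.
#[derive(Debug)]
pub enum QueueError {
    Connection(String),
    Publish { routing_key: String, reason: String },
    Decode(Utf8Error),
}

impl fmt::Display for QueueError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            QueueError::Connection(msg) => write!(f, "connection failed: {msg}"),
            QueueError::Publish { routing_key, reason } => {
                write!(f, "publish to '{routing_key}' failed: {reason}")
            }
            QueueError::Decode(e) => write!(f, "payload is not valid UTF-8: {e}"),
        }
    }
}

impl Error for QueueError {
    // Expose the underlying cause so that callers can walk the error chain.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            QueueError::Decode(e) => Some(e),
            _ => None,
        }
    }
}

impl From<Utf8Error> for QueueError {
    fn from(e: Utf8Error) -> Self {
        QueueError::Decode(e)
    }
}

/// Decodes a message body as UTF-8; `?` converts the error via `From`.
pub fn decode_payload(data: &[u8]) -> Result<&str, QueueError> {
    Ok(std::str::from_utf8(data)?)
}
```

Because the variants are part of the type, a caller can handle `Decode` differently from `Connection` in a `match`, and the compiler flags any variant that the `match` leaves uncovered.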

Requirements

  • Rust: 1.70 or higher
  • RabbitMQ: 3.x (installed locally or run via Docker)
  • Tokio: used as the async runtime

License

ISC