Skip to main content
Version: 1.0.1

Actix-Web Integration

Learn how to integrate easy-rmq-rs with Actix-Web 4.x for building high-performance message publishing APIs.

Overviewโ€‹

This guide shows you how to:

  • โœ… Set up easy-rmq-rs with Actix-Web
  • โœ… Use non-blocking JSON serialization with spawn_blocking
  • โœ… Pre-configure publishers with specific exchanges
  • โœ… Handle errors properly in web handlers
  • โœ… Achieve 3-10x performance improvement with proper async handling

Prerequisitesโ€‹

Add the following dependencies to your Cargo.toml:

[dependencies]
easy-rmq-rs = "1.0"
actix-web = "4"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"

Quick Startโ€‹

1. Create RabbitMQ Clientโ€‹

use easy_rmq_rs::AmqpClient;

#[actix_web::main]
async fn main() -> std::io::Result<()> {
// Initialize RabbitMQ client with connection name
let client = AmqpClient::new(
"amqp://admin:password@localhost:5672".to_string(),
"actix-web".to_string(), // connection name (visible in RabbitMQ Manager)
10, // max pool size
)
.expect("Failed to create RabbitMQ client");

// ... rest of the code
}

2. Create Pre-configured Publisherโ€‹

use easy_rmq_rs::Publisher;
use std::sync::Arc;

// Create publisher pre-configured with exchange
let publisher = Arc::new(
client.publisher()
.with_exchange("order.events.v1")
);

3. Set Up Actix-Web Stateโ€‹

use actix_web::{web, App, HttpServer};

struct AppState {
publisher: Arc<Publisher>,
}

let app_state = web::Data::new(AppState { publisher });

HttpServer::new(move || {
App::new()
.app_data(app_state.clone())
.route("/publish", web::post().to(publish_order))
})
.bind("127.0.0.1:8080")?
.run()
.await

4. Create Publish Handlerโ€‹

use actix_web::{web, HttpResponse, Responder};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct Order {
id: String,
customer: String,
total: f64,
items: Vec<OrderItem>,
}

#[derive(Debug, Serialize, Deserialize)]
struct OrderItem {
product_id: String,
quantity: u32,
price: f64,
}

#[derive(Debug, Serialize, Deserialize)]
struct PublishResponse {
success: bool,
message: String,
}

async fn publish_order(
state: web::Data<AppState>,
order: web::Json<Order>,
) -> impl Responder {
let order_id = order.id.clone();
let order_data = order.into_inner();

// Non-blocking JSON serialization
let order_json = match tokio::task::spawn_blocking(move || {
serde_json::to_vec(&order_data)
.map_err(|e| format!("JSON serialization failed: {}", e))
})
.await
{
Ok(Ok(bytes)) => bytes,
Ok(Err(e)) => {
return HttpResponse::InternalServerError().json(PublishResponse {
success: false,
message: e,
})
}
Err(e) => {
return HttpResponse::InternalServerError().json(PublishResponse {
success: false,
message: format!("Task join error: {}", e),
})
}
};

// Publish using pre-configured publisher
match state.publisher.publish("order.process", order_json).await {
Ok(_) => HttpResponse::Ok().json(PublishResponse {
success: true,
message: format!("Order {} published successfully", order_id),
}),
Err(e) => HttpResponse::InternalServerError().json(PublishResponse {
success: false,
message: format!("Failed to publish order: {}", e),
}),
}
}

Complete Exampleโ€‹

Here's the complete example:

use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use easy_rmq_rs::{AmqpClient, Publisher};
use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Debug, Serialize, Deserialize)]
struct Order {
id: String,
customer: String,
total: f64,
items: Vec<OrderItem>,
}

#[derive(Debug, Serialize, Deserialize)]
struct OrderItem {
product_id: String,
quantity: u32,
price: f64,
}

#[derive(Debug, Serialize, Deserialize)]
struct PublishResponse {
success: bool,
message: String,
}

struct AppState {
publisher: Arc<Publisher>,
}

async fn publish_order(
state: web::Data<AppState>,
order: web::Json<Order>,
) -> impl Responder {
let order_id = order.id.clone();
let order_data = order.into_inner();

// Non-blocking JSON serialization
let order_json = match tokio::task::spawn_blocking(move || {
serde_json::to_vec(&order_data)
.map_err(|e| format!("JSON serialization failed: {}", e))
})
.await
{
Ok(Ok(bytes)) => bytes,
Ok(Err(e)) => {
return HttpResponse::InternalServerError().json(PublishResponse {
success: false,
message: e,
})
}
Err(e) => {
return HttpResponse::InternalServerError().json(PublishResponse {
success: false,
message: format!("Task join error: {}", e),
})
}
};

// Publish using pre-configured publisher
match state.publisher.publish("order.process", order_json).await {
Ok(_) => HttpResponse::Ok().json(PublishResponse {
success: true,
message: format!("Order {} published successfully", order_id),
}),
Err(e) => HttpResponse::InternalServerError().json(PublishResponse {
success: false,
message: format!("Failed to publish order: {}", e),
}),
}
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));

// Initialize RabbitMQ client
let client = AmqpClient::new(
"amqp://admin:password@localhost:5672".to_string(),
"actix-web".to_string(),
10,
)
.expect("Failed to create RabbitMQ client");

// Create publisher pre-configured with exchange
let publisher = Arc::new(
client.publisher()
.with_exchange("order.events.v1")
);

let app_state = web::Data::new(AppState { publisher });

let bind_address = "127.0.0.1:8080";
println!("๐Ÿš€ Actix-Web + easy-rmq-rs Integration");
println!("=======================================\n");
println!("๐Ÿ“ Server: http://{}\n", bind_address);
println!("๐Ÿ“ฐ Endpoint:");
println!(" POST /publish - Publish order to order.events.v1 exchange\n");
println!("๐Ÿ“ Configuration:");
println!(" Exchange: order.events.v1 (Direct)");
println!(" Routing Key: order.process");
println!(" Publisher: Pre-configured with exchange\n");
println!("โœ… Non-blocking JSON serialization with spawn_blocking\n");

HttpServer::new(move || {
App::new()
.app_data(app_state.clone())
.route("/publish", web::post().to(publish_order))
})
.bind(bind_address)?
.run()
.await
}

Why spawn_blocking is Importantโ€‹

Without spawn_blockingโ€‹

// โŒ BAD - Blocks Actix thread during serialization
let json = serde_json::to_vec(&order)?;
publisher.publish("order.process", json).await?;

Problems:

  • Actix worker thread blocks during serialization (50-500ยตs for 1-10 KB payload)
  • Thread cannot handle other requests
  • Throughput drops drastically under high load

With spawn_blockingโ€‹

// โœ… GOOD - Serialization in separate thread pool
let json = tokio::task::spawn_blocking(move || {
serde_json::to_vec(&order)
}).await?;

publisher.publish("order.process", json).await?;

Benefits:

  • Serialization in separate blocking thread pool
  • Actix threads stay responsive
  • 3-10x improvement for large payloads

Performance Benchmarksโ€‹

Payload SizeSerialization TimeWithout spawn_blockingWith spawn_blockingImprovement
1 KB~27 ยตs~2,000 req/sec~5,500 req/sec2.75x
10 KB~500 ยตs~200 req/sec~1,800 req/sec9x

Testing the APIโ€‹

Start the Serverโ€‹

cargo run --example actix_integration

Test Endpointโ€‹

curl -X POST http://localhost:8080/publish \
-H "Content-Type: application/json" \
-d '{
"id": "ORD-001",
"customer": "John Doe",
"total": 150.50,
"items": [
{
"product_id": "PROD-001",
"quantity": 2,
"price": 75.25
}
]
}'

Success Responseโ€‹

{
"success": true,
"message": "Order ORD-001 published successfully"
}

Error Responseโ€‹

{
"success": false,
"message": "Failed to publish order: ..."
}

Load Testingโ€‹

# Create order.json file
cat > order.json << 'EOF'
{
"id": "ORD-001",
"customer": "Test Customer",
"total": 100.0,
"items": [
{
"product_id": "PROD-001",
"quantity": 1,
"price": 100.0
}
]
}
EOF

# Run load test with Apache Bench
ab -n 1000 -c 10 -T "application/json" \
-p order.json \
http://localhost:8080/publish

Best Practicesโ€‹

1. Use Pre-configured Publishersโ€‹

// โœ… GOOD - Pre-configure exchange
let publisher = Arc::new(
client.publisher()
.with_exchange("order.events.v1")
);

// Later, just publish with routing key
publisher.publish("order.process", data).await?;

2. Always Use spawn_blocking for Serializationโ€‹

// โœ… GOOD - Non-blocking serialization
let json = tokio::task::spawn_blocking(move || {
serde_json::to_vec(&data)
}).await??;

publisher.publish("routing.key", json).await?;

3. Handle Errors Properlyโ€‹

// โœ… GOOD - Comprehensive error handling
let json = match tokio::task::spawn_blocking(move || {
serde_json::to_vec(&data)
.map_err(|e| format!("Serialization failed: {}", e))
})
.await
{
Ok(Ok(bytes)) => bytes,
Ok(Err(e)) => return Err(e.into()),
Err(e) => return Err(format!("Task join error: {}", e).into()),
};

publisher.publish("routing.key", json).await?;

4. Use Connection Namesโ€‹

// โœ… GOOD - Use connection name for better tracking
let client = AmqpClient::new(
url,
"actix-web".to_string(), // visible in RabbitMQ Manager
pool_size
)?;

Advanced: Multiple Publishersโ€‹

Multiple Exchangesโ€‹

struct AppState {
order_publisher: Arc<Publisher>,
log_publisher: Arc<Publisher>,
}

let order_publisher = Arc::new(
client.publisher()
.with_exchange("order.events.v1")
);

let log_publisher = Arc::new(
client.publisher()
.with_exchange("logs.v1")
);

let app_state = web::Data::new(AppState {
order_publisher,
log_publisher,
});

Use in Handlersโ€‹

async fn publish_order(
state: web::Data<AppState>,
order: web::Json<Order>,
) -> impl Responder {
// Publish to order exchange
state.order_publisher.publish("order.created", json).await?;

// Publish to log exchange
state.log_publisher.publish("order.api", log_json).await?;

Ok(HttpResponse::Ok())
}

Monitoringโ€‹

RabbitMQ Management UIโ€‹

Check Connection Nameโ€‹

In the Management UI, you'll see connections named "actix-web" making it easy to identify which connections belong to your web service.

Metrics to Monitorโ€‹

  • Message rate: Messages published per second
  • Connection count: Number of active connections
  • Channel count: Number of active channels
  • Queue depth: Messages in queues

Troubleshootingโ€‹

Connection Refusedโ€‹

# Check if RabbitMQ is running
nc -z localhost 5672
echo $?

# Start RabbitMQ
brew services start rabbitmq
# or
docker start rabbitmq

Port Already in Useโ€‹

# Check which process is using port 8080
lsof -i :8080

# Kill the process
kill -9 <PID>

Serialization Errorsโ€‹

Ensure your JSON payload is valid:

echo '{"id":"123"}' | jq .

Performance Issuesโ€‹

  1. Enable spawn_blocking - Make sure JSON serialization is non-blocking
  2. Increase pool size - Adjust based on your concurrency needs
  3. Monitor RabbitMQ - Check for bottlenecks in the message broker
  4. Add logging - Use env_logger to debug issues

What's Nextโ€‹