Actix-Web Integration
Learn how to integrate easy-rmq-rs with Actix-Web 4.x for building high-performance message publishing APIs.
Overviewโ
This guide shows you how to:
- โ Set up easy-rmq-rs with Actix-Web
- โ
Use non-blocking JSON serialization with
spawn_blocking - โ Pre-configure publishers with specific exchanges
- โ Handle errors properly in web handlers
- โ Achieve 3-10x performance improvement with proper async handling
Prerequisitesโ
Add the following dependencies to your Cargo.toml:
[dependencies]
easy-rmq-rs = "1.0"
actix-web = "4"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Quick Startโ
1. Create RabbitMQ Clientโ
use easy_rmq_rs::AmqpClient;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
// Initialize RabbitMQ client with connection name
let client = AmqpClient::new(
"amqp://admin:password@localhost:5672".to_string(),
"actix-web".to_string(), // connection name (visible in RabbitMQ Manager)
10, // max pool size
)
.expect("Failed to create RabbitMQ client");
// ... rest of the code
}
2. Create Pre-configured Publisherโ
use easy_rmq_rs::Publisher;
use std::sync::Arc;
// Create publisher pre-configured with exchange
let publisher = Arc::new(
client.publisher()
.with_exchange("order.events.v1")
);
3. Set Up Actix-Web Stateโ
use actix_web::{web, App, HttpServer};
struct AppState {
publisher: Arc<Publisher>,
}
let app_state = web::Data::new(AppState { publisher });
HttpServer::new(move || {
App::new()
.app_data(app_state.clone())
.route("/publish", web::post().to(publish_order))
})
.bind("127.0.0.1:8080")?
.run()
.await
4. Create Publish Handlerโ
use actix_web::{web, HttpResponse, Responder};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
struct Order {
id: String,
customer: String,
total: f64,
items: Vec<OrderItem>,
}
#[derive(Debug, Serialize, Deserialize)]
struct OrderItem {
product_id: String,
quantity: u32,
price: f64,
}
#[derive(Debug, Serialize, Deserialize)]
struct PublishResponse {
success: bool,
message: String,
}
async fn publish_order(
state: web::Data<AppState>,
order: web::Json<Order>,
) -> impl Responder {
let order_id = order.id.clone();
let order_data = order.into_inner();
// Non-blocking JSON serialization
let order_json = match tokio::task::spawn_blocking(move || {
serde_json::to_vec(&order_data)
.map_err(|e| format!("JSON serialization failed: {}", e))
})
.await
{
Ok(Ok(bytes)) => bytes,
Ok(Err(e)) => {
return HttpResponse::InternalServerError().json(PublishResponse {
success: false,
message: e,
})
}
Err(e) => {
return HttpResponse::InternalServerError().json(PublishResponse {
success: false,
message: format!("Task join error: {}", e),
})
}
};
// Publish using pre-configured publisher
match state.publisher.publish("order.process", order_json).await {
Ok(_) => HttpResponse::Ok().json(PublishResponse {
success: true,
message: format!("Order {} published successfully", order_id),
}),
Err(e) => HttpResponse::InternalServerError().json(PublishResponse {
success: false,
message: format!("Failed to publish order: {}", e),
}),
}
}
Complete Exampleโ
Here's the complete example:
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use easy_rmq_rs::{AmqpClient, Publisher};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[derive(Debug, Serialize, Deserialize)]
struct Order {
id: String,
customer: String,
total: f64,
items: Vec<OrderItem>,
}
#[derive(Debug, Serialize, Deserialize)]
struct OrderItem {
product_id: String,
quantity: u32,
price: f64,
}
#[derive(Debug, Serialize, Deserialize)]
struct PublishResponse {
success: bool,
message: String,
}
struct AppState {
publisher: Arc<Publisher>,
}
async fn publish_order(
state: web::Data<AppState>,
order: web::Json<Order>,
) -> impl Responder {
let order_id = order.id.clone();
let order_data = order.into_inner();
// Non-blocking JSON serialization
let order_json = match tokio::task::spawn_blocking(move || {
serde_json::to_vec(&order_data)
.map_err(|e| format!("JSON serialization failed: {}", e))
})
.await
{
Ok(Ok(bytes)) => bytes,
Ok(Err(e)) => {
return HttpResponse::InternalServerError().json(PublishResponse {
success: false,
message: e,
})
}
Err(e) => {
return HttpResponse::InternalServerError().json(PublishResponse {
success: false,
message: format!("Task join error: {}", e),
})
}
};
// Publish using pre-configured publisher
match state.publisher.publish("order.process", order_json).await {
Ok(_) => HttpResponse::Ok().json(PublishResponse {
success: true,
message: format!("Order {} published successfully", order_id),
}),
Err(e) => HttpResponse::InternalServerError().json(PublishResponse {
success: false,
message: format!("Failed to publish order: {}", e),
}),
}
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
// Initialize RabbitMQ client
let client = AmqpClient::new(
"amqp://admin:password@localhost:5672".to_string(),
"actix-web".to_string(),
10,
)
.expect("Failed to create RabbitMQ client");
// Create publisher pre-configured with exchange
let publisher = Arc::new(
client.publisher()
.with_exchange("order.events.v1")
);
let app_state = web::Data::new(AppState { publisher });
let bind_address = "127.0.0.1:8080";
println!("๐ Actix-Web + easy-rmq-rs Integration");
println!("=======================================\n");
println!("๐ Server: http://{}\n", bind_address);
println!("๐ฐ Endpoint:");
println!(" POST /publish - Publish order to order.events.v1 exchange\n");
println!("๐ Configuration:");
println!(" Exchange: order.events.v1 (Direct)");
println!(" Routing Key: order.process");
println!(" Publisher: Pre-configured with exchange\n");
println!("โ
Non-blocking JSON serialization with spawn_blocking\n");
HttpServer::new(move || {
App::new()
.app_data(app_state.clone())
.route("/publish", web::post().to(publish_order))
})
.bind(bind_address)?
.run()
.await
}
Why spawn_blocking is Importantโ
Without spawn_blockingโ
// โ BAD - Blocks Actix thread during serialization
let json = serde_json::to_vec(&order)?;
publisher.publish("order.process", json).await?;
Problems:
- Actix worker thread blocks during serialization (50-500ยตs for 1-10 KB payload)
- Thread cannot handle other requests
- Throughput drops drastically under high load
With spawn_blockingโ
// โ
GOOD - Serialization in separate thread pool
let json = tokio::task::spawn_blocking(move || {
serde_json::to_vec(&order)
}).await?;
publisher.publish("order.process", json).await?;
Benefits:
- Serialization in separate blocking thread pool
- Actix threads stay responsive
- 3-10x improvement for large payloads
Performance Benchmarksโ
| Payload Size | Serialization Time | Without spawn_blocking | With spawn_blocking | Improvement |
|---|---|---|---|---|
| 1 KB | ~27 ยตs | ~2,000 req/sec | ~5,500 req/sec | 2.75x |
| 10 KB | ~500 ยตs | ~200 req/sec | ~1,800 req/sec | 9x |
Testing the APIโ
Start the Serverโ
cargo run --example actix_integration
Test Endpointโ
curl -X POST http://localhost:8080/publish \
-H "Content-Type: application/json" \
-d '{
"id": "ORD-001",
"customer": "John Doe",
"total": 150.50,
"items": [
{
"product_id": "PROD-001",
"quantity": 2,
"price": 75.25
}
]
}'
Success Responseโ
{
"success": true,
"message": "Order ORD-001 published successfully"
}
Error Responseโ
{
"success": false,
"message": "Failed to publish order: ..."
}
Load Testingโ
# Create order.json file
cat > order.json << 'EOF'
{
"id": "ORD-001",
"customer": "Test Customer",
"total": 100.0,
"items": [
{
"product_id": "PROD-001",
"quantity": 1,
"price": 100.0
}
]
}
EOF
# Run load test with Apache Bench
ab -n 1000 -c 10 -T "application/json" \
-p order.json \
http://localhost:8080/publish
Best Practicesโ
1. Use Pre-configured Publishersโ
// โ
GOOD - Pre-configure exchange
let publisher = Arc::new(
client.publisher()
.with_exchange("order.events.v1")
);
// Later, just publish with routing key
publisher.publish("order.process", data).await?;
2. Always Use spawn_blocking for Serializationโ
// โ
GOOD - Non-blocking serialization
let json = tokio::task::spawn_blocking(move || {
serde_json::to_vec(&data)
}).await??;
publisher.publish("routing.key", json).await?;
3. Handle Errors Properlyโ
// โ
GOOD - Comprehensive error handling
let json = match tokio::task::spawn_blocking(move || {
serde_json::to_vec(&data)
.map_err(|e| format!("Serialization failed: {}", e))
})
.await
{
Ok(Ok(bytes)) => bytes,
Ok(Err(e)) => return Err(e.into()),
Err(e) => return Err(format!("Task join error: {}", e).into()),
};
publisher.publish("routing.key", json).await?;
4. Use Connection Namesโ
// โ
GOOD - Use connection name for better tracking
let client = AmqpClient::new(
url,
"actix-web".to_string(), // visible in RabbitMQ Manager
pool_size
)?;
Advanced: Multiple Publishersโ
Multiple Exchangesโ
struct AppState {
order_publisher: Arc<Publisher>,
log_publisher: Arc<Publisher>,
}
let order_publisher = Arc::new(
client.publisher()
.with_exchange("order.events.v1")
);
let log_publisher = Arc::new(
client.publisher()
.with_exchange("logs.v1")
);
let app_state = web::Data::new(AppState {
order_publisher,
log_publisher,
});
Use in Handlersโ
async fn publish_order(
state: web::Data<AppState>,
order: web::Json<Order>,
) -> impl Responder {
// Publish to order exchange
state.order_publisher.publish("order.created", json).await?;
// Publish to log exchange
state.log_publisher.publish("order.api", log_json).await?;
Ok(HttpResponse::Ok())
}
Monitoringโ
RabbitMQ Management UIโ
- URL: http://localhost:15672
- Username:
guest - Password:
guest
Check Connection Nameโ
In the Management UI, you'll see connections named "actix-web" making it easy to identify which connections belong to your web service.
Metrics to Monitorโ
- Message rate: Messages published per second
- Connection count: Number of active connections
- Channel count: Number of active channels
- Queue depth: Messages in queues
Troubleshootingโ
Connection Refusedโ
# Check if RabbitMQ is running
nc -z localhost 5672
echo $?
# Start RabbitMQ
brew services start rabbitmq
# or
docker start rabbitmq
Port Already in Useโ
# Check which process is using port 8080
lsof -i :8080
# Kill the process
kill -9 <PID>
Serialization Errorsโ
Ensure your JSON payload is valid:
echo '{"id":"123"}' | jq .
Performance Issuesโ
- Enable spawn_blocking - Make sure JSON serialization is non-blocking
- Increase pool size - Adjust based on your concurrency needs
- Monitor RabbitMQ - Check for bottlenecks in the message broker
- Add logging - Use
env_loggerto debug issues
What's Nextโ
- Publisher Guide - Learn more about publishers
- Dependency Injection - Use DI patterns
- Advanced Topics - Explore advanced features