Publisher Guide
The publisher provides a simple and powerful way to send messages to RabbitMQ exchanges with automatic setup and various configuration options.
Basic Publisher
Create a publisher and send messages:
use easy_rmq_rs::AmqpClient;
let client = AmqpClient::new(
    "amqp://guest:guest@localhost:5672".to_string(),
    "my-app".to_string(),
    10,
)?;
let publisher = client.publisher();
// Publish text
publisher.publish("order.created", "Hello, AMQP!").await?;
// Publish JSON
#[derive(serde::Serialize)]
struct Order {
    id: String,
    total: f64,
}
let order = Order {
    id: "123".to_string(),
    total: 100.0,
};
let json = serde_json::to_vec(&order)?;
publisher.publish("order.created", json).await?;
The publish() method accepts multiple data types thanks to impl AsRef<[u8]>:
- &str: string slices
- &[u8]: byte slices
- &String: owned strings
- &Vec<u8>: byte vectors
// All of these work:
publisher.publish("key", "hello").await?; // &str
publisher.publish("key", b"bytes").await?; // &[u8; 5] (byte string literal, usable as a byte slice)
publisher.publish("key", &String::from("x")).await?; // &String
publisher.publish("key", &vec![1, 2, 3]).await?; // &Vec<u8>
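To see why one signature covers all of these types, here is a minimal standard-library sketch. The function `payload_bytes` is a hypothetical stand-in for a publish-style method, not part of easy_rmq_rs; it only shows how an `impl AsRef<[u8]>` parameter normalises different inputs to a byte slice.

```rust
// Hypothetical stand-in for a publish-style method (not the library's code).
// Any type implementing `AsRef<[u8]>` can be passed as `payload`.
fn payload_bytes(payload: impl AsRef<[u8]>) -> Vec<u8> {
    // `as_ref()` borrows the input as `&[u8]`, whatever the caller's type is.
    payload.as_ref().to_vec()
}

fn main() {
    let owned = String::from("x");
    let bytes = vec![1u8, 2, 3];

    assert_eq!(payload_bytes("hello"), b"hello".to_vec()); // &str
    assert_eq!(payload_bytes(b"bytes"), b"bytes".to_vec()); // &[u8; 5]
    assert_eq!(payload_bytes(&owned), b"x".to_vec()); // &String
    assert_eq!(payload_bytes(&bytes), vec![1u8, 2, 3]); // &Vec<u8>
}
```

Because the bound is on the trait rather than on a concrete type, owned values such as `String` and `Vec<u8>` would also satisfy it in this sketch.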
Features
✅ Publishes to the default exchange (amq.direct) when none is specified
✅ Auto-creates the exchange (durable) if it does not exist
✅ No manual setup needed
Multiple Exchanges
You can create publishers for different exchanges:
let client = AmqpClient::new(
    "amqp://guest:guest@localhost:5672".to_string(),
    "my-app".to_string(),
    10,
)?;
// Publisher 1 - Custom exchange
let pub1 = client.publisher().with_exchange("orders");
pub1.publish("order.created", "Order data").await?;
// Publisher 2 - Another exchange
let pub2 = client.publisher().with_exchange("logs");
pub2.publish("order.api", "Log data").await?;
// Publisher 3 - Fanout exchange
let pub3 = client.publisher().with_exchange("broadcast");
pub3.publish("", "Broadcast data").await?;
✅ Simple - just pass exchange name
✅ Flexible - use any exchange type
✅ Auto-create exchange if not exists
Exchange Types
Direct Exchange
Messages are routed to queues based on exact routing key match:
let publisher = client.publisher()
    .with_exchange("orders");
publisher.publish("order.created", "Order data").await?;
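The exact-match rule can be pictured with a small in-memory model. This is only a sketch of how a broker resolves a direct exchange, not easy_rmq_rs code; `DirectExchange`, `bind`, `route`, and the queue names are hypothetical and exist purely for illustration.

```rust
use std::collections::HashMap;

/// Toy model of a direct exchange: binding key -> names of bound queues.
struct DirectExchange {
    bindings: HashMap<String, Vec<String>>,
}

impl DirectExchange {
    fn new() -> Self {
        Self { bindings: HashMap::new() }
    }

    /// Bind a queue to the exchange under a binding key.
    fn bind(&mut self, binding_key: &str, queue: &str) {
        self.bindings
            .entry(binding_key.to_string())
            .or_default()
            .push(queue.to_string());
    }

    /// Return the queues whose binding key equals the routing key exactly.
    fn route(&self, routing_key: &str) -> Vec<String> {
        self.bindings.get(routing_key).cloned().unwrap_or_default()
    }
}

fn main() {
    let mut exchange = DirectExchange::new();
    exchange.bind("order.created", "billing");
    exchange.bind("order.created", "email");
    exchange.bind("order.cancelled", "refunds");

    // Both queues bound with the identical key receive the message.
    assert_eq!(exchange.route("order.created"), vec!["billing", "email"]);
    // A partial key matches nothing on a direct exchange.
    assert!(exchange.route("order").is_empty());
}
```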
Topic Exchange
Messages are routed based on pattern matching:
let publisher = client.publisher()
    .with_exchange("logs");
publisher.publish("order.created", "Order created log").await?;
publisher.publish("order.updated", "Order updated log").await?;
publisher.publish("error.critical", "Critical error log").await?;
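In AMQP topic exchanges, routing keys are dot-separated words, and a binding pattern may use `*` for exactly one word and `#` for zero or more words. The broker performs this matching, not the publisher. The sketch below is a hypothetical helper (`topic_matches` is not part of easy_rmq_rs) that illustrates which of the routing keys above a given pattern would select.

```rust
/// Returns true if `routing_key` matches the topic binding `pattern`.
/// `*` matches exactly one word; `#` matches zero or more words.
fn topic_matches(pattern: &str, routing_key: &str) -> bool {
    let pattern_words: Vec<&str> = pattern.split('.').collect();
    let key_words: Vec<&str> = routing_key.split('.').collect();
    words_match(&pattern_words, &key_words)
}

/// Recursive word-by-word comparison used by `topic_matches`.
fn words_match(pattern: &[&str], key: &[&str]) -> bool {
    let (first, rest) = match pattern.split_first() {
        Some(parts) => parts,
        // Pattern exhausted: it is a match only if the key is exhausted too.
        None => return key.is_empty(),
    };
    if *first == "#" {
        // Try letting `#` consume 0, 1, ..., all remaining key words.
        return (0..=key.len()).any(|skip| words_match(rest, &key[skip..]));
    }
    match key.split_first() {
        Some((word, key_rest)) => {
            (*first == "*" || first == word) && words_match(rest, key_rest)
        }
        // Pattern still expects a word but the key has none left.
        None => false,
    }
}

fn main() {
    assert!(topic_matches("order.*", "order.created"));
    assert!(topic_matches("order.*", "order.updated"));
    assert!(!topic_matches("order.*", "error.critical"));
    assert!(topic_matches("#", "error.critical"));
    assert!(topic_matches("error.#", "error"));
}
```

Under these rules, a queue bound with `order.*` would receive the first two messages above, while a queue bound with `#` would receive all three.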
Fanout Exchange
Messages are broadcast to all bound queues:
let publisher = client.publisher()
    .with_exchange("events");
publisher.publish("", "Event data").await?;
Distributed Tracing
Add trace IDs to track messages across services:
// Auto-generate trace ID
client.publisher()
    .with_auto_trace_id()
    .publish("order.created", "Order data")
    .await?;
// Use custom trace ID (e.g., from OpenTelemetry)
client.publisher()
    .with_trace_id("trace-from-otel-123".to_string())
    .publish("order.created", "Order data")
    .await?;
// Generate standalone trace ID
use easy_rmq_rs::generate_trace_id;
let trace_id = generate_trace_id();
client.publisher()
    .with_trace_id(trace_id)
    .publish("order.created", "Order data")
    .await?;
Error Handling
publish() returns Result<()>, so failures can be handled explicitly:
match publisher.publish("order.created", "Order data").await {
    Ok(_) => println!("Message published successfully"),
    Err(e) => eprintln!("Failed to publish message: {:?}", e),
}
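One common way to handle a failed publish is to retry with exponential backoff. The sketch below is an assumption about how an application might do this, not a feature of easy_rmq_rs. `retry_with_backoff` is a hypothetical helper, and it is written synchronously with the standard library so that it stands alone; in async code the sleep would typically come from the async runtime instead.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper: run `op` until it succeeds or `max_attempts` is reached.
/// The operation always runs at least once. The delay doubles after each failure.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    base_delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt: u32 = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                // Delay is base_delay * 2^(attempt - 1), saturating on overflow.
                let factor = 2u32.saturating_pow(attempt - 1);
                sleep(base_delay.saturating_mul(factor));
                attempt += 1;
            }
        }
    }
}

fn main() {
    // Simulated publish that fails twice, then succeeds.
    let mut calls = 0;
    let result: Result<&str, &str> =
        retry_with_backoff(3, Duration::from_millis(1), || {
            calls += 1;
            if calls < 3 { Err("broker unavailable") } else { Ok("published") }
        });

    assert_eq!(result, Ok("published"));
    assert_eq!(calls, 3);
}
```

Retrying can deliver the same message more than once if an earlier attempt reached the broker before the error was reported, so it suits consumers that tolerate duplicates.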
Best Practices
- Use appropriate exchange types: Choose Direct, Topic, or Fanout based on your routing needs
- Implement error handling: Always handle publish errors appropriately
- Add trace IDs: Use distributed tracing for better debugging and monitoring
- Use the simplified publish() API: The publish() method accepts any payload type that implements AsRef<[u8]>
- Connection pooling: The publisher uses the client's connection pool, so you do not need to manage connections manually