Skip to main content
Version: 1.0.2 🏗️

Publisher Guide

The publisher provides a simple and powerful way to send messages to RabbitMQ exchanges with automatic setup and various configuration options.

Basic Publisher

Create a publisher and send messages:

use easy_rmq_rs::AmqpClient;

let client = AmqpClient::new(
"amqp://guest:guest@localhost:5672".to_string(),
"my-app".to_string(),
10
)?;

let publisher = client.publisher();

// Publish text
publisher.publish("order.created", "Hello, AMQP!").await?;

// Publish JSON
#[derive(serde::Serialize)]
struct Order {
id: String,
total: f64,
}

let order = Order {
id: "123".to_string(),
total: 100.0,
};

let json = serde_json::to_vec(&order)?;
publisher.publish("order.created", json).await?;

The publish() method accepts multiple data types thanks to impl AsRef<[u8]>:

  • &str - string slices
  • &[u8] - byte slices
  • &String - owned strings
  • &Vec<u8> - byte vectors
// All of these work:
publisher.publish("key", "hello").await?; // &str
publisher.publish("key", b"bytes").await?; // &[u8]
publisher.publish("key", &String::from("x")).await?; // &String
publisher.publish("key", &vec![1, 2, 3]).await?; // &Vec<u8>

Features

Auto send to default exchange (amq.direct)
Auto-create exchange if not exists (durable)
No manual setup needed

Multiple Exchanges

You can create publishers for different exchanges:

let client = AmqpClient::new(
"amqp://guest:guest@localhost:5672".to_string(),
"my-app".to_string(),
10
)?;

// Publisher 1 - Custom exchange
let pub1 = client.publisher().with_exchange("orders");
pub1.publish("order.created", "Order data").await?;

// Publisher 2 - Another exchange
let pub2 = client.publisher().with_exchange("logs");
pub2.publish("order.api", "Log data").await?;

// Publisher 3 - Fanout exchange
let pub3 = client.publisher().with_exchange("broadcast");
pub3.publish("", "Broadcast data").await?;

Simple - just pass exchange name ✅ Flexible - use any exchange type ✅ Auto-create exchange if not exists

Exchange Types

Direct Exchange

Messages are routed to queues based on exact routing key match:

let publisher = client.publisher()
.with_exchange("orders");

publisher.publish("order.created", "Order data").await?;

Topic Exchange

Messages are routed based on pattern matching:

let publisher = client.publisher()
.with_exchange("logs");

publisher.publish("order.created", "Order created log").await?;
publisher.publish("order.updated", "Order updated log").await?;
publisher.publish("error.critical", "Critical error log").await?;

Fanout Exchange

Messages are broadcast to all bound queues:

let publisher = client.publisher()
.with_exchange("events");

publisher.publish("", "Event data").await?;

Distributed Tracing

Add trace IDs to track messages across services:

// Auto-generate trace ID
client.publisher()
.with_auto_trace_id()
.publish("order.created", "Order data")
.await?;

// Use custom trace ID (e.g., from OpenTelemetry)
client.publisher()
.with_trace_id("trace-from-otel-123".to_string())
.publish("order.created", "Order data")
.await?;

// Generate standalone trace ID
use easy_rmq_rs::generate_trace_id;
let trace_id = generate_trace_id();
client.publisher()
.with_trace_id(trace_id)
.publish("order.created", "Order data")
.await?;

Error Handling

Publishers return Result<()> for proper error handling:

match publisher.publish("order.created", "Order data").await {
Ok(_) => println!("Message published successfully"),
Err(e) => eprintln!("Failed to publish message: {:?}", e),
}

Best Practices

  1. Use appropriate exchange types: Choose Direct, Topic, or Fanout based on your routing needs
  2. Implement error handling: Always handle publish errors appropriately
  3. Add trace IDs: Use distributed tracing for better debugging and monitoring
  4. Use the simplified publish() API: The publish() method accepts all data types via impl AsRef<[u8]>
  5. Connection pooling: The publisher uses the connection pool efficiently, no need to manage connections manually