Subscriber Guide (Go)
Subscribers receive messages from queues using a clean and powerful worker registry pattern with automatic setup.
Basic Subscriber
Create a simple subscriber with a handler:
package main
import (
easyrmq "github.com/skyapps-id/easy-rmq-go/pkg/easyrmq"
)
func main() {
client, _ := easyrmq.NewClient("amqp://guest:guest@localhost:5672", 10)
defer client.Close()
worker := easyrmq.NewSubscriberRegistry().
Register(func(count int) easyrmq.Worker {
return easyrmq.NewWorkerBuilder(easyrmq.ExchangeKindDirect).
WithPool(client).
WithExchange("order.events.v1").
Queue("order.process").
Build(handleOrderEvent)
})
worker.Run()
}
func handleOrderEvent(data []byte) error {
msg := string(data)
println("📦 Order:", msg)
return nil
}
Worker Registry
Register multiple workers with different configurations:
worker := easyrmq.NewSubscriberRegistry().
Register(func(count int) easyrmq.Worker {
println("📝 Registering worker #", count)
return easyrmq.NewWorkerBuilder(easyrmq.ExchangeKindDirect).
WithPool(client).
WithExchange("order.events.v1").
Queue("order.process").
Build(handleOrderEvent)
}).
Register(func(count int) easyrmq.Worker {
println("📝 Registering worker #", count)
return easyrmq.NewWorkerBuilder(easyrmq.ExchangeKindTopic).
WithPool(client).
WithExchange("logs.v1").
RoutingKey("order.*").
Queue("api_logs").
Build(handleLogEvent)
})
worker.Run()
Queue Format per Exchange Type
Direct Exchange
easyrmq.NewWorkerBuilder(easyrmq.ExchangeKindDirect).
WithPool(client).
WithExchange("order.events").
Queue("order.created"). // routing_key
Build(handler)
// Queue: "order.created.job"
// Binding: queue_bind("order.created.job", "order.events", "order.created")
Topic Exchange
easyrmq.NewWorkerBuilder(easyrmq.ExchangeKindTopic).
WithPool(client).
WithExchange("logs").
RoutingKey("order.*"). // routing pattern
Queue("api_logs"). // queue name
Build(handler)
// Queue: "api_logs"
// Binding: queue_bind("api_logs", "logs", "order.*")
Fanout Exchange
easyrmq.NewWorkerBuilder(easyrmq.ExchangeKindFanout).
WithPool(client).
WithExchange("events").
Queue("notification_q").
Build(handler)
// Queue: "notification_q"
// Binding: queue_bind("notification_q", "events", "")
Message Handlers
Handler functions receive message data as []byte:
func handleTextMessage(data []byte) error {
msg := string(data)
println("Received:", msg)
return nil
}
func handleJSONMessage(data []byte) error {
type Order struct {
ID string `json:"id"`
Total float64 `json:"total"`
}
var order Order
if err := json.Unmarshal(data, &order); err != nil {
return err
}
println("Order ID:", order.ID, "Total:", order.Total)
return nil
}
Graceful Shutdown
The subscriber handles graceful shutdown automatically:
worker := easyrmq.NewSubscriberRegistry().
Register(/* ... */).
Run()
// Press Ctrl+C to gracefully shutdown
// All in-flight messages will be processed
Error Handling
Handlers can return errors for automatic retry:
func handleWithError(data []byte) error {
if someCondition {
return fmt.Errorf("processing failed")
}
// Process message
return nil
}
Best Practices
- Use Worker Registry: Manage multiple workers with consistent pattern
- Handler functions: Keep handlers simple and focused
- Error handling: Return errors for retry, handle unrecoverable errors
- Queue naming: Use descriptive queue names following the exchange type pattern
- Graceful shutdown: Ensure your handlers complete processing before shutdown