消息队列集成
XYGo Admin 2026-04-22 1 次阅读
Queue消费者注册、Topic常量、生产消息
如果你的扩展需要异步处理任务(如发送邮件、生成报表),可以使用消息队列。
扩展与主包的关系:扩展的消息队列使用的是同一套基础设施 xygo/internal/library/queue,注册方式、Consumer 接口、投递 API 完全相同。唯一区别是代码存放位置:
| 对比项 | 主包 | 扩展 |
|---|---|---|
| 消费者位置 | server/internal/queues/*.go |
server/addons/{name}/queues/*.go |
| 包名 | package queues |
package queues |
| 触发方式 | main.go 空导入 _ "xygo/internal/queues" |
module.go 空导入 _ "xygo/addons/{name}/queues" |
| Topic 命名 | login_log、operation_log |
加扩展前缀:shop.order.payment |
| 常量定义 | 每个文件内 const TopicXxx |
同样,每个文件内 const TopicXxx |
原则:扩展不修改主包的任何消费者,只通过同一套 API 注册自己的消费者。所有消费者在同一个进程内运行,共享同一个队列驱动。
定义消费者
在 queues/ 子目录下创建消费者文件:
go
// server/addons/shop/queues/order_payment.go
package queues
import (
"context"
"encoding/json"
"github.com/gogf/gf/v2/frame/g"
"xygo/internal/library/queue"
)
// Topic 常量(供生产者引用)
const TopicOrderPayment = "shop.order.payment"
func init() {
queue.Register(&OrderPaymentConsumer{})
}
type OrderPaymentConsumer struct{}
func (c *OrderPaymentConsumer) GetTopic() string {
return TopicOrderPayment
}
func (c *OrderPaymentConsumer) Handle(ctx context.Context, msg *queue.Message) error {
var data map[string]interface{}
if err := json.Unmarshal([]byte(msg.Body), &data); err != nil {
g.Log().Errorf(ctx, "[queue:%s] unmarshal failed: %v", TopicOrderPayment, err)
return nil
}
// 处理业务逻辑...
return nil
}
生产消息
go
import (
"xygo/internal/library/queue"
shopQueues "xygo/addons/shop/queues"
)
// 投递即时消息
err := queue.Push(shopQueues.TopicOrderPayment, []byte(`{"orderId": 12345}`))
// 投递延迟消息(如果驱动支持)
err := queue.PushDelay("shop.order.timeout", []byte(`{"orderId": 12345}`), 30*60)
Consumer 接口
go
type Consumer interface {
GetTopic() string
Handle(ctx context.Context, msg *Message) error
}
Topic 命名规范
shop.order.payment ✓ 正确(扩展名.模块.动作)
shop.stock.sync ✓ 正确
order.payment ✗ 避免(缺少扩展名前缀)