消息队列集成

XYGo Admin 2026-04-22 1 次阅读

Queue消费者注册、Topic常量、生产消息

如果你的扩展需要异步处理任务(如发送邮件、生成报表),可以使用消息队列。

扩展与主包的关系:扩展的消息队列使用的是同一套基础设施 xygo/internal/library/queue,注册方式、Consumer 接口、投递 API 完全相同。唯一区别是代码存放位置:

对比项 主包 扩展
消费者位置 server/internal/queues/*.go server/addons/{name}/queues/*.go
包名 package queues package queues
触发方式 main.go 空导入 _ "xygo/internal/queues" module.go 空导入 _ "xygo/addons/{name}/queues"
Topic 命名 login_logoperation_log 加扩展前缀:shop.order.payment
常量定义 每个文件内 const TopicXxx 同样,每个文件内 const TopicXxx

原则:扩展不修改主包的任何消费者,只通过同一套 API 注册自己的消费者。所有消费者在同一个进程内运行,共享同一个队列驱动。

定义消费者

queues/ 子目录下创建消费者文件:

go 复制代码
// server/addons/shop/queues/order_payment.go
package queues

import (
    "context"
    "encoding/json"

    "github.com/gogf/gf/v2/frame/g"
    "xygo/internal/library/queue"
)

// Topic 常量(供生产者引用)
const TopicOrderPayment = "shop.order.payment"

func init() {
    queue.Register(&OrderPaymentConsumer{})
}

type OrderPaymentConsumer struct{}

func (c *OrderPaymentConsumer) GetTopic() string {
    return TopicOrderPayment
}

func (c *OrderPaymentConsumer) Handle(ctx context.Context, msg *queue.Message) error {
    var data map[string]interface{}
    if err := json.Unmarshal([]byte(msg.Body), &data); err != nil {
        g.Log().Errorf(ctx, "[queue:%s] unmarshal failed: %v", TopicOrderPayment, err)
        return nil
    }
    // 处理业务逻辑...
    return nil
}

生产消息

go 复制代码
import (
    "xygo/internal/library/queue"
    shopQueues "xygo/addons/shop/queues"
)

// 投递即时消息
err := queue.Push(shopQueues.TopicOrderPayment, []byte(`{"orderId": 12345}`))

// 投递延迟消息(如果驱动支持)
err := queue.PushDelay("shop.order.timeout", []byte(`{"orderId": 12345}`), 30*60)

Consumer 接口

go 复制代码
type Consumer interface {
    GetTopic() string
    Handle(ctx context.Context, msg *Message) error
}

Topic 命名规范

复制代码
shop.order.payment     ✓ 正确(扩展名.模块.动作)
shop.stock.sync        ✓ 正确
order.payment          ✗ 避免(缺少扩展名前缀)