package service import ( "context" "dsbqj-admin/app/task" "dsbqj-admin/model/mongo/subscribe" "dsbqj-admin/pkg/util" "errors" "fmt" "github.com/goccy/go-json" "github.com/kamva/mgm/v3" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) type SubscribeOpenService struct { DeviceId string `json:"device_id"` OpenId string `json:"open_id"` Module string `json:"module"` } func (s *SubscribeOpenService) createSubscribe(deviceId, openId, module string) error { sub := &subscribe.Subscribe{ OpenId: openId, DeviceId: deviceId, Modules: make(map[string]*subscribe.ModuleSubscribe), } sub.Modules[module] = &subscribe.ModuleSubscribe{ Enabled: true, OpenAt: util.GetNowSecond(), } return mgm.Coll(sub).Create(sub) } func (this *SubscribeOpenService) updateSubscribe(deviceId string, module string) error { now := util.GetNowSecond() filter := bson.M{"device_id": deviceId} update := bson.M{ "$set": bson.M{ "device_id": deviceId, "modules." + module: bson.M{ "enabled": true, "open_at": now, }, "updated_at": now, }, } opts := options.Update().SetUpsert(true) _, err := mgm.Coll(&subscribe.Subscribe{}). UpdateOne(context.Background(), filter, update, opts) return err } func (this *SubscribeOpenService) Open() error { subscribeOne := new(subscribe.Subscribe) err := mgm.Coll(&subscribe.Subscribe{}).First(bson.M{"device_id": this.DeviceId}, subscribeOne) if err != nil { // 1️⃣ 没有记录 if err == mongo.ErrNoDocuments { // 走创建逻辑 return this.createSubscribe(this.DeviceId, this.OpenId, this.Module) } // 2️⃣ 其他错误 return err } return this.updateSubscribe(this.DeviceId, this.Module) } type SubscribeCloseService struct { DeviceId string `json:"device_id"` Module string `json:"module"` } func (this *SubscribeCloseService) Close() error { now := util.GetNowSecond() filter := bson.M{ "device_id": this.DeviceId, "modules." + this.Module + ".enabled": true, } update := bson.M{ "$set": bson.M{ "modules." + this.Module + ".enabled": false, "modules." + this.Module + ".close_at": now, "updated_at": now, }, } _, err := mgm.Coll(&subscribe.Subscribe{}). UpdateOne(context.Background(), filter, update) return err } type SubscribeCheckService struct { DeviceId string `json:"device_id"` } func (this *SubscribeCheckService) Check() error { return nil } type SubscribeSendService struct { DeviceId string `json:"device_id"` Module string `json:"module"` Data string `json:"data"` } func (this *SubscribeSendService) Send() error { var list = make([]*subscribe.Subscribe, 0) // 更具deviceId 和 module 获取 需要推送的用户openid 列表 // data 应该是个json对象 var deviceIds []string json.Unmarshal([]byte(this.Data), &deviceIds) filter := bson.M{ "device_id": bson.M{"$in": deviceIds}, "modules." + this.Module + ".enabled": true, } err := mgm.Coll(&subscribe.Subscribe{}). SimpleFind(&list, filter) // 遍历所有list 获取openid列表后 循环发送 if err != nil { return err } var openIds []string for _, v := range list { openIds = append(openIds, v.OpenId) } task.SubscribeTask.Send(openIds, this.Module) return err } type SubscribeTaskService struct { DeviceId string `json:"device_id"` Module string `json:"module"` Timestamp int64 `json:"timestamp"` } func (this *SubscribeTaskService) Task() error { var key string switch this.Module { case "hangup": key = fmt.Sprintf("%s_hangup", this.DeviceId) case "autofight": key = fmt.Sprintf("%s_autofight", this.DeviceId) default: return errors.New("unknown module") } this.Timestamp = util.GetNowSecond() + 5*1000 task.SubscribeTask.Append(key, this.DeviceId, this.Timestamp, this.Module) return nil }