subscribe.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package service
  2. import (
  3. "context"
  4. "dsbqj-admin/app/task"
  5. "dsbqj-admin/model/mongo/subscribe"
  6. "dsbqj-admin/pkg/util"
  7. "errors"
  8. "fmt"
  9. "github.com/goccy/go-json"
  10. "github.com/kamva/mgm/v3"
  11. "go.mongodb.org/mongo-driver/bson"
  12. "go.mongodb.org/mongo-driver/mongo"
  13. "go.mongodb.org/mongo-driver/mongo/options"
  14. )
  // SubscribeOpenService carries the request fields used to enable a module
  // subscription for a device.
  type SubscribeOpenService struct {
  	// DeviceId is the value matched against the "device_id" document field.
  	DeviceId string `json:"device_id"`
  	// OpenId is written to the document only when a new one is created.
  	OpenId string `json:"open_id"`
  	// Module names the entry under "modules" that gets enabled.
  	Module string `json:"module"`
  }
  20. func (s *SubscribeOpenService) createSubscribe(deviceId, openId, module string) error {
  21. sub := &subscribe.Subscribe{
  22. OpenId: openId,
  23. DeviceId: deviceId,
  24. Modules: make(map[string]*subscribe.ModuleSubscribe),
  25. }
  26. sub.Modules[module] = &subscribe.ModuleSubscribe{
  27. Enabled: true,
  28. OpenAt: util.GetNowSecond(),
  29. }
  30. return mgm.Coll(sub).Create(sub)
  31. }
  32. func (this *SubscribeOpenService) updateSubscribe(deviceId string, module string) error {
  33. now := util.GetNowSecond()
  34. filter := bson.M{"device_id": deviceId}
  35. update := bson.M{
  36. "$set": bson.M{
  37. "device_id": deviceId,
  38. "modules." + module: bson.M{
  39. "enabled": true,
  40. "open_at": now,
  41. },
  42. "updated_at": now,
  43. },
  44. }
  45. opts := options.Update().SetUpsert(true)
  46. _, err := mgm.Coll(&subscribe.Subscribe{}).
  47. UpdateOne(context.Background(), filter, update, opts)
  48. return err
  49. }
  50. func (this *SubscribeOpenService) Open() error {
  51. subscribeOne := new(subscribe.Subscribe)
  52. err := mgm.Coll(&subscribe.Subscribe{}).First(bson.M{"device_id": this.DeviceId}, subscribeOne)
  53. if err != nil {
  54. // 1️⃣ 没有记录
  55. if err == mongo.ErrNoDocuments {
  56. // 走创建逻辑
  57. return this.createSubscribe(this.DeviceId, this.OpenId, this.Module)
  58. }
  59. // 2️⃣ 其他错误
  60. return err
  61. }
  62. return this.updateSubscribe(this.DeviceId, this.Module)
  63. }
  // SubscribeCloseService carries the request fields used to disable a module
  // subscription for a device.
  type SubscribeCloseService struct {
  	// DeviceId is the value matched against the "device_id" document field.
  	DeviceId string `json:"device_id"`
  	// Module names the entry under "modules" that gets disabled.
  	Module string `json:"module"`
  }
  68. func (this *SubscribeCloseService) Close() error {
  69. now := util.GetNowSecond()
  70. filter := bson.M{
  71. "device_id": this.DeviceId,
  72. "modules." + this.Module + ".enabled": true,
  73. }
  74. update := bson.M{
  75. "$set": bson.M{
  76. "modules." + this.Module + ".enabled": false,
  77. "modules." + this.Module + ".close_at": now,
  78. "updated_at": now,
  79. },
  80. }
  81. _, err := mgm.Coll(&subscribe.Subscribe{}).
  82. UpdateOne(context.Background(), filter, update)
  83. return err
  84. }
  // SubscribeCheckService carries the device identifier of a subscription
  // check request.
  type SubscribeCheckService struct {
  	// DeviceId is decoded from the "device_id" JSON key; Check does not read it.
  	DeviceId string `json:"device_id"`
  }

  // Check is currently a stub: it performs no work and always returns nil.
  func (s *SubscribeCheckService) Check() error {
  	return nil
  }
  // SubscribeSendService carries the request fields used to push a
  // notification to the subscribers of a module.
  type SubscribeSendService struct {
  	// Module selects which "modules.<Module>.enabled" flag subscribers must have set.
  	Module string `json:"module"`
  	// Data is decoded by Send as a JSON array of device ID strings for every
  	// module other than "alliance".
  	Data string `json:"data"`
  }
  95. func (this *SubscribeSendService) Send() error {
  96. var list = make([]*subscribe.Subscribe, 0)
  97. // 更具deviceId 和 module 获取 需要推送的用户openid 列表
  98. // data 应该是个json对象
  99. filter := bson.M{
  100. "modules." + this.Module + ".enabled": true,
  101. }
  102. if this.Module != "alliance" {
  103. var deviceIds []string
  104. json.Unmarshal([]byte(this.Data), &deviceIds)
  105. filter["device_id"] = bson.M{"$in": deviceIds}
  106. }
  107. err := mgm.Coll(&subscribe.Subscribe{}).
  108. SimpleFind(&list, filter)
  109. // 遍历所有list 获取openid列表后 循环发送
  110. if err != nil {
  111. return err
  112. }
  113. var openIds []string
  114. for _, v := range list {
  115. openIds = append(openIds, v.OpenId)
  116. }
  117. task.SubscribeTask.Send(openIds, this.Module)
  118. return err
  119. }
  // SubscribeTaskService carries the request fields used to schedule or cancel
  // a per-device subscription task.
  type SubscribeTaskService struct {
  	// DeviceId is combined with Module to form the task key.
  	DeviceId string `json:"device_id"`
  	// Module must be "hangup" or "autofight"; Task rejects anything else.
  	Module string `json:"module"`
  	// Timestamp is overwritten by Task; any value decoded from the request is ignored.
  	Timestamp int64 `json:"timestamp"`
  	// Status selects the action: true appends the task, false removes it.
  	Status bool `json:"status"`
  }
  126. func (this *SubscribeTaskService) Task() error {
  127. var key string
  128. switch this.Module {
  129. case "hangup":
  130. key = fmt.Sprintf("%s_hangup", this.DeviceId)
  131. case "autofight":
  132. key = fmt.Sprintf("%s_autofight", this.DeviceId)
  133. default:
  134. return errors.New("unknown module")
  135. }
  136. this.Timestamp = util.GetNowSecond() + 5*1000
  137. if this.Status {
  138. task.SubscribeTask.Append(key, this.DeviceId, this.Timestamp, this.Module)
  139. } else {
  140. task.SubscribeTask.Remove(key)
  141. }
  142. return nil
  143. }