subscribe.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package service
  2. import (
  3. "context"
  4. "dsbqj-admin/app/task"
  5. "dsbqj-admin/model/mongo/subscribe"
  6. "dsbqj-admin/pkg/util"
  7. "errors"
  8. "fmt"
  9. "github.com/goccy/go-json"
  10. "github.com/kamva/mgm/v3"
  11. "go.mongodb.org/mongo-driver/bson"
  12. "go.mongodb.org/mongo-driver/mongo"
  13. "go.mongodb.org/mongo-driver/mongo/options"
  14. )
  15. type SubscribeOpenService struct {
  16. DeviceId string `json:"device_id"`
  17. OpenId string `json:"open_id"`
  18. Module string `json:"module"`
  19. }
  20. func (s *SubscribeOpenService) createSubscribe(deviceId, openId, module string) error {
  21. sub := &subscribe.Subscribe{
  22. OpenId: openId,
  23. DeviceId: deviceId,
  24. Modules: make(map[string]*subscribe.ModuleSubscribe),
  25. }
  26. sub.Modules[module] = &subscribe.ModuleSubscribe{
  27. Enabled: true,
  28. OpenAt: util.GetNowSecond(),
  29. }
  30. return mgm.Coll(sub).Create(sub)
  31. }
  32. func (this *SubscribeOpenService) updateSubscribe(deviceId string, module string) error {
  33. now := util.GetNowSecond()
  34. filter := bson.M{"device_id": deviceId}
  35. update := bson.M{
  36. "$set": bson.M{
  37. "device_id": deviceId,
  38. "modules." + module: bson.M{
  39. "enabled": true,
  40. "open_at": now,
  41. },
  42. "updated_at": now,
  43. },
  44. }
  45. opts := options.Update().SetUpsert(true)
  46. _, err := mgm.Coll(&subscribe.Subscribe{}).
  47. UpdateOne(context.Background(), filter, update, opts)
  48. return err
  49. }
  50. func (this *SubscribeOpenService) Open() error {
  51. subscribeOne := new(subscribe.Subscribe)
  52. err := mgm.Coll(&subscribe.Subscribe{}).First(bson.M{"device_id": this.DeviceId}, subscribeOne)
  53. if err != nil {
  54. // 1️⃣ 没有记录
  55. if err == mongo.ErrNoDocuments {
  56. // 走创建逻辑
  57. return this.createSubscribe(this.DeviceId, this.OpenId, this.Module)
  58. }
  59. // 2️⃣ 其他错误
  60. return err
  61. }
  62. return this.updateSubscribe(this.DeviceId, this.Module)
  63. }
  64. type SubscribeCloseService struct {
  65. DeviceId string `json:"device_id"`
  66. Module string `json:"module"`
  67. }
  68. func (this *SubscribeCloseService) Close() error {
  69. now := util.GetNowSecond()
  70. filter := bson.M{
  71. "device_id": this.DeviceId,
  72. "modules." + this.Module + ".enabled": true,
  73. }
  74. update := bson.M{
  75. "$set": bson.M{
  76. "modules." + this.Module + ".enabled": false,
  77. "modules." + this.Module + ".close_at": now,
  78. "updated_at": now,
  79. },
  80. }
  81. _, err := mgm.Coll(&subscribe.Subscribe{}).
  82. UpdateOne(context.Background(), filter, update)
  83. return err
  84. }
  85. type SubscribeCheckService struct {
  86. DeviceId string `json:"device_id"`
  87. }
  88. func (this *SubscribeCheckService) Check() error {
  89. return nil
  90. }
  91. type SubscribeSendService struct {
  92. DeviceId string `json:"device_id"`
  93. Module string `json:"module"`
  94. Data string `json:"data"`
  95. }
  96. func (this *SubscribeSendService) Send() error {
  97. var list = make([]*subscribe.Subscribe, 0)
  98. // 更具deviceId 和 module 获取 需要推送的用户openid 列表
  99. // data 应该是个json对象
  100. var deviceIds []string
  101. json.Unmarshal([]byte(this.Data), &deviceIds)
  102. filter := bson.M{
  103. "device_id": bson.M{"$in": deviceIds},
  104. "modules." + this.Module + ".enabled": true,
  105. }
  106. err := mgm.Coll(&subscribe.Subscribe{}).
  107. SimpleFind(&list, filter)
  108. // 遍历所有list 获取openid列表后 循环发送
  109. if err != nil {
  110. return err
  111. }
  112. var openIds []string
  113. for _, v := range list {
  114. openIds = append(openIds, v.OpenId)
  115. }
  116. task.SubscribeTask.Send(openIds, this.Module)
  117. return err
  118. }
  119. type SubscribeTaskService struct {
  120. DeviceId string `json:"device_id"`
  121. Module string `json:"module"`
  122. Timestamp int64 `json:"timestamp"`
  123. }
  124. func (this *SubscribeTaskService) Task() error {
  125. var key string
  126. switch this.Module {
  127. case "hangup":
  128. key = fmt.Sprintf("%s_hangup", this.DeviceId)
  129. case "autofight":
  130. key = fmt.Sprintf("%s_autofight", this.DeviceId)
  131. default:
  132. return errors.New("unknown module")
  133. }
  134. this.Timestamp = util.GetNowSecond() + 5*1000
  135. task.SubscribeTask.Append(key, this.DeviceId, this.Timestamp, this.Module)
  136. return nil
  137. }