// subscribe.go (4.1 KB)
  1. package service
  2. import (
  3. "context"
  4. "dsbqj-admin/app/task"
  5. "dsbqj-admin/model/mongo/subscribe"
  6. "dsbqj-admin/pkg/logger"
  7. "dsbqj-admin/pkg/util"
  8. "errors"
  9. "fmt"
  10. "github.com/kamva/mgm/v3"
  11. "go.mongodb.org/mongo-driver/bson"
  12. "go.mongodb.org/mongo-driver/mongo"
  13. "go.mongodb.org/mongo-driver/mongo/options"
  14. )
  // SubscribeOpenService carries the request fields used to enable one module
  // subscription for a device.
  type SubscribeOpenService struct {
  	// DeviceId identifies the device; Open looks up the stored subscription
  	// by its "device_id" field using this value.
  	DeviceId string `json:"device_id"`

  	// OpenId is written to the subscription's "open_id" field.
  	OpenId string `json:"open_id"`

  	// Module names the entry under "modules" that gets enabled.
  	Module string `json:"module"`
  }
  20. func (s *SubscribeOpenService) createSubscribe(deviceId, openId, module string) error {
  21. sub := &subscribe.Subscribe{
  22. OpenId: openId,
  23. DeviceId: deviceId,
  24. Modules: make(map[string]*subscribe.ModuleSubscribe),
  25. }
  26. sub.Modules[module] = &subscribe.ModuleSubscribe{
  27. Enabled: true,
  28. OpenAt: util.GetNowSecond(),
  29. }
  30. return mgm.Coll(sub).Create(sub)
  31. }
  32. func (this *SubscribeOpenService) updateSubscribe(deviceId string, openId string, module string) error {
  33. now := util.GetNowSecond()
  34. filter := bson.M{"device_id": deviceId}
  35. update := bson.M{
  36. "$set": bson.M{
  37. "device_id": deviceId,
  38. "open_id": openId,
  39. "modules." + module: bson.M{
  40. "enabled": true,
  41. "open_at": now,
  42. },
  43. "updated_at": now,
  44. },
  45. }
  46. opts := options.Update().SetUpsert(true)
  47. _, err := mgm.Coll(&subscribe.Subscribe{}).
  48. UpdateOne(context.Background(), filter, update, opts)
  49. return err
  50. }
  51. func (this *SubscribeOpenService) Open() error {
  52. logger.Info("user sign subscribe device_id: %s open_id: %s, module: %s", this.DeviceId, this.OpenId, this.Module)
  53. subscribeOne := new(subscribe.Subscribe)
  54. err := mgm.Coll(&subscribe.Subscribe{}).First(bson.M{"device_id": this.DeviceId}, subscribeOne)
  55. if err != nil {
  56. // 1️⃣ 没有记录
  57. if err == mongo.ErrNoDocuments {
  58. // 走创建逻辑
  59. return this.createSubscribe(this.DeviceId, this.OpenId, this.Module)
  60. }
  61. // 2️⃣ 其他错误
  62. return err
  63. }
  64. return this.updateSubscribe(this.DeviceId, this.OpenId, this.Module)
  65. }
  // SubscribeCloseService carries the request fields used to disable one module
  // subscription for a device.
  type SubscribeCloseService struct {
  	// DeviceId selects the subscription document by its "device_id" field.
  	DeviceId string `json:"device_id"`

  	// Module names the entry under "modules" that gets disabled.
  	Module string `json:"module"`
  }
  70. func (this *SubscribeCloseService) Close() error {
  71. now := util.GetNowSecond()
  72. filter := bson.M{
  73. "device_id": this.DeviceId,
  74. "modules." + this.Module + ".enabled": true,
  75. }
  76. update := bson.M{
  77. "$set": bson.M{
  78. "modules." + this.Module + ".enabled": false,
  79. "modules." + this.Module + ".close_at": now,
  80. "updated_at": now,
  81. },
  82. }
  83. _, err := mgm.Coll(&subscribe.Subscribe{}).
  84. UpdateOne(context.Background(), filter, update)
  85. return err
  86. }
  // SubscribeCheckService carries the request field for a subscription check.
  type SubscribeCheckService struct {
  	// DeviceId identifies the device named in the request. The current Check
  	// implementation does not read it.
  	DeviceId string `json:"device_id"`
  }
  90. func (this *SubscribeCheckService) Check() error {
  91. return nil
  92. }
  // SubscribeSendService carries the request fields used to push a notification
  // to subscribers of a module.
  type SubscribeSendService struct {
  	// Module selects subscriptions whose "modules.<module>.enabled" is true.
  	Module string `json:"module"`

  	// Data is the list of device ids used to restrict the recipients. Send
  	// ignores it when Module is "alliance".
  	Data []string `json:"data"`
  }
  97. func (this *SubscribeSendService) Send() error {
  98. var list = make([]*subscribe.Subscribe, 0)
  99. // 更具deviceId 和 module 获取 需要推送的用户openid 列表
  100. // data 应该是个json对象
  101. filter := bson.M{
  102. "modules." + this.Module + ".enabled": true,
  103. }
  104. if this.Module != "alliance" {
  105. filter["device_id"] = bson.M{"$in": this.Data}
  106. }
  107. err := mgm.Coll(&subscribe.Subscribe{}).
  108. SimpleFind(&list, filter)
  109. // 遍历所有list 获取openid列表后 循环发送
  110. if err != nil {
  111. return err
  112. }
  113. var openIds []string
  114. for _, v := range list {
  115. openIds = append(openIds, v.OpenId)
  116. }
  117. logger.Info("send openIds to SubscribeTask %v", openIds)
  118. task.SubscribeTask.Send(openIds, this.Module)
  119. return err
  120. }
  // SubscribeTaskService carries the request fields used to schedule or cancel
  // a per-device subscription task.
  type SubscribeTaskService struct {
  	// DeviceId identifies the device; it is also the prefix of the task key.
  	DeviceId string `json:"device_id"`

  	// Module selects the task kind. Task accepts "hangup" and "autofight".
  	Module string `json:"module"`

  	// Timestamp is overwritten by Task before use, so a value supplied by the
  	// caller is not honoured.
  	Timestamp int64 `json:"timestamp"`

  	// Status chooses the action: true appends the task, false removes it.
  	Status bool `json:"status"`
  }
  127. func (this *SubscribeTaskService) Task() error {
  128. var key string
  129. switch this.Module {
  130. case "hangup":
  131. key = fmt.Sprintf("%s_hangup", this.DeviceId)
  132. case "autofight":
  133. key = fmt.Sprintf("%s_autofight", this.DeviceId)
  134. default:
  135. return errors.New("unknown module")
  136. }
  137. this.Timestamp = util.GetNowSecond() + 5*1000
  138. if this.Status {
  139. task.SubscribeTask.Append(key, this.DeviceId, this.Timestamp, this.Module)
  140. } else {
  141. task.SubscribeTask.Remove(key)
  142. }
  143. return nil
  144. }