subscribe.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package service
  2. import (
  3. "context"
  4. "dsbqj-admin/app/task"
  5. "dsbqj-admin/model/mongo/subscribe"
  6. "dsbqj-admin/pkg/logger"
  7. "dsbqj-admin/pkg/util"
  8. "errors"
  9. "fmt"
  10. "github.com/kamva/mgm/v3"
  11. "go.mongodb.org/mongo-driver/bson"
  12. "go.mongodb.org/mongo-driver/mongo"
  13. "go.mongodb.org/mongo-driver/mongo/options"
  14. )
  15. type SubscribeOpenService struct {
  16. DeviceId string `json:"device_id"`
  17. OpenId string `json:"open_id"`
  18. Module string `json:"module"`
  19. }
  20. func (s *SubscribeOpenService) createSubscribe(deviceId, openId, module string) error {
  21. sub := &subscribe.Subscribe{
  22. OpenId: openId,
  23. DeviceId: deviceId,
  24. Modules: make(map[string]*subscribe.ModuleSubscribe),
  25. }
  26. sub.Modules[module] = &subscribe.ModuleSubscribe{
  27. Enabled: true,
  28. OpenAt: util.GetNowSecond(),
  29. }
  30. return mgm.Coll(sub).Create(sub)
  31. }
  32. func (this *SubscribeOpenService) updateSubscribe(deviceId string, openId string, module string) error {
  33. now := util.GetNowSecond()
  34. filter := bson.M{"device_id": deviceId}
  35. update := bson.M{
  36. "$set": bson.M{
  37. "device_id": deviceId,
  38. "open_id": openId,
  39. "modules." + module: bson.M{
  40. "enabled": true,
  41. "open_at": now,
  42. },
  43. "updated_at": now,
  44. },
  45. }
  46. opts := options.Update().SetUpsert(true)
  47. _, err := mgm.Coll(&subscribe.Subscribe{}).
  48. UpdateOne(context.Background(), filter, update, opts)
  49. return err
  50. }
  51. func (this *SubscribeOpenService) Open() error {
  52. subscribeOne := new(subscribe.Subscribe)
  53. err := mgm.Coll(&subscribe.Subscribe{}).First(bson.M{"device_id": this.DeviceId}, subscribeOne)
  54. if err != nil {
  55. // 1️⃣ 没有记录
  56. if err == mongo.ErrNoDocuments {
  57. // 走创建逻辑
  58. return this.createSubscribe(this.DeviceId, this.OpenId, this.Module)
  59. }
  60. // 2️⃣ 其他错误
  61. return err
  62. }
  63. return this.updateSubscribe(this.DeviceId, this.OpenId, this.Module)
  64. }
  65. type SubscribeCloseService struct {
  66. DeviceId string `json:"device_id"`
  67. Module string `json:"module"`
  68. }
  69. func (this *SubscribeCloseService) Close() error {
  70. now := util.GetNowSecond()
  71. filter := bson.M{
  72. "device_id": this.DeviceId,
  73. "modules." + this.Module + ".enabled": true,
  74. }
  75. update := bson.M{
  76. "$set": bson.M{
  77. "modules." + this.Module + ".enabled": false,
  78. "modules." + this.Module + ".close_at": now,
  79. "updated_at": now,
  80. },
  81. }
  82. _, err := mgm.Coll(&subscribe.Subscribe{}).
  83. UpdateOne(context.Background(), filter, update)
  84. return err
  85. }
  86. type SubscribeCheckService struct {
  87. DeviceId string `json:"device_id"`
  88. }
  89. func (this *SubscribeCheckService) Check() error {
  90. return nil
  91. }
  92. type SubscribeSendService struct {
  93. Module string `json:"module"`
  94. Data []string `json:"data"`
  95. }
  96. func (this *SubscribeSendService) Send() error {
  97. var list = make([]*subscribe.Subscribe, 0)
  98. // 更具deviceId 和 module 获取 需要推送的用户openid 列表
  99. // data 应该是个json对象
  100. filter := bson.M{
  101. "modules." + this.Module + ".enabled": true,
  102. }
  103. if this.Module != "alliance" {
  104. filter["device_id"] = bson.M{"$in": this.Data}
  105. }
  106. err := mgm.Coll(&subscribe.Subscribe{}).
  107. SimpleFind(&list, filter)
  108. // 遍历所有list 获取openid列表后 循环发送
  109. if err != nil {
  110. return err
  111. }
  112. var openIds []string
  113. for _, v := range list {
  114. openIds = append(openIds, v.OpenId)
  115. }
  116. task.SubscribeTask.Send(openIds, this.Module)
  117. return err
  118. }
  119. type SubscribeTaskService struct {
  120. DeviceId string `json:"device_id"`
  121. Module string `json:"module"`
  122. Timestamp int64 `json:"timestamp"`
  123. Status bool `json:"status"`
  124. }
  125. func (this *SubscribeTaskService) Task() error {
  126. var key string
  127. switch this.Module {
  128. case "hangup":
  129. key = fmt.Sprintf("%s_hangup", this.DeviceId)
  130. case "autofight":
  131. key = fmt.Sprintf("%s_autofight", this.DeviceId)
  132. default:
  133. return errors.New("unknown module")
  134. }
  135. if this.Status {
  136. logger.Info("Subscribe task append key %s, device id %s timestamp %d, task_time %s", key, this.DeviceId, this.Timestamp, util.TimeStr2DateTime(this.Timestamp, util.TimeLayout))
  137. task.SubscribeTask.Append(key, this.DeviceId, this.Timestamp, this.Module)
  138. } else {
  139. task.SubscribeTask.Remove(key)
  140. }
  141. return nil
  142. }