subscribe.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package sender
  2. import (
  3. "dsbqj-admin/model/mongo/subscribe"
  4. "dsbqj-admin/pkg/helper/wechat"
  5. "dsbqj-admin/pkg/logger"
  6. "github.com/kamva/mgm/v3"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "log"
  9. )
  // SubscribeSend describes one queued subscribe-notification task.
  type SubscribeSend struct {
  	// DeviceId identifies the target of "hangup" and "autofight" tasks.
  	DeviceId string
  	// OpenIds lists the recipients of "guildgame", "alliance" and
  	// "warheavens" tasks.
  	OpenIds []string
  	// Module names the notification kind and selects which sender runs.
  	Module string
  }
  15. type SubscribeSender struct {
  16. workerNum int
  17. queue chan *SubscribeSend
  18. wxHelper *wechat.WechatHelper
  19. }
  20. func NewSubscribeSender(workerNum int) *SubscribeSender {
  21. return &SubscribeSender{
  22. workerNum: workerNum,
  23. queue: make(chan *SubscribeSend, 1000),
  24. wxHelper: wechat.NewWechatHelper(),
  25. }
  26. }
  27. func (this *SubscribeSender) Send(send *SubscribeSend) {
  28. select {
  29. case this.queue <- send:
  30. default:
  31. logger.Info("subscribe sender queue full, drop task=%+v", send)
  32. }
  33. }
  34. func (this *SubscribeSender) Start() {
  35. for i := 0; i < this.workerNum; i++ {
  36. go this.worker(i)
  37. }
  38. }
  39. func (this *SubscribeSender) worker(id int) {
  40. for task := range this.queue {
  41. this.safeSend(task)
  42. }
  43. }
  44. func (this *SubscribeSender) safeSend(send *SubscribeSend) {
  45. defer func() {
  46. if r := recover(); r != nil {
  47. log.Printf("send subscribe panic: %v", r)
  48. }
  49. }()
  50. switch send.Module {
  51. case "hangup":
  52. this.SendHangupSubscribe(send.DeviceId)
  53. case "autofight":
  54. this.SendHangupSubscribe(send.DeviceId)
  55. case "guildgame":
  56. this.SendGuildGameSubscribe(send.OpenIds)
  57. case "alliance":
  58. this.SendAllianceSubscribe(send.OpenIds)
  59. case "warheavens":
  60. this.SendWarHeavensSubscribe(send.OpenIds)
  61. default:
  62. logger.Info("[push] unknown type=%s, openId=%s",
  63. send.Module, send.DeviceId)
  64. }
  65. }
  66. func (this *SubscribeSender) SendHangupSubscribe(deviceId string) {
  67. subscribeOne := new(subscribe.Subscribe)
  68. err := mgm.Coll(&subscribe.Subscribe{}).First(bson.M{"device_id": deviceId, "modules.hangup.enabled": true}, subscribeOne)
  69. if err != nil {
  70. return
  71. }
  72. msg := make(map[string]map[string]string)
  73. msg["thing1"] = make(map[string]string)
  74. msg["thing1"]["value"] = "挂机奖励时长已满"
  75. msg["thing3"] = make(map[string]string)
  76. msg["thing3"]["value"] = "您的挂机奖励时长已满,请打开游戏领取"
  77. this.wxHelper.SendWechatSubscribe(subscribeOne.OpenId, "0ByvFSV3B3U5-1XyvruRe7AMHAxiwQBFUL50lmoatCY", msg)
  78. }
  79. func (this *SubscribeSender) SendAutoFightSubscribe(deviceId string) {
  80. subscribeOne := new(subscribe.Subscribe)
  81. err := mgm.Coll(&subscribe.Subscribe{}).First(bson.M{"device_id": deviceId, "modules.hangup.enabled": true}, subscribeOne)
  82. if err != nil {
  83. return
  84. }
  85. msg := make(map[string]map[string]string)
  86. msg["thing2"] = make(map[string]string)
  87. msg["thing2"]["value"] = "离线闯关即将结束,请及时收取您的离线闯关奖励。"
  88. msg["thing1"] = make(map[string]string)
  89. msg["thing1"]["value"] = "离线闯关提醒"
  90. this.wxHelper.SendWechatSubscribe(subscribeOne.OpenId, "5rwMjLs6nVQ0SMtS0EAPEpmXZduOdEgSWbIjSNG5T7M", msg)
  91. }
  92. func (this *SubscribeSender) SendGuildGameSubscribe(openIds []string) {
  93. msg := make(map[string]map[string]string)
  94. msg["thing2"] = make(map[string]string)
  95. msg["thing2"]["value"] = "门派攻防战即将开始,为门派荣誉而战吧。"
  96. msg["thing4"] = make(map[string]string)
  97. msg["thing4"]["value"] = "门派攻防战"
  98. for _, openId := range openIds {
  99. this.wxHelper.SendWechatSubscribe(openId, "e6D_r9_5tKNZK0cvM8fjzMXy9z1-aoNYQ9TSvAEuklU", msg)
  100. }
  101. }
  102. func (this *SubscribeSender) SendWarHeavensSubscribe(openIds []string) {
  103. msg := make(map[string]map[string]string)
  104. msg["thing2"] = make(map[string]string)
  105. msg["thing2"]["value"] = "三界战场即将开始,争夺人间道统。"
  106. msg["thing4"] = make(map[string]string)
  107. msg["thing4"]["value"] = "三界争峰"
  108. for _, openId := range openIds {
  109. this.wxHelper.SendWechatSubscribe(openId, "e6D_r9_5tKNZK0cvM8fjzMXy9z1-aoNYQ9TSvAEuklU", msg)
  110. }
  111. }
  112. func (this *SubscribeSender) SendAllianceSubscribe(openIds []string) {
  113. msg := make(map[string]map[string]string)
  114. msg["thing2"] = make(map[string]string)
  115. msg["thing2"]["value"] = "独战诸仙即将开始!"
  116. msg["thing4"] = make(map[string]string)
  117. msg["thing4"]["value"] = "决战诸仙"
  118. for _, openId := range openIds {
  119. this.wxHelper.SendWechatSubscribe(openId, "e6D_r9_5tKNZK0cvM8fjzMXy9z1-aoNYQ9TSvAEuklU", msg)
  120. }
  121. }