subscribe.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package sender
  2. import (
  3. "dsbqj-admin/model/mongo/subscribe"
  4. "dsbqj-admin/pkg/helper/wechat"
  5. "dsbqj-admin/pkg/logger"
  6. "github.com/kamva/mgm/v3"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "log"
  9. "os"
  10. )
  // templateMap holds the template identifiers handed to
  // wxHelper.SendWechatSubscribe. The outer key is the deployment channel
  // (looked up via the CHANNEL environment variable by the senders in this
  // file); the inner key is the module name.
  //
  // Looking up an unknown channel or module yields the empty string, because
  // indexing a missing (nil) inner map is legal in Go.
  var templateMap = map[string]map[string]string{
  	// Identifiers used when CHANNEL is "test".
  	"test": {
  		"hangup":    "0ByvFSV3B3U5-1XyvruRe7AMHAxiwQBFUL50lmoatCY",
  		"autofight": "5rwMjLs6nVQ0SMtS0EAPEpmXZduOdEgSWbIjSNG5T7M",
  		// The three event modules below currently share one identifier.
  		"guildgame":  "e6D_r9_5tKNZK0cvM8fjzMXy9z1-aoNYQ9TSvAEuklU",
  		"alliance":   "e6D_r9_5tKNZK0cvM8fjzMXy9z1-aoNYQ9TSvAEuklU",
  		"warheavens": "e6D_r9_5tKNZK0cvM8fjzMXy9z1-aoNYQ9TSvAEuklU",
  	},
  	// Identifiers used when CHANNEL is "product".
  	"product": {
  		"hangup":    "wAXZWcbt8Hkxmt2iZXR7ieiM1A_Mqt7cFU5ky6dZNMo",
  		"autofight": "HbFIcVV78vbPr1uhe-tVAo8oQKv2jnljvPYRl-ktiFE",
  		// The three event modules below currently share one identifier.
  		"guildgame":  "Q6JBEqIPufrfmsZw-Wds2sdLATfJU2zA0ebpMR3W2F4",
  		"alliance":   "Q6JBEqIPufrfmsZw-Wds2sdLATfJU2zA0ebpMR3W2F4",
  		"warheavens": "Q6JBEqIPufrfmsZw-Wds2sdLATfJU2zA0ebpMR3W2F4",
  	},
  }
  // SubscribeSend is one unit of work placed on the SubscribeSender queue.
  //
  // Which fields are consulted depends on Module: the "hangup" and
  // "autofight" modules are dispatched with DeviceId, while "guildgame",
  // "alliance" and "warheavens" are dispatched with OpenIds.
  type SubscribeSend struct {
  	// DeviceId identifies the device whose subscription record is looked up.
  	DeviceId string
  	// OpenIds lists the recipients for the broadcast-style modules.
  	OpenIds []string
  	// Module selects which notification is sent.
  	Module string
  }
  32. type SubscribeSender struct {
  33. workerNum int
  34. queue chan *SubscribeSend
  35. wxHelper *wechat.WechatHelper
  36. }
  37. func NewSubscribeSender(workerNum int) *SubscribeSender {
  38. return &SubscribeSender{
  39. workerNum: workerNum,
  40. queue: make(chan *SubscribeSend, 1000),
  41. wxHelper: wechat.NewWechatHelper(),
  42. }
  43. }
  44. func (this *SubscribeSender) Send(send *SubscribeSend) {
  45. select {
  46. case this.queue <- send:
  47. default:
  48. logger.Info("subscribe sender queue full, drop task=%+v", send)
  49. }
  50. }
  51. func (this *SubscribeSender) Start() {
  52. for i := 0; i < this.workerNum; i++ {
  53. go this.worker(i)
  54. }
  55. }
  56. func (this *SubscribeSender) worker(id int) {
  57. for task := range this.queue {
  58. this.safeSend(task)
  59. }
  60. }
  61. func (this *SubscribeSender) safeSend(send *SubscribeSend) {
  62. defer func() {
  63. if r := recover(); r != nil {
  64. log.Printf("send subscribe panic: %v", r)
  65. }
  66. }()
  67. switch send.Module {
  68. case "hangup":
  69. this.SendHangupSubscribe(send.DeviceId)
  70. case "autofight":
  71. this.SendHangupSubscribe(send.DeviceId)
  72. case "guildgame":
  73. this.SendGuildGameSubscribe(send.OpenIds)
  74. case "alliance":
  75. this.SendAllianceSubscribe(send.OpenIds)
  76. case "warheavens":
  77. this.SendWarHeavensSubscribe(send.OpenIds)
  78. default:
  79. logger.Info("[push] unknown type=%s, openId=%s",
  80. send.Module, send.DeviceId)
  81. }
  82. }
  83. func (this *SubscribeSender) SendHangupSubscribe(deviceId string) {
  84. subscribeOne := new(subscribe.Subscribe)
  85. err := mgm.Coll(&subscribe.Subscribe{}).First(bson.M{"device_id": deviceId, "modules.hangup.enabled": true}, subscribeOne)
  86. if err != nil {
  87. return
  88. }
  89. msg := make(map[string]map[string]string)
  90. msg["thing1"] = make(map[string]string)
  91. msg["thing1"]["value"] = "挂机奖励时长已满"
  92. msg["thing3"] = make(map[string]string)
  93. msg["thing3"]["value"] = "您的挂机奖励时长已满,请打开游戏领取"
  94. this.wxHelper.SendWechatSubscribe(subscribeOne.OpenId, templateMap[os.Getenv("CHANNEL")]["hangup"], msg)
  95. }
  96. func (this *SubscribeSender) SendAutoFightSubscribe(deviceId string) {
  97. subscribeOne := new(subscribe.Subscribe)
  98. err := mgm.Coll(&subscribe.Subscribe{}).First(bson.M{"device_id": deviceId, "modules.hangup.enabled": true}, subscribeOne)
  99. if err != nil {
  100. return
  101. }
  102. msg := make(map[string]map[string]string)
  103. msg["thing2"] = make(map[string]string)
  104. msg["thing2"]["value"] = "离线闯关即将结束,请及时收取您的离线闯关奖励。"
  105. msg["thing1"] = make(map[string]string)
  106. msg["thing1"]["value"] = "离线闯关提醒"
  107. this.wxHelper.SendWechatSubscribe(subscribeOne.OpenId, templateMap[os.Getenv("CHANNEL")]["autofight"], msg)
  108. }
  109. func (this *SubscribeSender) SendGuildGameSubscribe(openIds []string) {
  110. msg := make(map[string]map[string]string)
  111. msg["thing2"] = make(map[string]string)
  112. msg["thing2"]["value"] = "门派攻防战即将开始,为门派荣誉而战吧。"
  113. msg["thing4"] = make(map[string]string)
  114. msg["thing4"]["value"] = "门派攻防战"
  115. for _, openId := range openIds {
  116. this.wxHelper.SendWechatSubscribe(openId, templateMap[os.Getenv("CHANNEL")]["guildgame"], msg)
  117. }
  118. }
  119. func (this *SubscribeSender) SendWarHeavensSubscribe(openIds []string) {
  120. msg := make(map[string]map[string]string)
  121. msg["thing2"] = make(map[string]string)
  122. msg["thing2"]["value"] = "三界战场即将开始,争夺人间道统。"
  123. msg["thing4"] = make(map[string]string)
  124. msg["thing4"]["value"] = "三界争峰"
  125. for _, openId := range openIds {
  126. this.wxHelper.SendWechatSubscribe(openId, templateMap[os.Getenv("CHANNEL")]["alliance"], msg)
  127. }
  128. }
  129. func (this *SubscribeSender) SendAllianceSubscribe(openIds []string) {
  130. msg := make(map[string]map[string]string)
  131. msg["thing2"] = make(map[string]string)
  132. msg["thing2"]["value"] = "独战诸仙即将开始!"
  133. msg["thing4"] = make(map[string]string)
  134. msg["thing4"]["value"] = "决战诸仙"
  135. for _, openId := range openIds {
  136. this.wxHelper.SendWechatSubscribe(openId, templateMap[os.Getenv("CHANNEL")]["warheavens"], msg)
  137. }
  138. }