subscribe.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package sender
  2. import (
  3. "dsbqj-admin/model/mongo/subscribe"
  4. "dsbqj-admin/pkg/helper/wechat"
  5. "dsbqj-admin/pkg/logger"
  6. "github.com/kamva/mgm/v3"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "log"
  9. "os"
  10. )
  11. var templateMap = map[string]map[string]string{
  12. "test": {
  13. "hangup": "0ByvFSV3B3U5-1XyvruRe7AMHAxiwQBFUL50lmoatCY",
  14. "autofight": "5rwMjLs6nVQ0SMtS0EAPEpmXZduOdEgSWbIjSNG5T7M",
  15. "guildgame": "e6D_r9_5tKNZK0cvM8fjzMXy9z1-aoNYQ9TSvAEuklU",
  16. "alliance": "e6D_r9_5tKNZK0cvM8fjzMXy9z1-aoNYQ9TSvAEuklU",
  17. "warheavens": "e6D_r9_5tKNZK0cvM8fjzMXy9z1-aoNYQ9TSvAEuklU",
  18. },
  19. "product": {
  20. "hangup": "wAXZWcbt8Hkxmt2iZXR7ieiM1A_Mqt7cFU5ky6dZNMo",
  21. "autofight": "HbFIcVV78vbPr1uhe-tVAo8oQKv2jnljvPYRl-ktiFE",
  22. "guildgame": "Q6JBEqIPufrfmsZw-Wds2sdLATfJU2zA0ebpMR3W2F4",
  23. "alliance": "Q6JBEqIPufrfmsZw-Wds2sdLATfJU2zA0ebpMR3W2F4",
  24. "warheavens": "Q6JBEqIPufrfmsZw-Wds2sdLATfJU2zA0ebpMR3W2F4",
  25. },
  26. }
  27. type SubscribeSend struct {
  28. DeviceId string
  29. OpenIds []string
  30. Module string
  31. }
  32. type SubscribeSender struct {
  33. workerNum int
  34. queue chan *SubscribeSend
  35. wxHelper *wechat.WechatHelper
  36. }
  37. func NewSubscribeSender(workerNum int) *SubscribeSender {
  38. return &SubscribeSender{
  39. workerNum: workerNum,
  40. queue: make(chan *SubscribeSend, 1000),
  41. wxHelper: wechat.NewWechatHelper(),
  42. }
  43. }
  44. func (this *SubscribeSender) Send(send *SubscribeSend) {
  45. select {
  46. case this.queue <- send:
  47. default:
  48. logger.Info("subscribe sender queue full, drop task=%+v", send)
  49. }
  50. }
  51. func (this *SubscribeSender) Start() {
  52. for i := 0; i < this.workerNum; i++ {
  53. go this.worker(i)
  54. }
  55. }
  56. func (this *SubscribeSender) worker(id int) {
  57. for task := range this.queue {
  58. logger.Info("Subscribe worker(%d) do safe send module %s, openid_len %d device id %s", id, task.Module, len(task.OpenIds), task.DeviceId)
  59. this.safeSend(task)
  60. }
  61. }
  62. func (this *SubscribeSender) safeSend(send *SubscribeSend) {
  63. defer func() {
  64. if r := recover(); r != nil {
  65. log.Printf("send subscribe panic: %v", r)
  66. }
  67. }()
  68. switch send.Module {
  69. case "hangup":
  70. this.SendHangupSubscribe(send.DeviceId)
  71. case "autofight":
  72. this.SendAutoFightSubscribe(send.DeviceId)
  73. case "guildgame":
  74. this.SendGuildGameSubscribe(send.OpenIds)
  75. case "alliance":
  76. this.SendAllianceSubscribe(send.OpenIds)
  77. case "warheavens":
  78. this.SendWarHeavensSubscribe(send.OpenIds)
  79. default:
  80. logger.Info("[push] unknown type=%s, openId=%s",
  81. send.Module, send.DeviceId)
  82. }
  83. }
  84. func (this *SubscribeSender) SendHangupSubscribe(deviceId string) {
  85. subscribeOne := new(subscribe.Subscribe)
  86. err := mgm.Coll(&subscribe.Subscribe{}).First(bson.M{"device_id": deviceId, "modules.hangup.enabled": true}, subscribeOne)
  87. if err != nil {
  88. return
  89. }
  90. msg := make(map[string]map[string]string)
  91. msg["thing1"] = make(map[string]string)
  92. msg["thing1"]["value"] = "挂机奖励时长已满"
  93. msg["thing3"] = make(map[string]string)
  94. msg["thing3"]["value"] = "您的挂机奖励时长已满,请打开游戏领取"
  95. this.wxHelper.SendWechatSubscribe(subscribeOne.OpenId, templateMap[os.Getenv("CHANNEL")]["hangup"], msg)
  96. }
  97. func (this *SubscribeSender) SendAutoFightSubscribe(deviceId string) {
  98. subscribeOne := new(subscribe.Subscribe)
  99. err := mgm.Coll(&subscribe.Subscribe{}).First(bson.M{"device_id": deviceId, "modules.autofight.enabled": true}, subscribeOne)
  100. if err != nil {
  101. return
  102. }
  103. msg := make(map[string]map[string]string)
  104. msg["thing2"] = make(map[string]string)
  105. msg["thing2"]["value"] = "离线闯关即将结束,请及时收取您的离线闯关奖励。"
  106. msg["thing1"] = make(map[string]string)
  107. msg["thing1"]["value"] = "离线闯关提醒"
  108. this.wxHelper.SendWechatSubscribe(subscribeOne.OpenId, templateMap[os.Getenv("CHANNEL")]["autofight"], msg)
  109. }
  110. func (this *SubscribeSender) SendGuildGameSubscribe(openIds []string) {
  111. msg := make(map[string]map[string]string)
  112. msg["thing2"] = make(map[string]string)
  113. msg["thing2"]["value"] = "门派攻防战即将开始,为门派荣誉而战吧。"
  114. msg["thing4"] = make(map[string]string)
  115. msg["thing4"]["value"] = "门派攻防战"
  116. for _, openId := range openIds {
  117. this.wxHelper.SendWechatSubscribe(openId, templateMap[os.Getenv("CHANNEL")]["guildgame"], msg)
  118. }
  119. }
  120. func (this *SubscribeSender) SendWarHeavensSubscribe(openIds []string) {
  121. msg := make(map[string]map[string]string)
  122. msg["thing2"] = make(map[string]string)
  123. msg["thing2"]["value"] = "三界战场即将开始,争夺人间道统。"
  124. msg["thing4"] = make(map[string]string)
  125. msg["thing4"]["value"] = "三界争峰"
  126. for _, openId := range openIds {
  127. this.wxHelper.SendWechatSubscribe(openId, templateMap[os.Getenv("CHANNEL")]["alliance"], msg)
  128. }
  129. }
  130. func (this *SubscribeSender) SendAllianceSubscribe(openIds []string) {
  131. msg := make(map[string]map[string]string)
  132. msg["thing2"] = make(map[string]string)
  133. msg["thing2"]["value"] = "独战诸仙即将开始!"
  134. msg["thing4"] = make(map[string]string)
  135. msg["thing4"]["value"] = "决战诸仙"
  136. for _, openId := range openIds {
  137. this.wxHelper.SendWechatSubscribe(openId, templateMap[os.Getenv("CHANNEL")]["warheavens"], msg)
  138. }
  139. }