subscribe.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package sender
  2. import (
  3. "dsbqj-admin/model/mongo/subscribe"
  4. "dsbqj-admin/pkg/helper/wechat"
  5. "dsbqj-admin/pkg/logger"
  6. "fmt"
  7. "github.com/kamva/mgm/v3"
  8. "go.mongodb.org/mongo-driver/bson"
  9. "log"
  10. )
  11. type SubscribeSend struct {
  12. DeviceId string
  13. OpenIds []string
  14. Module string
  15. }
  16. type SubscribeSender struct {
  17. workerNum int
  18. queue chan *SubscribeSend
  19. wxHelper *wechat.WechatHelper
  20. }
  21. func NewSubscribeSender(workerNum int) *SubscribeSender {
  22. return &SubscribeSender{
  23. workerNum: workerNum,
  24. queue: make(chan *SubscribeSend, 1000),
  25. wxHelper: wechat.NewWechatHelper(),
  26. }
  27. }
  28. func (this *SubscribeSender) Send(send *SubscribeSend) {
  29. select {
  30. case this.queue <- send:
  31. default:
  32. logger.Info("subscribe sender queue full, drop task=%+v", send)
  33. }
  34. }
  35. func (this *SubscribeSender) Start() {
  36. for i := 0; i < this.workerNum; i++ {
  37. go this.worker(i)
  38. }
  39. }
  40. func (this *SubscribeSender) worker(id int) {
  41. for task := range this.queue {
  42. this.safeSend(task)
  43. }
  44. }
  45. func (this *SubscribeSender) safeSend(send *SubscribeSend) {
  46. defer func() {
  47. if r := recover(); r != nil {
  48. log.Printf("send subscribe panic: %v", r)
  49. }
  50. }()
  51. switch send.Module {
  52. case "hangup":
  53. this.SendHangupSubscribe(send.DeviceId)
  54. case "autofight":
  55. this.SendHangupSubscribe(send.DeviceId)
  56. case "guildgame":
  57. this.SendGuildGameSubscribe(send.OpenIds)
  58. default:
  59. log.Printf("[push] unknown type=%s, openId=%s",
  60. send.Module, send.DeviceId)
  61. }
  62. }
  63. func (this *SubscribeSender) SendHangupSubscribe(deviceId string) {
  64. subscribeOne := new(subscribe.Subscribe)
  65. err := mgm.Coll(&subscribe.Subscribe{}).First(bson.M{"device_id": deviceId, "modules.hangup.enabled": true}, subscribeOne)
  66. if err != nil {
  67. return
  68. }
  69. msg := make(map[string]map[string]string)
  70. msg["thing12"] = make(map[string]string)
  71. msg["thing12"]["value"] = "天气不错"
  72. msg["thing13"] = make(map[string]string)
  73. msg["thing13"]["value"] = "明天下雨"
  74. fmt.Println(this.wxHelper.SendWechatSubscribe(subscribeOne.OpenId, "UeiRSO8hHAFvTodH3SlWb_KoqT3Z2F2MoSvsDkZv7AE", msg))
  75. }
  76. func (this *SubscribeSender) SendAutoFightSubscribe(deviceId string) {
  77. subscribeOne := new(subscribe.Subscribe)
  78. err := mgm.Coll(&subscribe.Subscribe{}).First(bson.M{"device_id": deviceId, "modules.hangup.enabled": true}, subscribeOne)
  79. if err != nil {
  80. return
  81. }
  82. msg := make(map[string]map[string]string)
  83. msg["thing12"] = make(map[string]string)
  84. msg["thing12"]["value"] = "天气不错"
  85. msg["thing13"] = make(map[string]string)
  86. msg["thing13"]["value"] = "明天下雨"
  87. fmt.Println(this.wxHelper.SendWechatSubscribe(subscribeOne.OpenId, "UeiRSO8hHAFvTodH3SlWb_KoqT3Z2F2MoSvsDkZv7AE", msg))
  88. }
  89. func (this *SubscribeSender) SendGuildGameSubscribe(openIds []string) {
  90. }