subscribe.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package task
  2. import (
  3. "dsbqj-admin/pkg/helper/wechat"
  4. "dsbqj-admin/pkg/logger"
  5. "dsbqj-admin/pkg/sender"
  6. "dsbqj-admin/pkg/util"
  7. "sync"
  8. )
  9. var SubscribeTask *Subscribe
  10. type PushItem struct {
  11. Key string // 玩家设备id
  12. DeviceId string
  13. Module string // 推送类型
  14. Timestamp int64 // 时间戳
  15. }
  16. type Subscribe struct {
  17. sync.Mutex
  18. waiting map[string]*PushItem
  19. wxHelper *wechat.WechatHelper
  20. sender *sender.SubscribeSender
  21. }
  22. func SubscribeInit() *Subscribe {
  23. SubscribeTask = new(Subscribe)
  24. SubscribeTask.waiting = make(map[string]*PushItem)
  25. SubscribeTask.sender = sender.NewSubscribeSender(10)
  26. SubscribeTask.sender.Start()
  27. return SubscribeTask
  28. }
  29. func (this *Subscribe) Exec() {
  30. now := util.GetNowSecond()
  31. var needPush []*PushItem
  32. this.Lock()
  33. for key, item := range this.waiting {
  34. if item.Timestamp <= now {
  35. needPush = append(needPush, item)
  36. delete(this.waiting, key)
  37. }
  38. }
  39. this.Unlock()
  40. for _, item := range needPush {
  41. logger.Info("Subscribe exec neePush module %s, device id %s", item.Module, item.DeviceId)
  42. this.sender.Send(&sender.SubscribeSend{
  43. DeviceId: item.DeviceId,
  44. Module: item.Module,
  45. }) // 触发后移除,防止重复
  46. }
  47. }
  48. func (this *Subscribe) Append(key string, deviceId string, timestamp int64, module string) {
  49. this.Lock()
  50. defer this.Unlock()
  51. newPushItem := &PushItem{key, deviceId, module, timestamp}
  52. this.waiting[key] = newPushItem
  53. }
  54. func (this *Subscribe) Remove(key string) {
  55. this.Lock()
  56. defer this.Unlock()
  57. delete(this.waiting, key)
  58. }
  59. func (this *Subscribe) Send(openIds []string, module string) {
  60. this.sender.Send(&sender.SubscribeSend{
  61. OpenIds: openIds,
  62. Module: module,
  63. })
  64. }