subscribe.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. package task
  2. import (
  3. "dsbqj-admin/pkg/helper/wechat"
  4. "dsbqj-admin/pkg/sender"
  5. "dsbqj-admin/pkg/util"
  6. "sync"
  7. )
  8. var SubscribeTask *Subscribe
  9. type PushItem struct {
  10. Key string // 玩家设备id
  11. DeviceId string
  12. Module string // 推送类型
  13. Timestamp int64 // 时间戳
  14. }
  15. type Subscribe struct {
  16. sync.Mutex
  17. waiting map[string]*PushItem
  18. wxHelper *wechat.WechatHelper
  19. sender *sender.SubscribeSender
  20. }
  21. func SubscribeInit() *Subscribe {
  22. SubscribeTask = new(Subscribe)
  23. SubscribeTask.waiting = make(map[string]*PushItem)
  24. SubscribeTask.sender = sender.NewSubscribeSender(10)
  25. SubscribeTask.sender.Start()
  26. return SubscribeTask
  27. }
  28. func (this *Subscribe) Exec() {
  29. now := util.GetNowSecond()
  30. var needPush []string
  31. this.Lock()
  32. for key, item := range this.waiting {
  33. if item.Timestamp <= now {
  34. needPush = append(needPush, key)
  35. delete(this.waiting, key) // 触发后移除,防止重复
  36. }
  37. }
  38. this.Unlock()
  39. for _, key := range needPush {
  40. item := this.waiting[key]
  41. this.sender.Send(&sender.SubscribeSend{
  42. DeviceId: item.DeviceId,
  43. Module: item.Module,
  44. })
  45. }
  46. }
  47. func (this *Subscribe) Append(key string, deviceId string, timestamp int64, module string) {
  48. this.Lock()
  49. defer this.Unlock()
  50. newPushItem := &PushItem{key, deviceId, module, timestamp}
  51. this.waiting[key] = newPushItem
  52. }
  53. func (this *Subscribe) Remove(key string) {
  54. this.Lock()
  55. defer this.Unlock()
  56. delete(this.waiting, key)
  57. }
  58. func (this *Subscribe) Send(openIds []string, module string) {
  59. this.sender.Send(&sender.SubscribeSend{
  60. OpenIds: openIds,
  61. Module: module,
  62. })
  63. }