subscribe.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package task
  2. import (
  3. "dsbqj-admin/pkg/helper/wechat"
  4. "dsbqj-admin/pkg/sender"
  5. "dsbqj-admin/pkg/util"
  6. "sync"
  7. )
  8. var SubscribeTask *Subscribe
  9. type PushItem struct {
  10. Key string // 玩家设备id
  11. DeviceId string
  12. Module string // 推送类型
  13. Timestamp int64 // 时间戳
  14. }
  15. type Subscribe struct {
  16. sync.Mutex
  17. waiting map[string]*PushItem
  18. wxHelper *wechat.WechatHelper
  19. sender *sender.SubscribeSender
  20. }
  21. func SubscribeInit() *Subscribe {
  22. SubscribeTask = new(Subscribe)
  23. SubscribeTask.waiting = make(map[string]*PushItem)
  24. SubscribeTask.sender = sender.NewSubscribeSender(10)
  25. SubscribeTask.sender.Start()
  26. return SubscribeTask
  27. }
  28. func (this *Subscribe) Exec() {
  29. now := util.GetNowSecond()
  30. var needPush []*PushItem
  31. this.Lock()
  32. for key, item := range this.waiting {
  33. if item.Timestamp <= now {
  34. needPush = append(needPush, item)
  35. delete(this.waiting, key)
  36. }
  37. }
  38. this.Unlock()
  39. for _, item := range needPush {
  40. this.sender.Send(&sender.SubscribeSend{
  41. DeviceId: item.DeviceId,
  42. Module: item.Module,
  43. }) // 触发后移除,防止重复
  44. }
  45. }
  46. func (this *Subscribe) Append(key string, deviceId string, timestamp int64, module string) {
  47. this.Lock()
  48. defer this.Unlock()
  49. newPushItem := &PushItem{key, deviceId, module, timestamp}
  50. this.waiting[key] = newPushItem
  51. }
  52. func (this *Subscribe) Remove(key string) {
  53. this.Lock()
  54. defer this.Unlock()
  55. delete(this.waiting, key)
  56. }
  57. func (this *Subscribe) Send(openIds []string, module string) {
  58. this.sender.Send(&sender.SubscribeSend{
  59. OpenIds: openIds,
  60. Module: module,
  61. })
  62. }