Parcourir la source

提交增加 wx订阅模块相关逻辑

DESKTOP-HN5QP3V\Administrator il y a 3 jours
Parent
commit
d0a8fa98fb

+ 79 - 0
app/api/v1/subscribe.go

@@ -0,0 +1,79 @@
+package v1
+
+import (
+	"dsbqj-admin/app/service"
+	"dsbqj-admin/pkg/app"
+	"dsbqj-admin/pkg/e"
+	"github.com/gin-gonic/gin"
+	"net/http"
+)
+
+func Open(c *gin.Context) {
+	var appG = app.Gin{C: c}
+	service := service.SubscribeOpenService{}
+	if err := c.ShouldBind(&service); err == nil {
+		if err := service.Open(); err != nil {
+			appG.Response(http.StatusOK, e.INVALID_PARAMS, err.Error())
+		} else {
+			appG.Response(http.StatusOK, e.SUCCESS, nil)
+		}
+	} else {
+		appG.Response(http.StatusOK, e.INVALID_PARAMS, err.Error())
+	}
+}
+
+func Close(c *gin.Context) {
+	var appG = app.Gin{C: c}
+	service := service.SubscribeCloseService{}
+	if err := c.ShouldBind(&service); err == nil {
+		if err := service.Close(); err != nil {
+			appG.Response(http.StatusOK, e.INVALID_PARAMS, err.Error())
+		} else {
+			appG.Response(http.StatusOK, e.SUCCESS, nil)
+		}
+	} else {
+		appG.Response(http.StatusOK, e.INVALID_PARAMS, err.Error())
+	}
+}
+
+func Check(c *gin.Context) {
+	var appG = app.Gin{C: c}
+	service := service.SubscribeCheckService{}
+	if err := c.ShouldBind(&service); err == nil {
+		if err := service.Check(); err != nil {
+			appG.Response(http.StatusOK, e.INVALID_PARAMS, err.Error())
+		} else {
+			appG.Response(http.StatusOK, e.SUCCESS, nil)
+		}
+	} else {
+		appG.Response(http.StatusOK, e.INVALID_PARAMS, err.Error())
+	}
+}
+
+func Send(c *gin.Context) {
+	var appG = app.Gin{C: c}
+	service := service.SubscribeSendService{}
+	if err := c.ShouldBind(&service); err == nil {
+		if err := service.Send(); err != nil {
+			appG.Response(http.StatusOK, e.INVALID_PARAMS, err.Error())
+		} else {
+			appG.Response(http.StatusOK, e.SUCCESS, nil)
+		}
+	} else {
+		appG.Response(http.StatusOK, e.INVALID_PARAMS, err.Error())
+	}
+}
+
+func Task(c *gin.Context) {
+	var appG = app.Gin{C: c}
+	service := service.SubscribeTaskService{}
+	if err := c.ShouldBind(&service); err == nil {
+		if err := service.Task(); err != nil {
+			appG.Response(http.StatusOK, e.INVALID_PARAMS, err.Error())
+		} else {
+			appG.Response(http.StatusOK, e.SUCCESS, nil)
+		}
+	} else {
+		appG.Response(http.StatusOK, e.INVALID_PARAMS, err.Error())
+	}
+}

+ 9 - 0
app/router/router.go

@@ -40,6 +40,15 @@ func NewRouter() *gin.Engine {
 		pointv1.POST("thinkingdata", v1.ThinkingData)
 	}
 
+	subscribev1 := r.Group("/subscribe/v1")
+	{
+		subscribev1.POST("/open", v1.Open)
+		subscribev1.POST("/close", v1.Close)
+		subscribev1.POST("/check", v1.Check) // 检查用户当前订阅
+		subscribev1.POST("/send", v1.Send)   // 触发订阅直接发送
+		subscribev1.POST("/task", v1.Task)   // 触发订阅任务发送
+	}
+
 	//webv1.Use(middleware.BodyHandler())
 	//{
 	//

+ 170 - 0
app/service/subscribe.go

@@ -0,0 +1,170 @@
+package service
+
+import (
+	"context"
+	"dsbqj-admin/app/task"
+	"dsbqj-admin/model/mongo/subscribe"
+	"dsbqj-admin/pkg/util"
+	"errors"
+	"fmt"
+	"github.com/goccy/go-json"
+	"github.com/kamva/mgm/v3"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+)
+
+type SubscribeOpenService struct {
+	DeviceId string `json:"device_id"`
+	OpenId   string `json:"open_id"`
+	Module   string `json:"module"`
+}
+
+func (s *SubscribeOpenService) createSubscribe(deviceId, openId, module string) error {
+	sub := &subscribe.Subscribe{
+		OpenId:   openId,
+		DeviceId: deviceId,
+		Modules:  make(map[string]*subscribe.ModuleSubscribe),
+	}
+
+	sub.Modules[module] = &subscribe.ModuleSubscribe{
+		Enabled: true,
+		OpenAt:  util.GetNowSecond(),
+	}
+
+	return mgm.Coll(sub).Create(sub)
+}
+
+func (this *SubscribeOpenService) updateSubscribe(deviceId string, module string) error {
+	now := util.GetNowSecond()
+
+	filter := bson.M{"device_id": deviceId}
+
+	update := bson.M{
+		"$set": bson.M{
+			"device_id": deviceId,
+			"modules." + module: bson.M{
+				"enabled": true,
+				"open_at": now,
+			},
+			"updated_at": now,
+		},
+	}
+
+	opts := options.Update().SetUpsert(true)
+
+	_, err := mgm.Coll(&subscribe.Subscribe{}).
+		UpdateOne(context.Background(), filter, update, opts)
+
+	return err
+}
+
+func (this *SubscribeOpenService) Open() error {
+	subscribeOne := new(subscribe.Subscribe)
+	err := mgm.Coll(&subscribe.Subscribe{}).First(bson.M{"device_id": this.DeviceId}, subscribeOne)
+	if err != nil {
+		// 1️⃣ 没有记录
+		if err == mongo.ErrNoDocuments {
+			// 走创建逻辑
+			return this.createSubscribe(this.DeviceId, this.OpenId, this.Module)
+		}
+
+		// 2️⃣ 其他错误
+		return err
+	}
+	return this.updateSubscribe(this.DeviceId, this.Module)
+}
+
+type SubscribeCloseService struct {
+	DeviceId string `json:"device_id"`
+	Module   string `json:"module"`
+}
+
+func (this *SubscribeCloseService) Close() error {
+	now := util.GetNowSecond()
+
+	filter := bson.M{
+		"device_id":                           this.DeviceId,
+		"modules." + this.Module + ".enabled": true,
+	}
+
+	update := bson.M{
+		"$set": bson.M{
+			"modules." + this.Module + ".enabled":  false,
+			"modules." + this.Module + ".close_at": now,
+			"updated_at":                           now,
+		},
+	}
+
+	_, err := mgm.Coll(&subscribe.Subscribe{}).
+		UpdateOne(context.Background(), filter, update)
+
+	return err
+}
+
+type SubscribeCheckService struct {
+	DeviceId string `json:"device_id"`
+}
+
+func (this *SubscribeCheckService) Check() error {
+
+	return nil
+}
+
+type SubscribeSendService struct {
+	DeviceId string `json:"device_id"`
+	Module   string `json:"module"`
+	Data     string `json:"data"`
+}
+
+func (this *SubscribeSendService) Send() error {
+	var list = make([]*subscribe.Subscribe, 0)
+	// 更具deviceId 和 module 获取 需要推送的用户openid 列表
+	// data 应该是个json对象
+	var deviceIds []string
+	json.Unmarshal([]byte(this.Data), &deviceIds)
+
+	filter := bson.M{
+		"device_id":                           bson.M{"$in": deviceIds},
+		"modules." + this.Module + ".enabled": true,
+	}
+
+	err := mgm.Coll(&subscribe.Subscribe{}).
+		SimpleFind(&list, filter)
+
+	// 遍历所有list 获取openid列表后 循环发送
+	if err != nil {
+		return err
+	}
+
+	var openIds []string
+	for _, v := range list {
+		openIds = append(openIds, v.OpenId)
+	}
+
+	task.SubscribeTask.Send(openIds, this.Module)
+	return err
+}
+
+type SubscribeTaskService struct {
+	DeviceId  string `json:"device_id"`
+	Module    string `json:"module"`
+	Timestamp int64  `json:"timestamp"`
+}
+
+func (this *SubscribeTaskService) Task() error {
+	var key string
+
+	switch this.Module {
+	case "hangup":
+		key = fmt.Sprintf("$s_hangup", this.DeviceId)
+	case "autofight":
+		key = fmt.Sprintf("$s_autofight", this.DeviceId)
+	default:
+		return errors.New("unknown module")
+	}
+
+	task.SubscribeTask.Append(key, this.DeviceId, this.Timestamp, this.Module)
+
+	return nil
+}

+ 1 - 1
app/service/version.go

@@ -13,7 +13,7 @@ type VersionReloadService struct {
 }
 
 func (service *VersionReloadService) Reload() error {
-	task.VersionTask.Reload()
+	task.VersionTask.Exec()
 	return nil
 }
 

+ 5 - 0
app/task/interface.go

@@ -0,0 +1,5 @@
+package task
+
+type ITask interface {
+	Exec()
+}

+ 77 - 0
app/task/subscribe.go

@@ -0,0 +1,77 @@
+package task
+
+import (
+	"dsbqj-admin/pkg/helper/wechat"
+	"dsbqj-admin/pkg/sender"
+	"dsbqj-admin/pkg/util"
+	"sync"
+)
+
+var SubscribeTask *Subscribe
+
+type PushItem struct {
+	Key       string // 玩家设备id
+	DeviceId  string
+	Module    string // 推送类型
+	Timestamp int64  // 时间戳
+}
+
+type Subscribe struct {
+	sync.Mutex
+	waiting  map[string]*PushItem
+	wxHelper *wechat.WechatHelper
+	sender   *sender.SubscribeSender
+}
+
+func SubscribeInit() *Subscribe {
+	SubscribeTask = new(Subscribe)
+	SubscribeTask.waiting = make(map[string]*PushItem)
+	SubscribeTask.sender = sender.NewSubscribeSender(10)
+	SubscribeTask.sender.Start()
+	return SubscribeTask
+}
+
+func (this *Subscribe) Exec() {
+	now := util.GetNowSecond()
+
+	var needPush []string
+
+	this.Lock()
+	for key, item := range this.waiting {
+		if item.Timestamp <= now {
+			needPush = append(needPush, key)
+			delete(this.waiting, key) // 触发后移除,防止重复
+		}
+	}
+	this.Unlock()
+
+	for _, key := range needPush {
+		item := this.waiting[key]
+		this.sender.Send(&sender.SubscribeSend{
+			DeviceId: item.DeviceId,
+			Module:   item.Module,
+		})
+	}
+}
+
+func (this *Subscribe) Append(key string, deviceId string, timestamp int64, module string) {
+	this.Lock()
+	defer this.Unlock()
+
+	newPushItem := &PushItem{key, deviceId, module, timestamp}
+	this.waiting[key] = newPushItem
+}
+
+func (this *Subscribe) Remove(key string) {
+	this.Lock()
+	defer this.Unlock()
+
+	delete(this.waiting, key)
+}
+
+func (this *Subscribe) Send(openIds []string, module string) {
+	this.sender.Send(&sender.SubscribeSend{
+		OpenIds: openIds,
+		Module:  module,
+	})
+}

+ 3 - 54
app/task/task.go

@@ -1,66 +1,15 @@
 package task
 
 import (
-	"dsbqj-admin/model/mongo/version"
-	"dsbqj-admin/pkg/util"
-	"github.com/kamva/mgm/v3"
-	"go.mongodb.org/mongo-driver/bson"
-	"sync"
 	"time"
 )
 
-var VersionTask *Versions
-
-type Versions struct {
-	sync.Mutex
-	VList util.SafeArray[*version.Version]
-	VMap  util.SafeMap[string, *version.Version]
-}
-
-func Init() *Versions {
-	VersionTask = new(Versions)
-	return VersionTask
-}
-
-func (this *Versions) Run() {
+func Run(task ITask, sleep time.Duration) {
 	// 每5分钟执行一次
 	go func() {
 		for {
-			this.Reload()
-			time.Sleep(5 * time.Minute)
+			task.Exec()
+			time.Sleep(sleep)
 		}
 	}()
-
-}
-
-func (this *Versions) Reload() {
-	var versions = make([]*version.Version, 0)
-	err := mgm.Coll(&version.Version{}).SimpleFind(&versions, bson.M{})
-	if err == nil {
-		// 重置
-		this.VList.Flush()
-		this.VList.PushMany(versions)
-		this.VMap.Clear()
-		for _, v := range versions {
-			this.VMap.Set(v.Version, v)
-		}
-	}
-}
-
-func (this *Versions) Check(version string) *version.Version {
-	data, ok := this.VMap.Get(version)
-	if !ok { // 遍历列表
-		for _, v := range this.VList.View() {
-			if v.Default {
-				data = v
-				break
-			}
-		}
-	}
-
-	return data
-}
-
-func (this *Versions) Servers() []*version.Version {
-	return this.VList.View()
 }

+ 54 - 0
app/task/version.go

@@ -0,0 +1,54 @@
+package task
+
+import (
+	"dsbqj-admin/model/mongo/version"
+	"dsbqj-admin/pkg/util"
+	"github.com/kamva/mgm/v3"
+	"go.mongodb.org/mongo-driver/bson"
+	"sync"
+)
+
+var VersionTask *Versions
+
+type Versions struct {
+	sync.Mutex
+	VList util.SafeArray[*version.Version]
+	VMap  util.SafeMap[string, *version.Version]
+}
+
+func VersionInit() *Versions {
+	VersionTask = new(Versions)
+	return VersionTask
+}
+
+func (this *Versions) Exec() {
+	var versions = make([]*version.Version, 0)
+	err := mgm.Coll(&version.Version{}).SimpleFind(&versions, bson.M{})
+	if err == nil {
+		// 重置
+		this.VList.Flush()
+		this.VList.PushMany(versions)
+		this.VMap.Clear()
+		for _, v := range versions {
+			this.VMap.Set(v.Version, v)
+		}
+	}
+}
+
+func (this *Versions) Check(version string) *version.Version {
+	data, ok := this.VMap.Get(version)
+	if !ok { // 遍历列表
+		for _, v := range this.VList.View() {
+			if v.Default {
+				data = v
+				break
+			}
+		}
+	}
+
+	return data
+}
+
+func (this *Versions) Servers() []*version.Version {
+	return this.VList.View()
+}

+ 10 - 4
go.mod

@@ -8,7 +8,7 @@ require (
 	github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
 	github.com/aliyun/credentials-go v1.4.6
 	github.com/astaxie/beego v1.12.3
-	github.com/bytedance/sonic v1.13.2
+	github.com/bytedance/sonic v1.14.2
 	github.com/fatih/color v1.18.0
 	github.com/gin-contrib/cors v1.7.5
 	github.com/gin-contrib/sessions v1.0.3
@@ -19,9 +19,11 @@ require (
 	github.com/jinzhu/gorm v1.9.16
 	github.com/joho/godotenv v1.5.1
 	github.com/kamva/mgm/v3 v3.5.0
+	github.com/silenceper/wechat v1.2.6
 	github.com/speps/go-hashids v2.0.0+incompatible
 	github.com/unknwon/com v1.0.1
 	go.mongodb.org/mongo-driver v1.17.3
+	golang.org/x/crypto v0.37.0
 	gopkg.in/yaml.v2 v2.4.0
 )
 
@@ -31,16 +33,20 @@ require (
 	github.com/alibabacloud-go/endpoint-util v1.1.0 // indirect
 	github.com/alibabacloud-go/openapi-util v0.0.11 // indirect
 	github.com/alibabacloud-go/tea-xml v1.1.2 // indirect
-	github.com/bytedance/sonic/loader v0.2.4 // indirect
+	github.com/bradfitz/gomemcache v0.0.0-20250403215159-8d39553ac7cf // indirect
+	github.com/bytedance/gopkg v0.1.3 // indirect
+	github.com/bytedance/sonic/loader v0.4.0 // indirect
 	github.com/clbanning/mxj/v2 v2.7.0 // indirect
-	github.com/cloudwego/base64x v0.1.5 // indirect
+	github.com/cloudwego/base64x v0.1.6 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
+	github.com/fatih/structs v1.1.0 // indirect
 	github.com/gabriel-vasile/mimetype v1.4.8 // indirect
 	github.com/gin-contrib/sse v1.0.0 // indirect
 	github.com/go-playground/locales v0.14.1 // indirect
 	github.com/go-playground/universal-translator v0.18.1 // indirect
 	github.com/go-sql-driver/mysql v1.5.0 // indirect
 	github.com/golang/snappy v1.0.0 // indirect
+	github.com/gomodule/redigo v2.0.0+incompatible // indirect
 	github.com/gorilla/context v1.1.2 // indirect
 	github.com/gorilla/securecookie v1.1.2 // indirect
 	github.com/gorilla/sessions v1.4.0 // indirect
@@ -57,6 +63,7 @@ require (
 	github.com/pelletier/go-toml/v2 v2.2.3 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 // indirect
+	github.com/spf13/cast v1.3.1 // indirect
 	github.com/stretchr/testify v1.10.0 // indirect
 	github.com/tjfoc/gmsm v1.3.2 // indirect
 	github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
@@ -66,7 +73,6 @@ require (
 	github.com/xdg-go/stringprep v1.0.4 // indirect
 	github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
 	golang.org/x/arch v0.16.0 // indirect
-	golang.org/x/crypto v0.37.0 // indirect
 	golang.org/x/net v0.38.0 // indirect
 	golang.org/x/sync v0.13.0 // indirect
 	golang.org/x/sys v0.32.0 // indirect

+ 27 - 13
go.sum

@@ -45,6 +45,7 @@ github.com/aliyun/credentials-go v1.1.2/go.mod h1:ozcZaMR5kLM7pwtCMEpVmQ242suV6q
 github.com/aliyun/credentials-go v1.4.6 h1:CG8rc/nxCNKfXbZWpWDzI9GjF4Tuu3Es14qT8Y0ClOk=
 github.com/aliyun/credentials-go v1.4.6/go.mod h1:Jm6d+xIgwJVLVWT561vy67ZRP4lPTQxMbEYRuT2Ti1U=
 github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
+github.com/astaxie/beego v1.7.1/go.mod h1:0R4++1tUqERR0WYFWdfkcrsyoVBCG4DgpDGokT3yb+U=
 github.com/astaxie/beego v1.12.3 h1:SAQkdD2ePye+v8Gn1r4X6IKZM1wd28EyUOVQ3PDSOOQ=
 github.com/astaxie/beego v1.12.3/go.mod h1:p3qIm0Ryx7zeBHLljmd7omloyca1s4yu1a8kM1FkpIA=
 github.com/beego/goyaml2 v0.0.0-20130207012346-5545475820dd/go.mod h1:1b+Y/CofkYwXMUU0OhQqGvsY2Bvgr4j6jfT699wyZKQ=
@@ -52,12 +53,16 @@ github.com/beego/x2j v0.0.0-20131220205130-a0352aadc542/go.mod h1:kSeGC/p1AbBiEp
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
+github.com/bradfitz/gomemcache v0.0.0-20160117192205-fb1f79c6b65a/go.mod h1:PmM6Mmwb0LSuEubjR8N7PtNe1KxZLtOUHtbeikc5h60=
 github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737/go.mod h1:PmM6Mmwb0LSuEubjR8N7PtNe1KxZLtOUHtbeikc5h60=
-github.com/bytedance/sonic v1.13.2 h1:8/H1FempDZqC4VqjptGo14QQlJx8VdZJegxs6wwfqpQ=
-github.com/bytedance/sonic v1.13.2/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1+KgkJhz4=
-github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
-github.com/bytedance/sonic/loader v0.2.4 h1:ZWCw4stuXUsn1/+zQDqeE7JKP+QO47tz7QCNan80NzY=
-github.com/bytedance/sonic/loader v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
+github.com/bradfitz/gomemcache v0.0.0-20250403215159-8d39553ac7cf h1:TqhNAT4zKbTdLa62d2HDBFdvgSbIGB3eJE8HqhgiL9I=
+github.com/bradfitz/gomemcache v0.0.0-20250403215159-8d39553ac7cf/go.mod h1:r5xuitiExdLAJ09PR7vBVENGvp4ZuTBeWTGtxuX3K+c=
+github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M=
+github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM=
+github.com/bytedance/sonic v1.14.2 h1:k1twIoe97C1DtYUo+fZQy865IuHia4PR5RPiuGPPIIE=
+github.com/bytedance/sonic v1.14.2/go.mod h1:T80iDELeHiHKSc0C9tubFygiuXoGzrkjKzX2quAx980=
+github.com/bytedance/sonic/loader v0.4.0 h1:olZ7lEqcxtZygCK9EKYKADnpQoYkRQxaeY2NYzevs+o=
+github.com/bytedance/sonic/loader v0.4.0/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo=
 github.com/casbin/casbin v1.7.0/go.mod h1:c67qKN6Oum3UF5Q1+BByfFxkwKvhwW57ITjqwtzR1KE=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/clbanning/mxj/v2 v2.5.5/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn/Qo+ve2s=
@@ -65,9 +70,8 @@ github.com/clbanning/mxj/v2 v2.5.6/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn
 github.com/clbanning/mxj/v2 v2.7.0 h1:WA/La7UGCanFe5NpHF0Q3DNtnCsVoxbPKuyBNHWRyME=
 github.com/clbanning/mxj/v2 v2.7.0/go.mod h1:hNiWqW14h+kc+MdF9C6/YoRfjEJoR3ou6tn/Qo+ve2s=
 github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80=
-github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4=
-github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w=
-github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY=
+github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M=
+github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU=
 github.com/couchbase/go-couchbase v0.0.0-20200519150804-63f3cdb75e0d/go.mod h1:TWI8EKQMs5u5jLKW/tsb9VwauIrMIxQG1r5fMsswK5U=
 github.com/couchbase/gomemcached v0.0.0-20200526233749-ec430f949808/go.mod h1:srVSlQLB8iXBVXHgnqemxUXqN6FCvClgCMPCsjBDR7c=
 github.com/couchbase/goutils v0.0.0-20180530154633-e865a1461c8a/go.mod h1:BQwMFlJzDjFDG3DJUdU0KORxn88UlsOULuxLExMh3Hs=
@@ -84,6 +88,8 @@ github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5 h1:Yzb9+7DP
 github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0=
 github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
 github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
+github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
+github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM=
 github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8=
@@ -93,6 +99,7 @@ github.com/gin-contrib/sessions v1.0.3 h1:AZ4j0AalLsGqdrKNbbrKcXx9OJZqViirvNGsJT
 github.com/gin-contrib/sessions v1.0.3/go.mod h1:5i4XMx4KPtQihnzxEqG9u1K446lO3G19jAi2GtbfsAI=
 github.com/gin-contrib/sse v1.0.0 h1:y3bT1mUWUxDpW4JLQg/HnTqV4rozuW4tC9eFKTxYI9E=
 github.com/gin-contrib/sse v1.0.0/go.mod h1:zNuFdwarAygJBht0NTKiSi3jRf6RbqeILZ9Sp6Slhe0=
+github.com/gin-gonic/gin v1.1.4/go.mod h1:7cKuhb5qV2ggCFctp2fJQ+ErvciLZrIeoOSOm6mUr7Y=
 github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU=
 github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y=
 github.com/glendc/gopher-json v0.0.0-20170414221815-dc4743023d0c/go.mod h1:Gja1A+xZ9BoviGJNA2E9vFkPjjsl+CoJxSXiQM1UXtw=
@@ -119,6 +126,7 @@ github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PU
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
 github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
 github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
+github.com/golang/protobuf v0.0.0-20161117033126-8ee79997227b/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@@ -133,6 +141,8 @@ github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8l
 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
 github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/gomodule/redigo v1.8.1/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0=
+github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
 github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
@@ -178,10 +188,8 @@ github.com/kamva/mgm/v3 v3.5.0/go.mod h1:F4J1hZnXQMkqL3DZgR7Z7BOuiTqQG/JTic3Yzli
 github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
 github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
 github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
-github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
 github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE=
 github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
-github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -197,8 +205,10 @@ github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
 github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
 github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
 github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
+github.com/manucorporat/sse v0.0.0-20160126180136-ee05b128a739/go.mod h1:zUx1mhth20V3VKgL5jbd1BSQcW4Fy6Qs4PZvQwRFwzM=
 github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
 github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
+github.com/mattn/go-isatty v0.0.0-20161123143637-30a891c33c7c/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
 github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
 github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
 github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
@@ -251,6 +261,8 @@ github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644/go.mod h1:nkxAfR/
 github.com/siddontang/go v0.0.0-20170517070808-cb568a3e5cc0/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw=
 github.com/siddontang/goredis v0.0.0-20150324035039-760763f78400/go.mod h1:DDcKzU3qCuvj/tPnimWSsZZzvk9qvkvrIL5naVBPh5s=
 github.com/siddontang/rdb v0.0.0-20150307021120-fc89ed2e418d/go.mod h1:AMEsy7v5z92TR1JKMkLLoaOQk++LVnOKL3ScbJ8GNGA=
+github.com/silenceper/wechat v1.2.6 h1:FED3ko2yD96YD153xIV0I0bDjII4GxWaggjsYKdjQQc=
+github.com/silenceper/wechat v1.2.6/go.mod h1:7Wf0sCqQgJG65zCnl4TcDFk2XYxRCfqwQjg0Cf/lKeM=
 github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
@@ -262,6 +274,8 @@ github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIK
 github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
 github.com/speps/go-hashids v2.0.0+incompatible h1:kSfxGfESueJKTx0mpER9Y/1XHl+FVQjtCqRyYcviFbw=
 github.com/speps/go-hashids v2.0.0+incompatible/go.mod h1:P7hqPzMdnZOfyIk+xrlG1QaSMw+gCBdHKsBDnhpaZvc=
+github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng=
+github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
 github.com/ssdb/gossdb v0.0.0-20180723034631-88f6b59b84ec/go.mod h1:QBvMkMya+gXctz3kmljlUCu/yB3GZ6oee+dUozsezQE=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -275,10 +289,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
-github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
 github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
 github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
@@ -344,6 +356,7 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20191125084936-ffdde1057850/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
@@ -455,6 +468,8 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
 gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
 gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
+gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE=
+gopkg.in/go-playground/validator.v8 v8.18.1/go.mod h1:RX2a/7Ha8BgOhfk7j780h4/u/RRjR0eouCJSH80/M2Y=
 gopkg.in/ini.v1 v1.56.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
 gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
@@ -471,4 +486,3 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
-nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50=

+ 1 - 1
main-admin/main.go

@@ -46,7 +46,7 @@ func main() {
 	mongo.Database(os.Getenv("MONGO_DSN"))
 
 	// 启动任务
-	task.Init().Run()
+	task.Run(task.VersionInit(), 5*time.Minute)
 
 	if os.Getenv("GIN_MODE") == "release" {
 		gin.SetMode(gin.ReleaseMode)

+ 13 - 0
main-subscribe/build.bat

@@ -0,0 +1,13 @@
+SET CGO_ENABLED=0
+SET GOOS=linux
+SET GOARCH=amd64
+echo now the CGO_ENABLED:
+ go env CGO_ENABLED
+
+echo now the GOOS:
+ go env GOOS
+
+echo now the GOARCH:
+ go env GOARCH
+
+go build -o ../bin/admin-server main.go

+ 73 - 0
main-subscribe/main.go

@@ -0,0 +1,73 @@
+package main
+
+import (
+	"dsbqj-admin/app/router"
+	"dsbqj-admin/app/task"
+	"dsbqj-admin/model/mongo"
+	"dsbqj-admin/pkg/cache"
+	"dsbqj-admin/pkg/logger"
+	"dsbqj-admin/pkg/validator"
+	"fmt"
+	"github.com/gin-gonic/gin"
+	"github.com/joho/godotenv"
+	"log"
+	"net/http"
+	"os"
+	"time"
+)
+
+// @title           后端接口API文档
+// @version         1.0
+// @description     这里展示所有当前web端API接口信息
+
+// @host      localhost:30101
+// @BasePath  /web/v1
+
+// @securityDefinitions.apikey cxy_token
+// @name __CXY_TOKEN_
+// @in header
+// @securityDefinitions.apikey cxy_uid
+// @name __CXY_UID_
+// @in header
+
+func main() {
+	err := godotenv.Load()
+	if err != nil {
+		log.Fatal("Error loading .env file")
+		return
+	}
+	validator.Init()
+	logger.Init("")
+	cache.Redis()
+
+	// 连接数据库
+	//mysql.Database(os.Getenv("MYSQL_DSN"))
+	mongo.Database(os.Getenv("MONGO_DSN"))
+
+	// 启动任务
+	//task.VersionInit().Exec()
+	task.Run(task.SubscribeInit(), 1*time.Second)
+
+	if os.Getenv("GIN_MODE") == "release" {
+		gin.SetMode(gin.ReleaseMode)
+	}
+
+	// 装载路由
+	routersInit := router.NewRouter()
+	readTimeout := 60 * time.Second
+	writeTimeout := 200 * time.Second
+	endPoint := fmt.Sprintf(":%s", os.Getenv("LISTEN_PORT"))
+	maxHeaderBytes := 1 << 20
+
+	server := &http.Server{
+		Addr:           endPoint,
+		Handler:        routersInit,
+		ReadTimeout:    readTimeout,
+		WriteTimeout:   writeTimeout,
+		MaxHeaderBytes: maxHeaderBytes,
+	}
+
+	logger.Info("start http version listening %s", endPoint)
+	err = server.ListenAndServe()
+	fmt.Println(err)
+}

+ 24 - 0
model/mongo/subscribe/subscribe.go

@@ -0,0 +1,24 @@
+package subscribe
+
+import (
+	"github.com/kamva/mgm/v3"
+)
+
+type ModuleSubscribe struct {
+	Enabled bool `bson:"enabled" json:"enabled"`
+
+	// 最近一次开启 / 关闭时间
+	OpenAt  int64 `bson:"open_at,omitempty"  json:"open_at,omitempty"`
+	CloseAt int64 `bson:"close_at,omitempty" json:"close_at,omitempty"`
+}
+
+type Subscribe struct {
+	mgm.DefaultModel `bson:",inline"`
+
+	// 用户标识
+	OpenId   string `bson:"open_id"   json:"open_id"`
+	DeviceId string `bson:"device_id" json:"device_id"`
+
+	// 订阅状态集合:key = module, value = 订阅信息
+	Modules map[string]*ModuleSubscribe `bson:"modules" json:"modules"`
+}

+ 39 - 0
pkg/cache/wechat/accessToken.go

@@ -0,0 +1,39 @@
+package wechat
+
+import (
+	"context"
+	"fmt"
+	"github.com/go-redis/redis"
+	"time"
+)
+
+const AccessTokenKey = "wechat:access_token:%s"
+
+type AccessToken struct {
+	Client redis.UniversalClient
+	ctx    context.Context
+}
+
+func NewAccessToken(client redis.UniversalClient) *AccessToken {
+	return &AccessToken{Client: client, ctx: context.Background()}
+}
+
+func (this *AccessToken) Get(key string) interface{} {
+	val := this.Client.Get(fmt.Sprintf(AccessTokenKey, key)).Val()
+	if val == "" {
+		return nil
+	}
+	return val
+}
+
+func (this *AccessToken) Set(key string, val interface{}, timeout time.Duration) error {
+	return this.Client.Set(fmt.Sprintf(AccessTokenKey, key), val, timeout).Err()
+}
+
+func (this *AccessToken) IsExist(key string) bool {
+	return this.Client.Exists(fmt.Sprintf(AccessTokenKey, key)).Val() == 1
+}
+
+func (this *AccessToken) Delete(key string) error {
+	return this.Client.Del(fmt.Sprintf(AccessTokenKey, key)).Err()
+}

+ 98 - 0
pkg/helper/wechat/wechatHelper.go

@@ -0,0 +1,98 @@
+package wechat
+
+import (
+	"context"
+	"dsbqj-admin/pkg/cache"
+	"dsbqj-admin/pkg/logger"
+	"dsbqj-admin/pkg/util"
+	"github.com/goccy/go-json"
+	"github.com/silenceper/wechat"
+	"os"
+	"sync/atomic"
+
+	wechatCache "dsbqj-admin/pkg/cache/wechat"
+)
+
+var GOpenid OpenidMgr
+
+type OpenidMgr struct {
+	V    atomic.Value //值
+	Over int64        //过期时间
+}
+
+type WechatHelper struct {
+	ctx       context.Context
+	Wechat    *wechat.Wechat
+	AppId     string
+	AppSecret string
+}
+
+const (
+	subscribeUrl = "https://api.weixin.qq.com/cgi-bin/message/subscribe/send?access_token="
+)
+
+func NewWechatHelper() *WechatHelper {
+	appID := os.Getenv("WECHAT_APPID")
+	appSecret := os.Getenv("WECHAT_SECERT")
+	wx := wechat.NewWechat(&wechat.Config{
+		AppID:     appID,
+		AppSecret: appSecret,
+		Cache:     wechatCache.NewAccessToken(cache.RedisReport),
+	})
+
+	return &WechatHelper{
+		ctx:       context.Background(),
+		AppId:     appID,
+		AppSecret: appSecret,
+		Wechat:    wx,
+	}
+}
+
+type errmsg struct {
+	Errcode int    `json:"errcode"` //错误码
+	Errmsg  string `json:"errmsg"`  //错误信息
+	Result  struct {
+		Suggest string `json:"suggest"` //建议,有risky、pass、review三种值
+		Label   int    `json:"label"`   //命中标签枚举值,100 正常;10001 广告;20001 时政;20002 色情;20003 辱骂;20006 违法犯罪;20008 欺诈;20012 低俗;20013 版权;21000 其他
+	} `json:"result"`
+}
+
+type Subscribe struct {
+	ToUser           string      `json:"touser"`
+	TemplateId       string      `json:"template_id"`
+	MiniProgramState string      `json:"miniprogram_state"`
+	Lang             string      `json:"lang"`
+	Data             interface{} `json:"data"`
+}
+
+func (this *WechatHelper) SendWechatSubscribe(openid string, template string, msg interface{}) int {
+	token, _ := this.Wechat.GetAccessToken()
+	url := subscribeUrl + token
+	reqData := Subscribe{ToUser: openid, TemplateId: template, MiniProgramState: "developer", Lang: "zh_CN"}
+	reqData.Data = msg
+
+	data, _ := json.Marshal(reqData)
+	buf, err := util.HTTPPost(url, string(data))
+	if err != nil {
+		logger.Info("[ERROR] CheckWechatMsg post err!err:", err.Error())
+		return 101
+	}
+	var re = new(errmsg)
+	err = json.Unmarshal(buf, re)
+	if err != nil {
+		logger.Info("[ERROR] CheckWechatMsg Unmarshal err!err:", err)
+		return 103
+	}
+	if re.Errcode != 0 {
+		if re.Errcode == 40003 || re.Errcode == 43101 { //openid无效,appid与openid不匹配, 用户未订阅消息(用户未在近两小时访问小程序)
+			return 0
+		}
+		logger.Info("CheckWechatMsg msg:", msg, " ret:", re.Errcode, " errmsg:", re.Errmsg)
+	} else {
+		if re.Result.Label != 100 && re.Result.Label != 0 {
+			logger.Info("[ERROR]CheckWechatMsg msg:", msg, " ret:", re.Result.Label, " ,suggest:", re.Result.Suggest)
+			return re.Result.Label
+		}
+	}
+	return re.Errcode
+}

+ 21 - 0
pkg/helper/wechat/wechatHelper_test.go

@@ -0,0 +1,21 @@
+package wechat
+
+import (
+	"dsbqj-admin/pkg/cache"
+	"fmt"
+	"testing"
+)
+
+func TestSubMsg(t *testing.T) {
+	cache.Redis()
+
+	helper := NewWechatHelper()
+
+	msg := make(map[string]map[string]string)
+	msg["thing12"] = make(map[string]string)
+	msg["thing12"]["value"] = "天气不错"
+	msg["thing13"] = make(map[string]string)
+	msg["thing13"]["value"] = "明天下雨"
+
+	fmt.Println(helper.SendWechatSubscribe("oAJoL5F_9UlGXNq27AaQ2BMGIt7s", "UeiRSO8hHAFvTodH3SlWb_KoqT3Z2F2MoSvsDkZv7AE", msg))
+}

+ 106 - 0
pkg/sender/subscribe.go

@@ -0,0 +1,106 @@
+package sender
+
+import (
+	"dsbqj-admin/model/mongo/subscribe"
+	"dsbqj-admin/pkg/helper/wechat"
+	"dsbqj-admin/pkg/logger"
+	"fmt"
+	"github.com/kamva/mgm/v3"
+	"go.mongodb.org/mongo-driver/bson"
+	"log"
+)
+
+type SubscribeSend struct {
+	DeviceId string
+	OpenIds  []string
+	Module   string
+}
+
+type SubscribeSender struct {
+	workerNum int
+	queue     chan *SubscribeSend
+	wxHelper  *wechat.WechatHelper
+}
+
+func NewSubscribeSender(workerNum int) *SubscribeSender {
+	return &SubscribeSender{
+		workerNum: workerNum,
+		queue:     make(chan *SubscribeSend, 1000),
+		wxHelper:  wechat.NewWechatHelper(),
+	}
+}
+
+func (this *SubscribeSender) Send(send *SubscribeSend) {
+	select {
+	case this.queue <- send:
+	default:
+		logger.Info("subscribe sender queue full, drop task=%+v", send)
+	}
+}
+
+func (this *SubscribeSender) Start() {
+	for i := 0; i < this.workerNum; i++ {
+		go this.worker(i)
+	}
+}
+
+func (this *SubscribeSender) worker(id int) {
+	for task := range this.queue {
+		this.safeSend(task)
+	}
+}
+
+func (this *SubscribeSender) safeSend(send *SubscribeSend) {
+	defer func() {
+		if r := recover(); r != nil {
+			log.Printf("send subscribe panic: %v", r)
+		}
+	}()
+
+	switch send.Module {
+	case "hangup":
+		this.SendHangupSubscribe(send.DeviceId)
+	case "autofight":
+		this.SendHangupSubscribe(send.DeviceId)
+	case "guildgame":
+		this.SendGuildGameSubscribe(send.OpenIds)
+	default:
+		log.Printf("[push] unknown type=%s, openId=%s",
+			send.Module, send.DeviceId)
+	}
+}
+
+func (this *SubscribeSender) SendHangupSubscribe(deviceId string) {
+	subscribeOne := new(subscribe.Subscribe)
+	err := mgm.Coll(&subscribe.Subscribe{}).First(bson.M{"device_id": deviceId, "modules.hangup.enabled": true}, subscribeOne)
+	if err != nil {
+		return
+	}
+
+	msg := make(map[string]map[string]string)
+	msg["thing12"] = make(map[string]string)
+	msg["thing12"]["value"] = "天气不错"
+	msg["thing13"] = make(map[string]string)
+	msg["thing13"]["value"] = "明天下雨"
+
+	fmt.Println(this.wxHelper.SendWechatSubscribe(subscribeOne.OpenId, "UeiRSO8hHAFvTodH3SlWb_KoqT3Z2F2MoSvsDkZv7AE", msg))
+}
+
+func (this *SubscribeSender) SendAutoFightSubscribe(deviceId string) {
+	subscribeOne := new(subscribe.Subscribe)
+	err := mgm.Coll(&subscribe.Subscribe{}).First(bson.M{"device_id": deviceId, "modules.hangup.enabled": true}, subscribeOne)
+	if err != nil {
+		return
+	}
+	msg := make(map[string]map[string]string)
+	msg["thing12"] = make(map[string]string)
+	msg["thing12"]["value"] = "天气不错"
+	msg["thing13"] = make(map[string]string)
+	msg["thing13"]["value"] = "明天下雨"
+
+	fmt.Println(this.wxHelper.SendWechatSubscribe(subscribeOne.OpenId, "UeiRSO8hHAFvTodH3SlWb_KoqT3Z2F2MoSvsDkZv7AE", msg))
+}
+
+func (this *SubscribeSender) SendGuildGameSubscribe(openIds []string) {
+
+}

+ 274 - 0
pkg/util/http.go

@@ -0,0 +1,274 @@
+package util
+
+import (
+	"bytes"
+	"crypto/tls"
+	"encoding/pem"
+	"fmt"
+	"github.com/goccy/go-json"
+	"golang.org/x/crypto/pkcs12"
+	"io"
+	"io/ioutil"
+	"log"
+	"mime/multipart"
+	"net/http"
+	"os"
+	"time"
+)
+
+func HttpPostAuth(url string, data string, Auth string) (*http.Response, error) {
+	//sendbuf, _ := sonic.Marshal(data)
+	req, _ := http.NewRequest("POST", url, bytes.NewReader([]byte(data)))
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("Authorization", Auth)
+	client := &http.Client{
+		Timeout: 3 * time.Second,
+	}
+	return client.Do(req)
+}
+
+func HttpPostForm(url string, data string) (*http.Response, error) {
+	req, _ := http.NewRequest("POST", url, bytes.NewReader([]byte(data)))
+	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+	client := &http.Client{
+		Timeout: 3 * time.Second,
+	}
+	return client.Do(req)
+
+}
+
+// HTTPGet get 请求
+func HTTPGet(uri string) ([]byte, error) {
+	response, err := http.Get(uri)
+	if err != nil {
+		return nil, err
+	}
+
+	defer response.Body.Close()
+	if response.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("http get error : uri=%v , statusCode=%v", uri, response.StatusCode)
+	}
+	return ioutil.ReadAll(response.Body)
+}
+
+// HTTPPost post 请求
+func HTTPPost(uri string, data string) ([]byte, error) {
+	body := bytes.NewBuffer([]byte(data))
+	response, err := http.Post(uri, "application/json", body)
+	if err != nil {
+		return nil, err
+	}
+
+	defer response.Body.Close()
+	if response.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("http get error : uri=%v , statusCode=%v", uri, response.StatusCode)
+	}
+	return ioutil.ReadAll(response.Body)
+}
+
+func HttpPostSaas(uri string, data string, times int) ([]byte, error) {
+	client := &http.Client{
+		Timeout: time.Second * 5, // 设置请求超时时间
+	}
+
+	fmt.Println("call remote url ", uri, " data ", data)
+	var count int
+	for {
+		body := bytes.NewBuffer([]byte(data))
+		// 创建一个新的请求
+		req, err := http.NewRequest("POST", uri, body)
+		req.Header.Set("Content-Type", "application/json")
+		req.Header.Set("Tenant", "tenant_sys_online")
+		if err != nil {
+			return nil, err
+		}
+		// 发送请求
+		resp, err := client.Do(req)
+		if err == nil {
+			defer resp.Body.Close()
+
+			if resp.StatusCode == http.StatusOK {
+				// 请求成功
+				return ioutil.ReadAll(resp.Body)
+			} else {
+				// 可以根据需要处理其他状态码
+				fmt.Printf("请求失败,状态码: %d\n", resp.StatusCode)
+			}
+		} else {
+			// 请求出错,记录错误信息
+			fmt.Println("请求错误:", err)
+		}
+
+		if count >= times {
+			break
+		}
+		count++
+
+		// 等待一段时间后重试
+		time.Sleep(time.Second * 1)
+		fmt.Printf("尝试重发 #%d...\n", count)
+	}
+	return nil, fmt.Errorf("达到最大重发次数,无法完成请求")
+
+}
+
+// PostJSON post json 数据请求
+func PostJSON(uri string, obj interface{}) ([]byte, error) {
+	jsonData, err := json.Marshal(obj)
+	if err != nil {
+		return nil, err
+	}
+	jsonData = bytes.Replace(jsonData, []byte("\\u003c"), []byte("<"), -1)
+	jsonData = bytes.Replace(jsonData, []byte("\\u003e"), []byte(">"), -1)
+	jsonData = bytes.Replace(jsonData, []byte("\\u0026"), []byte("&"), -1)
+	body := bytes.NewBuffer(jsonData)
+	response, err := http.Post(uri, "application/json;charset=utf-8", body)
+	if err != nil {
+		return nil, err
+	}
+	defer response.Body.Close()
+
+	if response.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("http get error : uri=%v , statusCode=%v", uri, response.StatusCode)
+	}
+	return ioutil.ReadAll(response.Body)
+}
+
+// PostJSONWithRespContentType post json数据请求,且返回数据类型
+func PostJSONWithRespContentType(uri string, obj interface{}) ([]byte, string, error) {
+	jsonData, err := json.Marshal(obj)
+	if err != nil {
+		return nil, "", err
+	}
+
+	jsonData = bytes.Replace(jsonData, []byte("\\u003c"), []byte("<"), -1)
+	jsonData = bytes.Replace(jsonData, []byte("\\u003e"), []byte(">"), -1)
+	jsonData = bytes.Replace(jsonData, []byte("\\u0026"), []byte("&"), -1)
+
+	body := bytes.NewBuffer(jsonData)
+	response, err := http.Post(uri, "application/json;charset=utf-8", body)
+	if err != nil {
+		return nil, "", err
+	}
+	defer response.Body.Close()
+
+	if response.StatusCode != http.StatusOK {
+		return nil, "", fmt.Errorf("http get error : uri=%v , statusCode=%v", uri, response.StatusCode)
+	}
+	responseData, err := ioutil.ReadAll(response.Body)
+	contentType := response.Header.Get("Content-Type")
+	return responseData, contentType, err
+}
+
+// PostFile 上传文件
+func PostFile(fieldname, filename, uri string) ([]byte, error) {
+	fields := []MultipartFormField{
+		{
+			IsFile:    true,
+			Fieldname: fieldname,
+			Filename:  filename,
+		},
+	}
+	return PostMultipartForm(fields, uri)
+}
+
+// MultipartFormField 保存文件或其他字段信息
+type MultipartFormField struct {
+	IsFile    bool
+	Fieldname string
+	Value     []byte
+	Filename  string
+}
+
+// PostMultipartForm 上传文件或其他多个字段
+func PostMultipartForm(fields []MultipartFormField, uri string) (respBody []byte, err error) {
+	bodyBuf := &bytes.Buffer{}
+	bodyWriter := multipart.NewWriter(bodyBuf)
+
+	for _, field := range fields {
+		if field.IsFile {
+			fileWriter, e := bodyWriter.CreateFormFile(field.Fieldname, field.Filename)
+			if e != nil {
+				err = fmt.Errorf("error writing to buffer , err=%v", e)
+				return
+			}
+
+			fh, e := os.Open(field.Filename)
+			if e != nil {
+				err = fmt.Errorf("error opening file , err=%v", e)
+				return
+			}
+			defer fh.Close()
+
+			if _, err = io.Copy(fileWriter, fh); err != nil {
+				return
+			}
+		} else {
+			partWriter, e := bodyWriter.CreateFormField(field.Fieldname)
+			if e != nil {
+				err = e
+				return
+			}
+			valueReader := bytes.NewReader(field.Value)
+			if _, err = io.Copy(partWriter, valueReader); err != nil {
+				return
+			}
+		}
+	}
+
+	contentType := bodyWriter.FormDataContentType()
+	bodyWriter.Close()
+
+	resp, e := http.Post(uri, contentType, bodyBuf)
+	if e != nil {
+		err = e
+		return
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		return nil, err
+	}
+	respBody, err = ioutil.ReadAll(resp.Body)
+	return
+}
+
+// httpWithTLS CA证书
+func httpWithTLS(rootCa, key string) (*http.Client, error) {
+	var client *http.Client
+	certData, err := ioutil.ReadFile(rootCa)
+	if err != nil {
+		return nil, fmt.Errorf("unable to find cert path=%s, error=%v", rootCa, err)
+	}
+	cert := pkcs12ToPem(certData, key)
+	config := &tls.Config{
+		Certificates: []tls.Certificate{cert},
+	}
+	tr := &http.Transport{
+		TLSClientConfig:    config,
+		DisableCompression: true,
+	}
+	client = &http.Client{Transport: tr}
+	return client, nil
+}
+
+// pkcs12ToPem 将Pkcs12转成Pem
+func pkcs12ToPem(p12 []byte, password string) tls.Certificate {
+	blocks, err := pkcs12.ToPEM(p12, password)
+	defer func() {
+		if x := recover(); x != nil {
+			log.Print(x)
+		}
+	}()
+	if err != nil {
+		panic(err)
+	}
+	var pemData []byte
+	for _, b := range blocks {
+		pemData = append(pemData, pem.EncodeToMemory(b)...)
+	}
+	cert, err := tls.X509KeyPair(pemData, pemData)
+	if err != nil {
+		panic(err)
+	}
+	return cert
+}