代码拉取完成,页面将自动刷新
package socket_server
import (
"bytes"
"context"
"fmt"
"github.com/ZR233/goutils/stackx"
"net"
"sync"
"time"
)
type Session struct {
id uint32
server *Server
net.Conn
cmdChan chan CmdInterface
ctxCmd context.Context
cancelCmd context.CancelFunc
ctxRead context.Context
cancelRead context.CancelFunc
readBuff []byte
Fields interface{}
closeOnce sync.Once
}
func (s *Session) close(err error) {
s.closeOnce.Do(func() {
s.server.options.Handler.OnClose(s, err)
s.Fields = nil
s.server.deleteSession(s.id)
s.cancelCmd()
s.cancelRead()
_ = s.Conn.Close()
close(s.cmdChan)
})
return
}
func (s *Session) Close() (err error) {
s.close(nil)
return nil
}
func (s *Session) CloseWithError(err error) error {
s.close(err)
return nil
}
func (s *Session) NewCmdClose(err error) *CmdClose {
return &CmdClose{
session: s,
err: err,
}
}
func newSession(id uint32, server *Server, conn net.Conn) (s *Session) {
s = &Session{
id: id,
server: server,
Conn: conn,
cmdChan: make(chan CmdInterface, 10),
}
s.readBuff = make([]byte, s.server.options.BuffLen)
s.ctxCmd, s.cancelCmd = context.WithCancel(s.server.ctxCmd)
s.ctxRead, s.cancelRead = context.WithCancel(s.server.ctxCmd)
s.server.options.Handler.OnConnect(s)
go s.runReadLoop()
go s.runCmdLoop()
return
}
func (s *Session) runReadLoop() {
for {
if s.ctxRead.Err() != nil {
return
}
s.readLoop()
}
}
func (s *Session) runCmdLoop() {
for {
select {
case <-s.ctxCmd.Done():
return
case cmd, ok := <-s.cmdChan:
if ok {
cmd.Exec()
}
}
}
}
func (s *Session) readLoop() {
defer func() {
if p := recover(); p != nil {
err := fmt.Errorf("%s\n%s", p, string(stackx.Stack(0)))
s.handleError(err)
}
}()
handler := s.server.options.Handler
headerData, err := s.read(handler.HeaderLen())
if err != nil {
return
}
thisRecv := context.Background()
bodyLen, thisRecvUpdated, err := handler.HeaderHandler(headerData, s, thisRecv)
if err != nil {
panic(err)
}
bodyData, err := s.read(bodyLen)
if err != nil {
return
}
err = handler.BodyHandler(bodyData, s, thisRecvUpdated)
if err != nil {
panic(err)
}
return
}
func (s *Session) read(wantLen int) (data []byte, err error) {
deadline := time.Now().Add(s.server.options.ReadTimeout)
if s.server.options.ReadTimeout == 0 {
deadline = time.Time{}
}
err = s.SetReadDeadline(deadline)
if err != nil {
s.close(err)
return
}
buf := bytes.NewBuffer(data)
n := 0
for {
if buf.Len() == wantLen {
break
}
buffLen := wantLen - buf.Len()
if buffLen > s.server.options.BuffLen {
buffLen = s.server.options.BuffLen
}
dataBatch := s.readBuff[:buffLen]
n, err = s.Read(dataBatch)
if err != nil {
s.close(err)
return
}
buf.Write(dataBatch[:n])
}
return buf.Bytes(), nil
}
func (s *Session) handleError(err error) {
if s.ctxCmd.Err() == nil {
s.server.handleError(err, s)
}
}
func (s *Session) NewCommandWrite(data []byte) *CmdWrite {
return &CmdWrite{
session: s,
data: data,
}
}
func (s *Session) ExecCommands(commands ...CmdInterface) {
defer func() {
recover()
}()
for _, command := range commands {
s.cmdChan <- command
}
}
func (s *Session) Id() uint32 {
return s.id
}
func (s *Session) Stopped() bool {
return s.ctxCmd.Err() != nil
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。