change db structure, add goroutine connection handling
This commit is contained in:
+38
-72
@@ -12,19 +12,18 @@ import (
|
||||
)
|
||||
|
||||
type wsServer struct {
|
||||
OnOpen func(ctx context.Context, conn *websocket.Conn)
|
||||
OnClose func(ctx context.Context, conn *websocket.Conn, err error)
|
||||
OnMessage func(ctx context.Context, conn *websocket.Conn, msg map[string]any)
|
||||
OnOpen func(c *Client)
|
||||
OnClose func(c *Client, err error)
|
||||
OnMessage func(c *Client, msg map[string]any)
|
||||
}
|
||||
|
||||
var (
|
||||
unauthenticatedConnections []*websocket.Conn
|
||||
authenticatedConnections []AuthConnection
|
||||
mu sync.Mutex
|
||||
clients []*Client
|
||||
mu sync.Mutex
|
||||
)
|
||||
|
||||
func (s *wsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
|
||||
func (s *wsServer) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
|
||||
conn, err := websocket.Accept(responseWriter, request, &websocket.AcceptOptions{
|
||||
InsecureSkipVerify: true,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -33,11 +32,16 @@ func (s *wsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
defer conn.CloseNow()
|
||||
|
||||
client := &Client{conn: conn}
|
||||
mu.Lock()
|
||||
clients = append(clients, client)
|
||||
mu.Unlock()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
if s.OnOpen != nil {
|
||||
s.OnOpen(ctx, conn)
|
||||
s.OnOpen(client)
|
||||
}
|
||||
|
||||
var readErr error
|
||||
@@ -47,52 +51,29 @@ func (s *wsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
break
|
||||
}
|
||||
if s.OnMessage != nil {
|
||||
s.OnMessage(ctx, conn, msg)
|
||||
s.OnMessage(client, msg)
|
||||
}
|
||||
}
|
||||
|
||||
cancel() // cancel before OnClose so any in-flight queries are canceled first
|
||||
|
||||
if s.OnClose != nil {
|
||||
s.OnClose(ctx, conn, readErr)
|
||||
s.OnClose(client, readErr)
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
for i, c := range clients {
|
||||
if c == client {
|
||||
clients[i] = clients[len(clients)-1]
|
||||
clients = clients[:len(clients)-1]
|
||||
break
|
||||
}
|
||||
}
|
||||
mu.Unlock()
|
||||
|
||||
conn.Close(websocket.StatusNormalClosure, "done")
|
||||
}
|
||||
|
||||
func removeConnectionCache(conn *websocket.Conn) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if getConnectionDataIfAuth(conn) != nil {
|
||||
for i, c := range authenticatedConnections {
|
||||
if c.connection == conn {
|
||||
authenticatedConnections[i] = authenticatedConnections[len(authenticatedConnections)-1]
|
||||
authenticatedConnections = authenticatedConnections[:len(authenticatedConnections)-1]
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for i, c := range unauthenticatedConnections {
|
||||
if c == conn {
|
||||
unauthenticatedConnections[i] = unauthenticatedConnections[len(unauthenticatedConnections)-1]
|
||||
unauthenticatedConnections = unauthenticatedConnections[:len(unauthenticatedConnections)-1]
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getConnectionDataIfAuth(conn *websocket.Conn) *AuthConnection {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
for _, c := range authenticatedConnections {
|
||||
if c.connection == conn {
|
||||
return &c
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func sendAndCloseIfFails(conn *websocket.Conn, message map[string]any) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
@@ -101,54 +82,39 @@ func sendAndCloseIfFails(conn *websocket.Conn, message map[string]any) {
|
||||
}
|
||||
}
|
||||
|
||||
func sendToAllExceptAndCloseIfFails(conn *websocket.Conn, message map[string]any) {
|
||||
_, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
for _, aConn := range authenticatedConnections {
|
||||
if aConn.connection != conn {
|
||||
sendAndCloseIfFails(aConn.connection, message)
|
||||
func sendToAllExceptAndCloseIfFails(client *Client, message map[string]any) {
|
||||
for _, other := range clients {
|
||||
if other != client {
|
||||
sendAndCloseIfFails(other.conn, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func handleUnauthenticatedMessage(conn *websocket.Conn, msg map[string]any) {
|
||||
func handleUnauthenticatedMessage(client *Client, msg map[string]any) {
|
||||
token := msg["token"].(string)
|
||||
user, err := GetUserFromToken(token)
|
||||
if err != nil {
|
||||
log.Println("invalid or expired token:", err)
|
||||
err := conn.Close(websocket.StatusPolicyViolation, "invalid token")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
client.conn.Close(websocket.StatusPolicyViolation, "invalid token")
|
||||
return
|
||||
}
|
||||
mu.Lock()
|
||||
authenticatedConnections = append(authenticatedConnections, AuthConnection{connection: conn, user: user})
|
||||
mu.Unlock()
|
||||
sendAndCloseIfFails(conn, map[string]any{
|
||||
client.User = &user
|
||||
sendAndCloseIfFails(client.conn, map[string]any{
|
||||
"authAs": user.Name,
|
||||
})
|
||||
log.Println("New User authenticated as: " + user.Name)
|
||||
}
|
||||
|
||||
func handleAuthenticatedMessage(conn *websocket.Conn, msg map[string]any) {
|
||||
func handleAuthenticatedMessage(client *Client, msg map[string]any) {
|
||||
message := msg["message"].(string)
|
||||
if message == "" {
|
||||
sendAndCloseIfFails(conn, map[string]any{
|
||||
sendAndCloseIfFails(client.conn, map[string]any{
|
||||
"error": "no message",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
auth := getConnectionDataIfAuth(conn)
|
||||
if auth == nil {
|
||||
sendAndCloseIfFails(conn, map[string]any{
|
||||
"error": "no auth",
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
sendToAllExceptAndCloseIfFails(conn, map[string]any{
|
||||
"username": auth.user.Name,
|
||||
sendToAllExceptAndCloseIfFails(client, map[string]any{
|
||||
"username": client.User.Name,
|
||||
"message": message,
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user