// File listing metadata (viewer residue): 206 lines, 4.8 KiB, Go.

package wsServer
import (
"context"
json2 "encoding/json"
"errors"
"log"
"net/http"
"time"
"go-socket/packages/Enums/WsEventType"
"go-socket/packages/cache"
"go-socket/packages/tokens"
"go-socket/packages/types"
"github.com/coder/websocket"
"github.com/coder/websocket/wsjson"
"github.com/google/uuid"
)
// ServeWsConnection upgrades the HTTP request to a WebSocket connection and
// then reads JSON object messages from it until a read fails or a message
// handler asks for the connection to be dropped.
//
// Until authentication succeeds every non-empty message is passed to
// handleUnauthenticatedMessage; once it returns true, later messages go to
// handleAuthenticatedMessage. Empty objects are ignored. On return the
// connection is closed via closeConnection.
//
// NOTE(review): InsecureSkipVerify is set on the accept options — per the
// websocket package documentation this turns off origin verification;
// confirm that is intended for production.
//
// NOTE(review): ignoreCache starts as true and is only ever assigned true in
// this function, so the deferred closeConnection never calls
// cache.DeleteUser from here — confirm whether that is intended.
func ServeWsConnection(responseWriter http.ResponseWriter, request *http.Request) {
	acceptOptions := &websocket.AcceptOptions{InsecureSkipVerify: true}
	connection, err := websocket.Accept(responseWriter, request, acceptOptions)
	if err != nil {
		log.Printf("websocket accept error: %v", err)
		return
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	user := types.User{WsConn: connection}
	authenticated := false
	ignoreCache := true
	// The closure reads ignoreCache when it runs, i.e. at function exit.
	defer func() { closeConnection(&user, ignoreCache) }()

	for {
		var incoming map[string]any
		if readErr := wsjson.Read(ctx, connection, &incoming); readErr != nil {
			log.Printf("read error: %v", readErr)
			return
		}
		if len(incoming) == 0 {
			continue
		}

		if authenticated {
			if !handleAuthenticatedMessage(&user, &incoming) {
				return
			}
			continue
		}

		if !handleUnauthenticatedMessage(ctx, &user, &incoming) {
			ignoreCache = true
			return
		}
		authenticated = true
	}
}
// WsSendMessageCloseIfTimeout writes message as JSON to user's WebSocket
// connection, allowing at most ten seconds for the write.
//
// It does nothing when user.WsConn is nil. Any write error is logged; when
// the error is a context deadline expiry, closeConnection(user, false) is
// called before logging.
func WsSendMessageCloseIfTimeout(user *types.User, message any) {
	if user.WsConn == nil {
		return
	}

	writeCtx, stop := context.WithTimeout(context.Background(), 10*time.Second)
	defer stop()

	writeErr := wsjson.Write(writeCtx, user.WsConn, message)
	if writeErr == nil {
		return
	}
	if errors.Is(writeErr, context.DeadlineExceeded) {
		closeConnection(user, false)
	}
	log.Printf("write error: %v", writeErr)
}
// WsSendMessageToMultipleCloseIfTimeout marshals message to JSON once and
// sends the result to every user in users whose Id differs from excludeId.
//
// The encoded bytes are wrapped in json.RawMessage before being handed to
// WsSendMessageCloseIfTimeout: that helper JSON-encodes its argument again,
// and encoding/json would turn a plain []byte into a base64 string instead
// of emitting the document itself.
//
// If users is nil nothing happens. If marshaling fails the error is logged
// and nothing is sent.
func WsSendMessageToMultipleCloseIfTimeout(users *[]types.User, excludeId uuid.UUID, message any) {
	if users == nil {
		return
	}

	encoded, err := json2.Marshal(message)
	if err != nil {
		log.Printf("json marshal error: %v", err)
		return
	}
	payload := json2.RawMessage(encoded)

	// Index into the slice so each recipient is addressed in place rather
	// than through a per-iteration copy of the User value.
	for i := range *users {
		recipient := &(*users)[i]
		if recipient.Id == excludeId {
			continue
		}
		WsSendMessageCloseIfTimeout(recipient, payload)
	}
}
// WsSendEventMessageToPermittedChannelUsersCloseIfTimeout sends message to
// every user listed in channel.UsersCachedPermissions, other than excluded,
// whose cached permissions include all bits set in needed.
//
// Recipient ids are collected while holding channel.Mu for reading; the lock
// is released before any cache lookup or send. Ids that cache.GetUserById
// cannot resolve are skipped. For a resolved user whose WsConn is nil,
// IncrementUnreadMessagesWithCap is called with the channel id before the
// send helper is invoked.
func WsSendEventMessageToPermittedChannelUsersCloseIfTimeout(excluded uuid.UUID, channel *types.HubChannel, needed types.CachedUserPermissions, message any) {
	channel.Mu.RLock()
	recipientIDs := make([]uuid.UUID, 0, len(channel.UsersCachedPermissions))
	for userID, granted := range channel.UsersCachedPermissions {
		if userID == excluded || granted&needed != needed {
			continue
		}
		recipientIDs = append(recipientIDs, userID)
	}
	channel.Mu.RUnlock()

	for _, userID := range recipientIDs {
		recipient, lookupErr := cache.GetUserById(userID)
		if lookupErr != nil {
			continue
		}
		if recipient.WsConn == nil {
			recipient.IncrementUnreadMessagesWithCap(channel.Id)
		}
		WsSendMessageCloseIfTimeout(recipient, message)
	}
}
// WsSendEventMessageToHubUsersCloseIfTimeout sends message to every user of
// hub except the one whose OriginalId equals excluded.
//
// The message's HubId field is overwritten with hub.Id first, so callers do
// not need to fill it in. Hub users are collected while holding hub.Mu for
// reading; the lock is released before any cache lookup or send. Users that
// cache.GetUserById cannot resolve are skipped.
func WsSendEventMessageToHubUsersCloseIfTimeout(excluded uuid.UUID, hub *types.Hub, message *types.WsHubSpecificHubEventMessage) {
	message.HubId = hub.Id

	hub.Mu.RLock()
	recipients := make([]*types.HubUser, 0, len(hub.Users))
	for _, member := range hub.Users {
		if member.OriginalId == excluded {
			continue
		}
		recipients = append(recipients, member)
	}
	hub.Mu.RUnlock()

	for _, member := range recipients {
		if target, err := cache.GetUserById(member.OriginalId); err == nil {
			WsSendMessageCloseIfTimeout(target, message)
		}
	}
}
func sendToAllMessageCloseIfTimeout(message *map[string]any) {
cache.Mu.RLock()
users := make([]*types.User, 0, len(cache.Users))
for _, user := range cache.Users {
users = append(users, user)
}
cache.Mu.RUnlock()
for _, user := range users {
WsSendMessageCloseIfTimeout(user, message)
}
}
// handleAuthenticatedMessage handles a message from an authenticated user by
// sending the same message back to that user. It always returns true, which
// the read loop treats as "keep the connection open".
func handleAuthenticatedMessage(user *types.User, userMessage *map[string]any) bool {
	const keepConnectionOpen = true

	WsSendMessageCloseIfTimeout(user, userMessage)
	return keepConnectionOpen
}
// handleUnauthenticatedMessage treats userMessage as an authentication
// attempt and reports whether it succeeded.
//
// The message must carry a string under the "token" key. The token is
// validated with tokens.TokenValidateGetId and the resulting id is looked up
// with cache.GetUserById. On success the cached user's WsConn is set to this
// connection and *user is overwritten with the cached user's value. In every
// case an Authentication event describing the outcome is sent to user.
//
// ctx is accepted but not used by this function.
func handleUnauthenticatedMessage(ctx context.Context, user *types.User, userMessage *map[string]any) bool {
	// reply sends the authentication outcome to user and returns success so
	// each exit path is a single return statement.
	reply := func(success bool, reason string) bool {
		WsSendMessageCloseIfTimeout(user, types.WsEventMessage{
			Type: WsEventType.Authentication,
			Event: types.WsAuthMessage{
				Success: success,
				Error:   reason,
			},
		})
		return success
	}

	token, hasToken := (*userMessage)["token"].(string)
	if !hasToken {
		return reply(false, "no token in message")
	}

	userId, err := tokens.TokenValidateGetId(token)
	if err != nil {
		return reply(false, "invalid token")
	}

	cached, err := cache.GetUserById(userId)
	if err != nil {
		return reply(false, "user not found")
	}

	// Attach this connection to the cached user, then make the caller's
	// user value a copy of the cached one.
	cached.WsConn = user.WsConn
	*user = *cached

	return reply(true, "")
}
// closeConnection closes user's WebSocket connection, if it has one, and
// first removes the user from the cache by Id unless ignoreCache is true.
func closeConnection(user *types.User, ignoreCache bool) {
	removeFromCache := !ignoreCache
	if removeFromCache {
		cache.DeleteUser(user.Id)
	}

	if user.WsConn == nil {
		return
	}
	// Best-effort close: the returned error is discarded.
	_ = user.WsConn.CloseNow()
}