千家信息网

构建一个即时消息应用之什么是实时消息

发表于:2025-01-19 作者:千家信息网编辑
千家信息网最后更新 2025年01月19日,这篇文章主要讲解了"构建一个即时消息应用之什么是实时消息",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"构建一个即时消息应用之什么是实时消息"吧!消息户
千家信息网最后更新 2025年01月19日构建一个即时消息应用之什么是实时消息

这篇文章主要讲解了"构建一个即时消息应用之什么是实时消息",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"构建一个即时消息应用之什么是实时消息"吧!

消息户端

在 HTTP 部分之前,让我们先编写一个映射map ,让所有客户端都监听消息。 像这样全局初始化:

type MessageClient struct {     Messages chan Message     UserID   string }  var messageClients sync.Map

已创建的新消息

还记得在 上一篇文章 中,当我们创建这条消息时,我们留下了一个 "TODO" 注释。在那里,我们将使用这个函数来调度一个 goroutine。

go messageCreated(message)

把这行代码插入到我们留注释的位置。

func messageCreated(message Message) error {     if err := db.QueryRow(`         SELECT user_id FROM participants         WHERE user_id != $1 and conversation_id = $2     `, message.UserID, message.ConversationID).     Scan(&message.ReceiverID); err != nil {         return err     }      go broadcastMessage(message)      return nil }  func broadcastMessage(message Message) {     messageClients.Range(func(key, _ interface{}) bool {         client := key.(*MessageClient)         if client.UserID == message.ReceiverID {             client.Messages <- message         }         return true     }) }

该函数查询接收者 ID(其他参与者 ID),并将消息发送给所有客户端。

订阅消息

让我们转到 main() 函数并添加以下路由:

router.HandleFunc("GET", "/api/messages", guard(subscribeToMessages))

此端点处理 /api/messages 上的 GET 请求。请求应该是一个 EventSource 连接。它用一个事件流响应,其中的数据是 JSON 格式的。

func subscribeToMessages(w http.ResponseWriter, r *http.Request) {     if a := r.Header.Get("Accept"); !strings.Contains(a, "text/event-stream") {         http.Error(w, "This endpoint requires an EventSource connection", http.StatusNotAcceptable)         return     }      f, ok := w.(http.Flusher)     if !ok {         respondError(w, errors.New("streaming unsupported"))         return     }      ctx := r.Context()     authUserID := ctx.Value(keyAuthUserID).(string)      h := w.Header()     h.Set("Cache-Control", "no-cache")     h.Set("Connection", "keep-alive")     h.Set("Content-Type", "text/event-stream")      messages := make(chan Message)     defer close(messages)      client := &MessageClient{Messages: messages, UserID: authUserID}     messageClients.Store(client, nil)     defer messageClients.Delete(client)      for {         select {         case <-ctx.Done():             return         case message := <-messages:             if b, err := json.Marshal(message); err != nil {                 log.Printf("could not marshall message: %v\n", err)                 fmt.Fprintf(w, "event: error\ndata: %v\n\n", err)             } else {                 fmt.Fprintf(w, "data: %s\n\n", b)             }             f.Flush()         }     } }

首先,它检查请求头是否正确,并检查服务器是否支持流式传输。我们创建一个消息通道,用它来构建一个客户端,并将其存储在客户端映射中。每当创建新消息时,它都会进入这个通道,因此我们可以通过 for-select 循环从中读取。

服务器发送事件Server-Sent Events使用以下格式发送数据:

data: some data here\n\n

我们以 JSON 格式发送:

data: {"foo":"bar"}\n\n

我们使用 fmt.Fprintf() 以这种格式写入响应写入器writter,并在循环的每次迭代中刷新数据。

这个循环会一直运行,直到使用请求上下文关闭连接为止。我们延迟了通道的关闭和客户端的删除,因此,当循环结束时,通道将被关闭,客户端不会收到更多的消息。

注意,服务器发送事件Server-Sent Events(EventSource)的 JavaScript API 不支持设置自定义请求头?,所以我们不能设置 Authorization: Bearer 。这就是为什么 guard() 中间件也会从 URL 查询字符串中读取令牌的原因。

感谢各位的阅读,以上就是"构建一个即时消息应用之什么是实时消息"的内容了,经过本文的学习后,相信大家对构建一个即时消息应用之什么是实时消息这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

0