Rust 实战丨SSE(Server-Sent Events)

Rust 实战丨SSE(Server-Sent Events)

码农世界 2024-06-17 后端 92 次浏览 0个评论

📌 SSE(Server-Sent Events)是一种允许服务器向客户端浏览器推送信息的技术。它是 HTML5 的一部分,专门用于建立一个单向的从服务器到客户端的通信连接。SSE的使用场景非常广泛,包括实时消息推送、实时通知更新等。

SSE 的本质

严格地说,HTTP 无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。

也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。

SSE 就是利用这种机制,使用流信息向浏览器推送信息。它基于 HTTP 协议,目前除了 IE/Edge,其他浏览器都支持。

特点

  1. 持续连接:与传统的 HTTP 请求不同,SSE 保持连接开放,服务器可以随时发送消息。
  2. 文本数据流:SSE 主要传输文本数据,这些数据以特定的格式流式传输,使得每条消息都是简单的文本格式。
  3. 内置重连机制:浏览器会自动处理连接中断和重连,包括在重连请求中发送最后接收的事件 ID,以便服务器从正确的位置恢复发送事件。
  4. 简单的客户端处理:在浏览器中,使用 JavaScript 的 EventSource 接口处理 SSE 非常简单,只需几行代码即可监听服务器发来的事件。

工作原理

  1. 建立连接:客户端通过创建一个 EventSource 对象请求特定的 URL 来启动 SSE 连接。这个请求是一个标准的 HTTP 请求,但会要求服务器以特定方式响应。
  2. 服务器响应:服务器响应必须设置 Content-Type 为 text/event-stream,然后保持连接打开。
  3. 发送消息:服务器可以通过持续发送数据格式为特定事件流的消息来推送更新。每个消息包括一个可选的事件类型、数据和一个可选的 ID。
    • 数据:实际的消息内容,以 data: 开头,多行数据以双换行符 \n\n 结束。
    • 事件类型:允许客户端根据事件类型来监听,以 event: 开头。
    • ID:如果连接中断,客户端将发送包含上次接收的最后一个ID的 Last-Event-ID 头,以便服务器从断点继续发送数据。

实战

客户端




    SSE Test


Server-Sent Events Test

Rust 服务端

Rust 实战丨SSE(Server-Sent Events)

依赖:

anyhow = "1.0.86"
axum = { version = "0.7.5" }
chrono = "0.4.38"
futures-core = "0.3.30"
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread", ] }
tokio-stream = "0.1.15"
tower-http = { version = "0.5.2", features = ["cors"] }

代码:

use std::time::Duration;
use axum::{
    response::{sse::Event, Sse},
    routing::get,
    Router,
};
use tokio::{net::TcpListener, time::interval};
use tokio_stream::{wrappers::IntervalStream, StreamExt};
use tower_http::cors::{Any, CorsLayer};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cors = CorsLayer::new()
        .allow_headers(Any)
        .allow_origin(Any)
        .allow_headers(Any)
        .allow_credentials(false);
    let listener = TcpListener::bind("0.0.0.0:8000").await?;
    let app = Router::new().route("/events", get(sse_handler)).layer(cors);
    axum::serve(listener, app).await?;
    Ok(())
}
async fn sse_handler() -> Sse>> {
    let interval = interval(Duration::from_secs(1));
    let stream = IntervalStream::new(interval).map(|_| {
        let data = format!("{}\n\n", chrono::Local::now().to_rfc2822());
        Ok(Event::default().data(data))
    });
    Sse::new(stream)
}

Go 服务端

Rust 实战丨SSE(Server-Sent Events)

package main
import (
	"fmt"
	"log"
	"net/http"
	"time"
)
func sseHandler(w http.ResponseWriter, r *http.Request) {
	// 设置头部信息,确保允许跨域,并且告诉浏览器这是一个事件流
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")
	// 不断发送消息
	for {
		// 生成服务器时间,并发送给客户端
		now := time.Now()
		// 生成消息,格式为 data: {content} \n\n
		msg := fmt.Sprintf("data: %s\n\n", now.Format(time.DateTime))
		// 发送消息
		if _, err := fmt.Fprintf(w, msg); err != nil {
			log.Println("write error:", err)
			break
		}
		// 刷新响应缓冲,确保即时发送
		flusher, ok := w.(http.Flusher)
		if !ok {
			log.Println("Streaming unsupported!")
			break
		}
		flusher.Flush()
		// 每秒发送一次
		time.Sleep(1 * time.Second)
	}
}
func main() {
	http.HandleFunc("/events", sseHandler)
	log.Println("Server started on port 8000...")
	log.Fatal(http.ListenAndServe(":8000", nil))
}

转载请注明来自码农世界,本文标题:《Rust 实战丨SSE(Server-Sent Events)》

百度分享代码,如果开启HTTPS请参考李洋个人博客
每一天,每一秒,你所做的决定都会改变你的人生!

发表评论

快捷回复:

评论列表 (暂无评论,92人围观)参与讨论

还没有评论,来说两句吧...

Top