实时协作绘图(第二部分):服务器发送事件 + WebRTC 网状网络
在之前的文章中,我们使用 Canvas 和 WebRTC 创建了一个实时协作绘图工具。我们使用SimplePeer来实现这一目标,并使用WebSocket与信令服务器通信。
这种方法效果很好,但它通过添加一些不必要的抽象和复杂性,掩盖了底层实现。我们可以通过使用SSE来简化操作。此外,我们将直接使用RTCPeerConnection来更深入地了解 WebRTC 。
本文结束时,我们将得出以下结论:
- 随机生成的客厅 ID
- 服务器发送事件,用于我们的通信信令通道
- WebRTC 数据通道的全连接网状网络
利用 SSE 简化服务器通信
我们使用 WebSocket 是因为我们需要一种方法,让服务器在发生诸如offer、peer 加入以及任何其他信令通信时,都能触发客户端的回调。但遗憾的是,有一些特殊情况需要处理:
- 与服务器/客户端的连接状态丢失
- WebSocket 可能不受支持(负载均衡器默认情况下可能不支持)。
- 长轮询的备选方案
WebSocket 的这种复杂性使得我们通常使用像socket.io这样的工具来处理所有这些细节。但是,我们可以使用SSE来处理与服务器的通信,而 SSE 仅使用 HTTP 协议即可完成此操作。
通过使用SSE,我们可以获得以下好处:
- 基于HTTP的数据高效、易于理解的协议。
- 通过 HTTP/2 自动复用
- 只需一个连接即可
- 连接可以轻松迁移到不同的服务器。
- 无需复杂的负载均衡器配置,也无需担心代理或防火墙的问题。
房间和网状网络
我们的服务器功能主要只是传递消息,但我们希望实现更多功能。我们需要能够控制其他用户加入服务器的方式。我们还希望拥有一个可共享的房间 ID,以便其他人可以加入。现在让我们再来看看如何创建Express服务器。
我们首先需要做的就是将用户引导到一个专属的房间。这样可以确保页面加载后,我们就能获得一个专属的绘图界面;而其他人想要加入,只需分享这个链接即可。
var express = require('express');
var http = require('http');
var path = require('path');
const app = express();
app.use('/static', express.static(`${__dirname}/static`));
const server = http.createServer(app);
// starting index
app.locals.index = 100000000000;
app.get('/', (req, res) => {
app.locals.index++;
let id = app.locals.index.toString(36);
res.redirect(`/${id}`);
});
app.get('/:roomId', (req, res) => {
res.sendFile(path.join(__dirname, 'static/index.html'));
});
server.listen(process.env.PORT || 8081, () => {
console.log(`Started server on port ${server.address().port}`);
});
然后,在我们的静态目录中,我们有:
- /static/index.html
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Let's Draw Together</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/remixicon@2.5.0/fonts/remixicon.css">
<link rel="stylesheet" href="/static/index.css">
<link rel="alternate icon" type="image/png" href="/static/logo.png">
<link rel="icon" type="image/svg+xml" href="/static/logo.png">
</head>
<body>
<div class="flush vstack">
<div class="menubar hstack">
<a class="icon-link center">
<i class="ri-lg ri-landscape-line"></i>
</a>
<div class="spacer"></div>
<a class="icon-link active center">
<i class="ri-lg ri-pencil-fill"></i>
</a>
<div class="spacer"></div>
<a class="icon-link center">
<i class="ri-lg ri-palette-line"></i>
<i class="ri-lg ri-checkbox-blank-fill"></i>
</a>
<div class="spacer"></div>
</div>
<div class="spacer app">
<canvas></canvas>
</div>
</div>
</body>
</html>
- /static/index.css
:root {
--root-font-size: 12px;
--standard-padding: 16px;
--bg: #fafafa;
--fg: #666;
--menubar-bg: #fdfdfd;
--active-color: #339999;
--menubar-shadow: 0 8px 6px -6px #f4f4f4;
}
/** Reset */
html, body, nav, ul, h1, h2, h3, h4, a, canvas {
margin: 0px;
padding: 0px;
color: var(--fg);
}
html, body {
font-family: Roboto, -apple-system, BlinkMacSystemFont, 'Segoe UI', Oxygen, Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', sans-serif;
font-size: var(--root-font-size);
background: var(--bg);
height: 100%;
width: 100%;
overflow: hidden;
}
*, body, button, input, select, textarea, canvas {
text-rendering: optimizeLegibility;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
outline: 0;
}
/** Utilities */
.hstack {
display: flex;
flex-direction: row;
}
.vstack {
display: flex;
flex-direction: column;
}
.center {
display: flex;
align-items: center;
}
.spacer {
flex: 1;
}
.flush {
height: 100%;
}
.icon-link {
padding: calc(var(--standard-padding) / 2);
margin: calc(var(--standard-padding) * -1) 0px;
font-size: 1rem;
position: relative;
border-bottom: 2px solid transparent;
top: 2px;
cursor: pointer;
}
.icon-link:hover {
color: var(--active-color);
}
.icon-link.active {
color: var(--active-color);
border-bottom: 2px solid var(--active-color);
}
/** Sections */
.menubar {
padding: var(--standard-padding);
box-shadow: var(--menubar-shadow);
background: var(--menubar-bg);
}
.app {
width: 100%;
}
连接到服务器事件流
从 HTTP 的角度来看,服务器发送事件流 (Server Sent Event Stream) 与一个永远无法完成的下载过程并无太大区别。我们只是想建立一个与服务器的连接,并将此连接建立为一个客户端,以便在应用程序的任何位置都可以写入数据。接下来,我们添加这部分代码:
// store the connections from clients here
var clients = {};
function disconnected(client) {
let index = app.locals.clients.indexOf(client);
if (index > -1) {
app.locals.clients.splice(index, 1);
}
}
app.get('/connect', (req, res) => {
if (req.headers.accept !== 'text/event-stream') {
return res.sendStatus(404);
}
// write the event stream headers
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader("Access-Control-Allow-Origin", "*");
res.flushHeaders();
// setup a client
let client = {
id: uuid.v4(),
emit: (event, data) => {
res.write(`id: ${uuid.v4()}`);
res.write(`event: ${event}`);
res.write(`data: ${JSON.stringify(data)}\n\n`);
}
};
clients[client.id] = client;
// emit the connected state
client.emit('connected', { user: req.user });
req.on('close', () => {
disconnected(client);
});
});
在上述实现中,我们所做的只是在客户端保持响应连接,以便响应其他消息并在客户端之间传递信息。为此,我们只需将响应头写入响应中,text/event-stream之后的所有写入操作都可以采用下面描述的简单格式。
服务器发送事件格式
事件流是一个简单的文本数据流,必须使用 UTF-8 编码。事件流中的消息以一对换行符分隔。以冒号开头的行本质上是一个注释,会被忽略。
每条消息由一行或多行文本组成,列出该消息的各个字段。每个字段由字段名称、冒号以及该字段值的文本数据表示。
服务器发送事件包含 4 个可用字段(每行一个字段),字段之间用冒号分隔。这些字段包括:
- 事件
用于标识所描述事件类型的字符串。如果指定了此字符串,则会在浏览器中向指定事件名称的监听器分发事件;网站源代码应使用 `addEventListener()` 来监听命名事件。如果未为消息指定事件名称,则会调用 `onmessage` 处理程序。
- 数据
消息的数据字段。当 EventSource 收到多行以“data:”开头的连续数据时,它会将这些数据连接起来,并在每行之间插入一个换行符。末尾的换行符将被删除。
- ID
要设置 EventSource 对象最后一个事件 ID 值的事件 ID
- 重试
尝试发送事件时使用的重连时间。此值必须为整数,以毫秒为单位指定重连时间。如果指定了非整数值,则该字段将被忽略。
event: userconnect
data: {"username": "bobby", "time": "02:33:48"}
event: usermessage
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}
event: userdisconnect
data: {"username": "bobby", "time": "02:34:23"}
event: usermessage
data: {"username": "sean", "time": "02:34:36", "text": "Bye, bobby."}
JWT(Json Web Tokens)
我们需要一种快速的方法来识别网站上哪些请求属于哪个用户。为此,我们将使用JWT。它能快速确保我们找到的是正确的用户,并且后续的对等请求都能正确关联到该用户。
首先,请确保将其添加到package.json 文件中作为依赖项。此时您应该已经安装了express.env 。此外,我们将创建一个文件来配置TOKEN_SECRET环境变量。为此,我们将使用dotenv。
npm install --save express jsonwebtoken dotenv
.env我创建了一个,使用TOKEN_SECRET的方法如下(您可以使用任何您喜欢的方法,以下方法只是为了简单起见):
require('crypto').randomBytes(64).toString('hex')
然后将结果粘贴到.env文件中
TOKEN_SECRET=569e3cd22e2ff68ef02688c2100204cd29d7ad2520971ad9eea6db1c2be576a666734a4531787448811001a76d63fd5394e1fc8f7083bab7793abead60ba1392
接下来,添加以下代码,以确保我们可以生成令牌并对传入的请求进行身份验证。
var jwt = require('jwt');
var dotenv = require('dotenv');
dotenv.config();
function auth(req, res, next) {
let token;
if (req.headers.authorization) {
token = req.headers.authorization.split(' ')[1];
} else if (req.query.token) {
token = req.query.token;
}
if (typeof token !== 'string') {
return res.sendStatus(401);
}
jwt.verify(token, process.env.TOKEN_SECRET, (err, user) => {
if (err) {
return res.sendStatus(403);
}
req.user = user;
next();
});
}
app.post('/access', (req, res) => {
if (!req.body.username) {
return res.sendStatus(403);
}
const user = {
id: uuid.v4(),
username: req.body.username
};
const token = jwt.sign(user, process.env.TOKEN_SECRET, { expiresIn: '3600s' });
return res.json(token);
});
现在我们有了生成身份验证令牌的方法。在更实际的场景中,我们可能会尝试让这种身份验证方法根据登录用户生成唯一的令牌。不过,目前它只针对匿名用户。我们还有一个身份验证方法来验证传入的令牌。接下来,让我们更新 ` /connect`函数,使其使用本地令牌req.user,并确保令牌能够通过身份验证函数的验证。
app.get('/connect', auth, (req,res) => {
if (req.headers.accept !== 'text/event-stream') {
return res.sendStatus(404);
}
// write the event stream headers
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader("Access-Control-Allow-Origin", "*");
res.flushHeaders();
// setup a client
let client = {
id: req.user.id,
user: req.user,
emit: (event, data) => {
res.write(`id: ${uuid.v4()}`);
res.write(`event: ${event}`);
res.write(`data: ${JSON.stringify(data)}`);
}
};
clients[client.id] = client;
req.on('close', () => {
disconnected(client);
});
});
现在所有对等ID都将与授权令牌生成的用户ID一致。每当用户实际加入下面的房间时,我们都会使用此ID。
加入房间、传递消息和断开连接
就服务器而言,在这个应用程序中我们主要关注以下 3 个主要功能。
当用户想要加入房间时
当我们加入一个房间时,我们需要能够通知房间内的所有现有客户端,有一位新成员加入了房间。此外,当前关联的客户端连接需要与所有这些现有客户端通信,通过生成一个 offer来建立成员连接。
var channels = {};
app.post('/:roomId/join', auth, (req, res) => {
let roomId = req.params.roomId;
if (channels[roomId] && channels[roomId][req.user.id]) {
return res.sendStatus(200);
}
if (!channels[roomId]) {
channels[roomId] = {};
}
for (let peerId in channel) {
if (clients[peerId] && clients[req.user.id]) {
clients[peerId].emit('add-peer', { peer: req.user, roomId, offer: false });
clients[req.user.id].emit('add-peer', { peer: clients[peerId].user, roomId, offer: true });
}
}
channels[roomId][req.user.id] = true;
return res.sendStatus(200);
});
当用户需要向其他用户转发消息时
当建立点对点连接时,WebRTC 必须能够传递SDP消息,例如WebRTC 会话、WebRTC Offer和WebRTC Answers。
这些中继信息需要通过信令服务器传递。我们将把这些消息简单地传递给用户请求发送消息的目标对等节点(或多个对等节点)。
app.post('/relay/:peerId/:event', auth, (req, res) => {
let peerId = req.params.peerId;
if (clients[peerId]) {
clients[peerId].emit(req.params.event, { peer: req.user, data: req.body });
}
return res.sendStatus(200);
});
当用户与服务器完全断开连接时
最后,当用户与服务器断开连接时,我们需要清理该用户曾经加入过的频道。为此,我们将更新该disconnected函数。
function disconnected(client) {
delete clients[client.id];
for (let roomId in channels) {
let channel = channels[roomId];
if (channel[client.id]) {
for (let peerId in channel) {
channel[peerId].emit('remove-peer', { peer: client.user, roomId });
}
delete channel[client.id];
}
if (Object.keys(channel).length === 0) {
delete channels[roomId];
}
}
}
设置客户端连接
现在我们已经有了能够正确处理客户端通信的服务器,接下来让我们编写WebRTC库来执行所有这些通信。在上一篇文章中,我们使用了simplepeer,但在本文中,我们将直接使用 WebRTC API。这将帮助我们更好地理解其工作原理,以及如何手动执行一些通信操作。
在此之前,我们需要设置一些事项,例如获取/access 令牌,以及设置EventSource以向我们发送消息流。
将以下内容添加到index.html文件的底部。
<script type="text/javascript" src="/static/load.js"></script>
然后,在新建文件/static/load.js中,我们需要添加以下代码来设置事件流和访问令牌。
var context = {
username: 'user' + parseInt(Math.random() * 100000),
roomId: window.location.pathname.substr(1),
token: null,
eventSource: null
};
async function getToken() {
let res = await fetch('/access', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
username: context.username
})
});
let data = await res.json();
context.token = data.token;
}
async function join() {
return fetch(`/${context.roomId}/join`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${context.token}`
}
});
}
async function connect() {
await getToken();
context.eventSource = new EventSource(`/connect?token=${context.token}`);
context.eventSource.addEventListener('add-peer', addPeer, false);
context.eventSource.addEventListener('remove-peer', removePeer, false);
context.eventSource.addEventListener('session-description', sessionDescription, false);
context.eventSource.addEventListener('ice-candidate', iceCandidate, false);
context.eventSource.addEventListener('connected', () => {
join();
});
}
function addPeer(data) {}
function removePeer(data) {}
function sessionDescription(data) {}
function iceCandidate(data) {}
connect();
这就是我们开始之前所需的大部分通信内容!在上面的代码中,我们使用fetch API发送请求,通过在上下文中提供用户名来获取访问令牌。
设置好事件源后,我们可以调用 `fetch` 方法,join该方法会使用 fetch API 发送POST请求,表明我们想要加入当前房间。如果您还记得,`fetch` 方法/:roomId/join会遍历给定频道中的所有客户端,并add-peer使用新加入的user.id调用 `fetch` 方法,同时也会向该客户端发送add-peer包含新加入用户offer: trueID 的数据。
WebRTC - 设置网状网络
WebRTC 使用多个协议和 API 构建而成,这些协议和 API 协同工作,实现了在浏览器之间捕获和传输音频/媒体/数据而无需中间程序的功能。
想要全面了解 WebRTC 的工作原理,我建议阅读《WebRTC for the curious》,这本书实际上是由 WebRTC 的一些作者撰写的。
在 WebRTC 中,我们尤其需要建立RTCPeerConnection 连接,以便与其他网络成员通信。每当我们收到add-peer消息时,都会建立对等连接。
const rtcConfig = {
iceServers: [{
urls: [
'stun:stun.l.google.com:19302',
'stun:global.stun.twilio.com:3478'
]
}]
};
function addPeer(data) {
let message = JSON.parse(data.data);
if (context.peers[message.peer.id]) {
return;
}
// setup peer connection
let peer = new RTCPeerConnection(rtcConfig);
context.peers[message.peer.id] = peer;
// handle ice candidate
peer.onicecandidate = function (event) {
if (event.candidate) {
relay(message.peer.id, 'ice-candidate', event.candidate);
}
};
// generate offer if required (on join, this peer will create an offer
// to every other peer in the network, thus forming a mesh)
if (message.offer) {
// create the data channel, map peer updates
let channel = peer.createDataChannel('updates');
channel.onmessage = function (event) {
onPeerData(message.peer.id, event.data);
};
context.channels[message.peer.id] = channel;
createOffer(message.peer.id, peer);
} else {
peer.ondatachannel = function (event) {
context.channels[message.peer.id] = event.channel;
event.channel.onmessage = function (evt) {
onPeerData(message.peer.id, evt.data);
};
};
}
}
function broadcast(data) {
for (let peerId in context.channels) {
context.channels[peerId].send(data);
}
}
async function relay(peerId, event, data) {
await fetch(`/relay/${peerId}/${event}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${context.token}`
},
body: JSON.stringify(data)
});
}
async function createOffer(peerId, peer) {
let offer = await peer.createOffer();
await peer.setLocalDescription(offer);
await relay(peerId, 'session-description', offer);
}
这段代码执行几个步骤,首先我们需要创建一个RTCPeerConnection 连接。我们会传递一些默认的 ICE/STUN 服务器,以确保协议的 ICE/STUN 部分在通过信令服务器(我们的 Express 应用)传递数据时能够正常工作。接下来,当由于协商阶段而调用onicecandidate时,它会将该信息传递给对端。
每当发生这种情况时,我们都会创建数据通道并订阅消息。我们只希望在需要发起协商的这一部分时才创建新的数据通道并生成报价。否则,我们将直接监听 ondatachannel事件。
remove-peer当对等节点被移除、发起候选或会话信息时,必须处理ice-candidate这些问题。我们需要处理这些问题并创建答案,以便远程对等节点可以正确设置或。session-descriptionice-candidatesession-descriptionlocalDescriptionremoteDescription
async function sessionDescription(data) {
let message = JSON.parse(data.data);
let peer = context.peers[message.peer.id];
let remoteDescription = new RTCSessionDescription(message.data);
await peer.setRemoteDescription(remoteDescription);
if (remoteDescription.type === 'offer') {
let answer = await peer.createAnswer();
await peer.setLocalDescription(answer);
await relay(message.peer.id, 'session-description', answer);
}
}
function iceCandidate(data) {
let message = JSON.parse(data.data);
let peer = context.peers[message.peer.id];
peer.addIceCandidate(new RTCIceCandidate(message.data));
}
function removePeer(data) {
let message = JSON.parse(data.data);
if (context.peers[message.peer.id]) {
context.peers[message.peer.id].close();
}
delete context.peers[message.peer.id];
}
请注意,在该session-description函数中,我们会根据提供的信息设置remoteDescription ,并在设置localDescription并将该信息传递出去之前,先生成对offer 的响应(如果有 offer)。两者都以SDP的形式提供信息。offeranswer
太棒了!🎉 乍一看可能不太明显,但我们刚刚创建了一个使用 WebRTC 数据通道通过 UDP 进行数据通信的系统!如果您启动服务器node .并在两个不同的浏览器窗口中加载相同的房间 ID,您应该能够检查context.channels.
实时协作绘图
让我们把上一篇文章中的代码复制过来,创建一个名为/static/draw.js的文件。
const canvas = document.querySelector('canvas');
const ctx = canvas.getContext('2d');
var lastPoint;
var force;
function randomColor() {
let r = Math.random() * 255;
let g = Math.random() * 255;
let b = Math.random() * 255;
return `rgb(${r}, ${g}, ${b})`;
}
var color = randomColor();
var colorPicker = document.querySelector('[data-color]');
colorPicker.dataset.color = color;
colorPicker.style.color = color;
function resize() {
canvas.width = window.innerWidth;
canvas.height = window.innerHeight;
}
function onPeerData(id, data) {
draw(JSON.parse(data));
}
function draw(data) {
ctx.beginPath();
ctx.moveTo(data.lastPoint.x, data.lastPoint.y);
ctx.lineTo(data.x, data.y);
ctx.strokeStyle = data.color;
ctx.lineWidth = Math.pow(data.force || 1, 4) * 2;
ctx.lineCap = 'round';
ctx.stroke();
ctx.closePath();
}
function move(e) {
if (e.buttons) {
if (!lastPoint) {
lastPoint = { x: e.offsetX, y: e.offsetY };
return;
}
draw({
lastPoint,
x: e.offsetX,
y: e.offsetY,
force: force,
color: color
});
broadcast(JSON.stringify({
lastPoint,
x: e.offsetX,
y: e.offsetY,
force: force,
color: color
}));
lastPoint = { x: e.offsetX, y: e.offsetY };
}
}
function up() {
lastPoint = undefined;
}
function key(e) {
if (e.key === 'Backspace') {
ctx.clearRect(0, 0, canvas.width, canvas.height);
}
}
function forceChanged(e) {
force = e.webkitForce || 1;
}
window.onresize = resize;
window.onmousemove = move;
window.onmouseup = up;
window.onkeydown = key;
window.onwebkitmouseforcechanged = forceChanged;
resize();
请注意,我们使用 `<div>` 标签onPeerData(id, data)将信息绘制到屏幕上,并使用`/static/load.js`文件中的`broadcast` 函数将当前绘制的信息广播到房间。至此,我们便拥有了一个功能齐全的P2P 网状网络,该网络使用服务器发送事件 (Server Sent Events)作为实时信令服务器。
结论
呼!本教程的内容可能比预想的要多一些。我们学习了服务器发送事件(Server Sent Events),实现了信令服务器并直接使用了 WebRTC 库,甚至还添加了对具有唯一房间 ID 的网状网络对等连接的支持。现在底层通信基本搞定了,接下来需要对绘图工具的功能和工具进行一些升级。
在下一篇文章中,我们将添加一些除画笔之外的工具,并学习一些关于状态同步的知识。目前,我们的绘图状态只是简单地通过执行接收到的每个操作来覆盖——但没有任何机制告诉我们页面加载时绘图表面的状态是什么。我们将探讨如何利用 CRDT 来解决这类分布式问题。
在后续文章中,我们将重新审视该架构,并使用Redis添加一个PubSub服务器。添加 PubSub 服务器将使我们能够创建一个负载均衡器,并利用我们的连接部署任意数量的信令服务器(从而帮助我们扩展)。
干杯!🍻
如果你喜欢这篇文章,请点赞并关注我!也欢迎关注我的推特。
再次感谢!🏕
代码
如果你对本系列代码感兴趣,请查看下方我在 GitHub 上的代码库:
https://github.com/nyxtom/drawing-webrtc
再次感谢!
文章来源:https://dev.to/nyxtom/realtime-collaborative-drawing-part-2-server-sent-events-webrtc-mesh-networks-jnf


