Django REST Framework - WebSocket
本文将构建一个使用 JWT 身份验证并通过 Django REST Framework 与 WebSocket 通信的应用程序。本文主要关注如何从外部消费者向 WebSocket 发送数据。
开始之前,您可以在这里找到源代码。
我们对同一个端点有两个请求:GET 和 POST。每当有请求到达端点时,我们都会向 websocket 发送消息。
这就是我们models.py的消息模型,如下所示,非常简单。
from django.db import models
from django.contrib.auth import get_user_model
User = get_user_model()
class Message(models.Model):
message = models.JSONField()
user = models.ForeignKey(to=User, on_delete=models.CASCADE, null=True, blank=True)
配置
我们需要一个routing.py包含相关应用程序网址的文件。
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'msg/', consumers.ChatConsumer.as_asgi()),
]
然后我们需要routing.py在项目的核心目录中添加另一个文件。
import os
from websocket.middlewares import WebSocketJWTAuthMiddleware
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
from websocket import routing
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_ws.settings")
application = ProtocolTypeRouter(
{
"http": get_asgi_application(),
"websocket": WebSocketJWTAuthMiddleware(URLRouter(routing.websocket_urlpatterns)),
}
)
如您所见,这里使用了中间件。该中间件的作用是控制身份验证过程。如果您点击此链接,您会发现 Django 支持标准的 Django 身份验证,但由于我们使用的是 JWT 身份验证,因此需要一个自定义中间件。本项目使用了REST Framework SimpleJWT,在创建连接时,我们会发送带有查询字符串的令牌,这种方式安全性不高。我们将用户分配到的作用域如下所示。middlewares.py
from urllib.parse import parse_qs
from channels.db import database_sync_to_async
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser
from rest_framework_simplejwt.tokens import AccessToken, TokenError
User = get_user_model()
@database_sync_to_async
def get_user(user_id):
try:
return User.objects.get(id=user_id)
except User.DoesNotExist:
return AnonymousUser()
class WebSocketJWTAuthMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
parsed_query_string = parse_qs(scope["query_string"])
token = parsed_query_string.get(b"token")[0].decode("utf-8")
try:
access_token = AccessToken(token)
scope["user"] = await get_user(access_token["user_id"])
except TokenError:
scope["user"] = AnonymousUser()
return await self.app(scope, receive, send)
最后是 settings.py 文件。我们使用 Redis 作为通道层,因此需要启动一个 Redis 服务器,这可以通过 Docker 来实现。
docker run -p 6379:6379 -d redis:5
settings.py
ASGI_APPLICATION = 'django_ws.routing.application'
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [('0.0.0.0', 6379)],
},
},
}
消费者
这是我们的consumer.py文件,由于 ChatConsumer 在需要访问数据库时是异步的,因此相关方法需要一个database_sync_to_async装饰器。
websocket/consumer.py
import json
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from django.contrib.auth.models import AnonymousUser
from websocket.models import Message
class ChatConsumer(AsyncWebsocketConsumer):
groups = ["general"]
async def connect(self):
await self.accept()
if self.scope["user"] is not AnonymousUser:
self.user_id = self.scope["user"].id
await self.channel_layer.group_add(f"{self.user_id}-message", self.channel_name)
async def send_info_to_user_group(self, event):
message = event["text"]
await self.send(text_data=json.dumps(message))
async def send_last_message(self, event):
last_msg = await self.get_last_message(self.user_id)
last_msg["status"] = event["text"]
await self.send(text_data=json.dumps(last_msg))
@database_sync_to_async
def get_last_message(self, user_id):
message = Message.objects.filter(user_id=user_id).last()
return message.message
浏览量
正如我在上一步提到的,我们的消费者是异步的,我们需要将方法从异步转换为同步,就像函数名称一样。
views.py
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from .models import Message
class MessageSendAPIView(APIView):
permission_classes = (IsAuthenticated,)
def get(self, request):
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
"general", {"type": "send_info_to_user_group",
"text": {"status": "done"}}
)
return Response({"status": True}, status=status.HTTP_200_OK)
def post(self, request):
msg = Message.objects.create(user=request.user, message={
"message": request.data["message"]})
socket_message = f"Message with id {msg.id} was created!"
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
f"{request.user.id}-message", {"type": "send_last_message",
"text": socket_message}
)
return Response({"status": True}, status=status.HTTP_201_CREATED)
在此视图的 GET 请求中,我们将消息发送到频道的“general”组,因此该组中的每个人都会收到该消息。如果您检查一下,consumers.py就会发现我们的默认组是“general”。另一方面,在 POST 请求中,我们将消息发送到指定的组。换句话说,此消息仅发送到与用户 ID 相关的组,这意味着只有该用户才会收到该消息。
目前看来一切正常,但只有实际测试才能确定是否真的有效,所以我们来试试。我使用的是 Chrome扩展程序来连接 WebSocket。
GET 请求的结果
POST 请求
注意:Python 3.10.0 与 asyncio 存在兼容性问题,请勿使用此版本。
文章来源:https://dev.to/buurak/django-rest-framework-websocket-3pb6

