发布于 2026-01-06 8 阅读
0

Django REST Framework - WebSocket

Django REST Framework - WebSocket

本文将构建一个使用 JWT 身份验证并通过 Django REST Framework 与 WebSocket 通信的应用程序。本文主要关注如何从外部消费者向 WebSocket 发送数据。

开始之前,您可以在这里找到源代码。

我们对同一个端点有两个请求:GET 和 POST。每当有请求到达端点时,我们都会向 websocket 发送消息。

这就是我们models.py的消息模型,如下所示,非常简单。



from django.db import models
from django.contrib.auth import get_user_model

User = get_user_model()

class Message(models.Model):
    message = models.JSONField()
    user = models.ForeignKey(to=User, on_delete=models.CASCADE, null=True, blank=True)



Enter fullscreen mode Exit fullscreen mode

配置

我们需要一个routing.py包含相关应用程序网址的文件。



from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
    re_path(r'msg/', consumers.ChatConsumer.as_asgi()),
]



Enter fullscreen mode Exit fullscreen mode

然后我们需要routing.py在项目的核心目录中添加另一个文件。



import os

from websocket.middlewares import WebSocketJWTAuthMiddleware
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

from websocket import routing

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "django_ws.settings")


application = ProtocolTypeRouter(
    {
        "http": get_asgi_application(),
        "websocket": WebSocketJWTAuthMiddleware(URLRouter(routing.websocket_urlpatterns)),
    }
)



Enter fullscreen mode Exit fullscreen mode

如您所见,这里使用了中间件。该中间件的作用是控制身份验证过程。如果您点击此链接,您会发现 Django 支持标准的 Django 身份验证,但由于我们使用的是 JWT 身份验证,因此需要一个自定义中间件。本项目使用了REST Framework SimpleJWT,在创建连接时,我们会发送带有查询字符串的令牌,这种方式安全性不高。我们将用户分配到的作用域如下所示。
middlewares.py



from urllib.parse import parse_qs

from channels.db import database_sync_to_async
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser
from rest_framework_simplejwt.tokens import AccessToken, TokenError

User = get_user_model()


@database_sync_to_async
def get_user(user_id):
    try:
        return User.objects.get(id=user_id)
    except User.DoesNotExist:
        return AnonymousUser()


class WebSocketJWTAuthMiddleware:

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        parsed_query_string = parse_qs(scope["query_string"])
        token = parsed_query_string.get(b"token")[0].decode("utf-8")

        try:
            access_token = AccessToken(token)
            scope["user"] = await get_user(access_token["user_id"])
        except TokenError:
            scope["user"] = AnonymousUser()

        return await self.app(scope, receive, send)


Enter fullscreen mode Exit fullscreen mode

最后是 settings.py 文件。我们使用 Redis 作为通道层,因此需要启动一个 Redis 服务器,这可以通过 Docker 来实现。

docker run -p 6379:6379 -d redis:5

settings.py



ASGI_APPLICATION = 'django_ws.routing.application'

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [('0.0.0.0', 6379)],
        },
    },
}


Enter fullscreen mode Exit fullscreen mode

消费者

这是我们的consumer.py文件,由于 ChatConsumer 在需要访问数据库时是异步的,因此相关方法需要一个database_sync_to_async装饰器。

websocket/consumer.py



import json

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from django.contrib.auth.models import AnonymousUser

from websocket.models import Message


class ChatConsumer(AsyncWebsocketConsumer):
    groups = ["general"]

    async def connect(self):
        await self.accept()
        if self.scope["user"] is not AnonymousUser:
            self.user_id = self.scope["user"].id
            await self.channel_layer.group_add(f"{self.user_id}-message", self.channel_name)

    async def send_info_to_user_group(self, event):
        message = event["text"]
        await self.send(text_data=json.dumps(message))

    async def send_last_message(self, event):
        last_msg = await self.get_last_message(self.user_id)
        last_msg["status"] = event["text"]
        await self.send(text_data=json.dumps(last_msg))

    @database_sync_to_async
    def get_last_message(self, user_id):
        message = Message.objects.filter(user_id=user_id).last()
        return message.message



Enter fullscreen mode Exit fullscreen mode

浏览量

正如我在上一步提到的,我们的消费者是异步的,我们需要将方法从异步转换为同步,就像函数名称一样。

views.py



from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView

from .models import Message


class MessageSendAPIView(APIView):
    permission_classes = (IsAuthenticated,)

    def get(self, request):
        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            "general", {"type": "send_info_to_user_group",
                        "text": {"status": "done"}}
        )

        return Response({"status": True}, status=status.HTTP_200_OK)

    def post(self, request):
        msg = Message.objects.create(user=request.user, message={
                                     "message": request.data["message"]})
        socket_message = f"Message with id {msg.id} was created!"
        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            f"{request.user.id}-message", {"type": "send_last_message",
                                           "text": socket_message}
        )

        return Response({"status": True}, status=status.HTTP_201_CREATED)



Enter fullscreen mode Exit fullscreen mode

在此视图的 GET 请求中,我们将消息发送到频道的“general”组,因此该组中的每个人都会收到该消息。如果您检查一下,consumers.py就会发现我们的默认组是“general”。另一方面,在 POST 请求中,我们将消息发送到指定的组。换句话说,此消息仅发送到与用户 ID 相关的组,这意味着只有该用户才会收到该消息。

目前看来一切正常,但只有实际测试才能确定是否真的有效,所以我们来试试。我使用的是 Chrome扩展程序来连接 WebSocket。

GET 请求的结果

GET 请求的结果

POST 请求

POST 请求

注意:Python 3.10.0 与 asyncio 存在兼容性问题,请勿使用此版本。

文章来源:https://dev.to/buurak/django-rest-framework-websocket-3pb6