您现在的位置是: 网站首页 >Django Django
Django使用Channels实现WebSocket消息通知功能
admin2019年7月25日 17:52 【Django | JQuery | Nginx | Redis 】 3174人已围观
# Django Channels https://channels.readthedocs.io/en/latest/installation.html Channels改变Django在下面和通过Django的同步核心编织异步代码,允许Django项目不仅处理HTTP,还需要处理需要长时间连接的协议 - WebSockets,MQTT,chatbots,业余无线电等等。 它在保留Django同步和易用性的同时实现了这一点,允许您选择编写代码的方式 - 以Django视图,完全异步或两者混合的方式同步。除此之外,它还提供了与Django的auth系统,会话系统等的集成,使您可以比以往更轻松地将仅HTTP项目扩展到其他协议。 > 需求:消息实时**推送消息以及通知**功能,采用django-channels来实现websocket进行实时通讯。并使用supervisor启动daphne,保持websocket后台运行 # 应用安装 Django版本为2.2.3 ## pip安装channels ```bash >pip install -U channels ``` Windows安装报错解决 ``` copying src\twisted\words\xish\xpathparser.g -> build\lib.win-amd64-3.7\twisted\words\xish running build_ext building 'twisted.test.raiser' extension error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": https://visualstudio.microsoft.com/downloads/ ---------------------------------------- ERROR: Failed building wheel for twisted Running setup.py clean for twisted Failed to build twisted Installing collected packages: twisted, daphne, channels Running setup.py install for twisted ... error ``` 解决方法,访问 https://www.lfd.uci.edu/~gohlke/pythonlibs/#twisted 下载 .whl 格式文件手动安装。 ![下载文件](_v_images/20190723131927979_25263.png) 安装 ```bash >pip install StarMeow_Backup/Twisted-19.2.1-cp37-cp37m-win_amd64.whl # 继续安装 >pip install -U channels > ``` 下载太慢加上`-i https://pypi.douban.com/simple/` ## 添加到安装应用 将Channels库添加到已安装的应用程序列表中。编辑 settings.py 文件,并将`channels`添加到`INSTALLED_APPS`设置中。 ```python INSTALLED_APPS = [ # ... 'channels', # 【channels】(第1步)pip install -U channels 安装 # ... ] ``` ### 因pywin32造成重装系统的杯具 **Windows开发环境中** 将`channels`添加到应用中后,WTF!!!报错,run不起来 ```bash Watching for file changes with StatReloader Exception in thread django-main-thread: Traceback (most recent call last): File "D:\Apps\Python\Python37\lib\threading.py", line 926, in _bootstrap_inner self.run() File "D:\Apps\Python\Python37\lib\threading.py", line 870, in run self._target(*self._args, **self._kwargs) File "D:\Apps\Python\Python37venv\StarMeowTest\lib\site-packages\django\utils\autoreload.py", line 54, in wrapper fn(*args, **kwargs) File "D:\Apps\Python\Python37venv\StarMeowTest\lib\site-packages\django\core\management\commands\runserver.py", line 109, in inner_run autoreload.raise_last_exception() File "D:\Apps\Python\Python37venv\StarMeowTest\lib\site-packages\django\utils\autoreload.py", line 77, in raise_last_exception raise _exception[1] File "D:\Apps\Python\Python37venv\StarMeowTest\lib\site-packages\django\core\management\__init__.py", line 337, in execute autoreload.check_errors(django.setup)() File "D:\Apps\Python\Python37venv\StarMeowTest\lib\site-packages\django\utils\autoreload.py", line 54, in wrapper fn(*args, **kwargs) File "D:\Apps\Python\Python37venv\StarMeowTest\lib\site-packages\django\__init__.py", line 24, in setup apps.populate(settings.INSTALLED_APPS) File "D:\Apps\Python\Python37venv\StarMeowTest\lib\site-packages\django\apps\registry.py", line 83, in populate raise RuntimeError("populate() isn't reentrant") RuntimeError: populate() isn't reentrant ``` 这种情况看`NSTALLED_APPS`关联的app的相关包是否已安装完成。 把`channels`注释掉后服务器能正常启动。 好吧,重装系统了,结果还是不行,虽然是因为资源管理器一打开CPU就占用100%重装的。 两台电脑有一台能正常系统,对比虚拟环境安装的包,各种升降版本,卸装应用包测试终于找到问题所在。 ```bash >pip install pywin32 Collecting pywin32 Using cached https://files.pythonhosted.org/packages/a3/8a/eada1e7990202cd27e58eca2a278c344fef190759bbdc8f8f0eb6abeca9c/pywin32 -224-cp37-cp37m-win_amd64.whl Installing collected packages: pywin32 Successfully installed pywin32-224 ``` 重新运行服务器,正常了! ```bash Performing system checks... System check identified no issues (0 silenced). July 25, 2019 - 11:59:17 Django version 2.2.3, using settings 'StarMeow.settings' Starting ASGI/Channels version 2.2.0 development server at http://0.0.0.0:80/ Quit the server with CTRL-BREAK. ``` # 逻辑代码 ## 创建默认路由(主WS路由) Channels路由配置类似于Django URLconf,因为当通道服务器接收到HTTP请求时,它告诉通道运行什么代码。 将从一个空路由配置开始。创建一个文件 StarMeow/routing.py ,并包含以下代码: ```python # 【channels】(第2步)设置默认路由在项目创建routing.py文件 from channels.routing import ProtocolTypeRouter application = ProtocolTypeRouter({ # Empty for now (http->django views is added by default) }) ``` ## 设置执行路由对象(指定routing) 最后,将`ASGI_APPLICATION`设置为指向路由对象作为根应用程序,修改 settings.py 文件,添加: ```python # 【channels】(第3步)设置为指向路由对象作为根应用程序 ASGI_APPLICATION = "StarMeow.routing.application" ``` 就是这样!一旦启用,通道就会将自己集成到Django中,并控制runserver命令。 ## 启动channel layer:后端redis 信道层是一种通信系统。它允许多个消费者实例彼此交谈,以及与Django的其他部分交谈。 通道层提供以下抽象: 通道是一个可以将邮件发送到的邮箱。每个频道都有一个名称。任何拥有频道名称的人都可以向频道发送消息。 一组是一组相关的通道。一个组有一个名称。任何具有组名称的人都可以按名称向组添加/删除频道,并向组中的所有频道发送消息。无法枚举特定组中的通道。 每个使用者实例都有一个自动生成的唯一通道名,因此可以通过通道层进行通信。 在我们的聊天应用程序中,我们希望同一个房间中的多个聊天消费者实例相互通信。为此,我们将让每个聊天消费者将其频道添加到一个组,该组的名称基于房间名称。这将允许聊天用户向同一房间内的所有其他聊天用户发送消息。 我们将使用一个使用redis作为后备存储的通道层。要在端口6379上启动Redis服务器,首先系统上安装redis,并启动。 ### pip安装channels_redis ```bash >pip install channels_redis ``` ### 配置CHANNEL_LAYERS 修改 settings.py 增加配置 ```python # 【channels】后端 CHANNEL_LAYERS = { "default": { "BACKEND": "channels_redis.core.RedisChannelLayer", "CONFIG": { "hosts": ["redis://:password@127.0.0.1:6379/0"], }, }, } ``` 确保channel layer可以与Redis通信。打开Django shell并运行以下命令: ```bash >>> import channels.layers >>> channel_layer = channels.layers.get_channel_layer() >>> from asgiref.sync import async_to_sync >>> async_to_sync(channel_layer.send)('test_channel', {'type': 'hello'}) >>> async_to_sync(channel_layer.receive)('test_channel') {'type': 'hello'} ``` ### channels_redis 不同情况参考配置 ```python CHANNEL_LAYERS = { "default": { "BACKEND": "channels_redis.core.RedisChannelLayer", "CONFIG": { "hosts": [("localhost", 6379)], }, }, } CHANNEL_LAYERS = { 'default': { 'BACKEND': 'channels_redis.core.RedisChannelLayer', 'CONFIG': { "hosts": ["redis://127.0.0.1:6379/8"], }, }, } CHANNEL_LAYERS = { 'default': { 'BACKEND': 'channels_redis.core.RedisChannelLayer', 'CONFIG': { "hosts": [('127.0.0.1', 6379)], }, }, } CHANNEL_LAYERS = { "default": { "BACKEND": "channels_redis.core.RedisChannelLayer", "CONFIG": { "hosts": ["redis://:password@127.0.0.1:6379/0"], "symmetric_encryption_keys": [SECRET_KEY], }, }, } ``` ## 应用下创建 consumers.py(类似Django视图) 使用异步方式 同步消费者很方便,因为他们可以调用常规的同步I / O函数,例如那些在不编写特殊代码的情况下访问Django模型的函数。 但是,异步使用者可以提供更高级别的性能,因为他们在处理请求时不需要创建其他线程。 ChatConsumer仅使用异步本机库(通道和通道层),特别是它不访问同步Django模型。 因此,它可以被重写为异步而不会出现复杂情况。 apps/pxectrl/consumers.py ```python # 【channels】(第4步)创建应用的消费者 from channels.generic.websocket import WebsocketConsumer, AsyncWebsocketConsumer from asgiref.sync import async_to_sync from channels.layers import get_channel_layer import json class AsyncConsumer(AsyncWebsocketConsumer): async def connect(self): # 连接时触发 self.room_name = self.scope['url_route']['kwargs']['room_name'] self.room_group_name = 'notice_%s' % self.room_name # 直接从用户指定的房间名称构造Channels组名称,不进行任何引用或转义。 # 将新的连接加入到群组 await self.channel_layer.group_add( self.room_group_name, self.channel_name ) await self.accept() async def disconnect(self, close_code): # 断开时触发 # 将关闭的连接从群组中移除 await self.channel_layer.group_discard( self.room_group_name, self.channel_name ) # Receive message from WebSocket async def receive(self, text_data=None, bytes_data=None): # 接收消息时触发 text_data_json = json.loads(text_data) message = text_data_json['message'] # 信息群发 await self.channel_layer.group_send( self.room_group_name, { 'type': 'system_message', 'message': message } ) # Receive message from room group async def system_message(self, event): print(event) message = event['message'] # Send message to WebSocket单发消息 await self.send(text_data=json.dumps({ 'message': message })) # 同步方式,仅作示例,不使用 class SyncConsumer(WebsocketConsumer): def connect(self): # 从打开到使用者的WebSocket连接的chat/routing.py中的URL路由中获取'room_name'参数。 self.room_name = self.scope['url_route']['kwargs']['room_name'] print('WebSocket建立连接:', self.room_name) # 直接从用户指定的房间名称构造通道组名称 self.room_group_name = 'msg_%s' % self.room_name # 加入房间 async_to_sync(self.channel_layer.group_add)( self.room_group_name, self.channel_name ) # async_to_sync(…)包装器是必需的,因为ChatConsumer是同步WebsocketConsumer,但它调用的是异步通道层方法。(所有通道层方法都是异步的。) # 接受WebSocket连接。 self.accept() simple_username = self.scope["session"]["session_simple_nick_name"] # 获取session中的值 async_to_sync(self.channel_layer.group_send)( self.room_group_name, { 'type': 'chat_message', 'message': '@{} 已加入房间'.format(simple_username) } ) def disconnect(self, close_code): print('WebSocket关闭连接') # 离开房间 async_to_sync(self.channel_layer.group_discard)( self.room_group_name, self.channel_name ) # 从WebSocket中接收消息 def receive(self, text_data=None, bytes_data=None): print('WebSocket接收消息:', text_data) text_data_json = json.loads(text_data) message = text_data_json['message'] # 发送消息到房间 async_to_sync(self.channel_layer.group_send)( self.room_group_name, { 'type': 'chat_message', 'message': message } ) # 从房间中接收消息 def chat_message(self, event): message = event['message'] # 发送消息到WebSocket self.send(text_data=json.dumps({ 'message': message })) ``` - 使用异步时继承自`AsyncWebsocketConsumer`而不是`WebsocketConsumer`。 - 所有方法都是`async def`而不是`def`。 - `await`用于调用执行I / O的异步函数。 - 在通道层上调用方法时不再需要`async_to_sync`。 ## 应用下创建 routing.py (类似Django路由) apps/pxectrl/routing.py ```python # 【channels】(第5步)为应用程序创建一个路由配置,该应用程序具有到消费者的路由 from django.conf.urls import url from pxectrl import consumers websocket_urlpatterns = [ # url(r'^ws/msg/(?P<room_name>[^/]+)/$', consumers.SyncConsumer), url(r'^ws/notice/(?P<room_name>[^/]+)/$', consumers.AsyncConsumer), ] ``` ## 修改项目下 routing.py (主WS路由) StarMeow/routing.py ```python # 【channels】(第2步)设置默认路由在项目创建routing.py文件 from channels.routing import ProtocolTypeRouter, URLRouter from channels.auth import AuthMiddlewareStack from channels.sessions import SessionMiddlewareStack import pxectrl.routing application = ProtocolTypeRouter({ # (http->django views is added by default) # 【channels】(第6步)添加路由配置指向应用的路由模块 'websocket': SessionMiddlewareStack( # 使用Session中间件,可以请求中session的值 URLRouter( pxectrl.routing.websocket_urlpatterns ) ), }) ``` 添加Channels子路由的配置 ## 外部消息发送到Channels 在 apps/pxectrl/consumers.py 创建一个函数,用于外部调用 ```python def send_group_msg(room_name, message): # 从Channels的外部发送消息给Channel """ from pxectrl import consumers consumers.send_group_msg('ITNest', {'content': '这台机器硬盘故障了', 'level': 1}) consumers.send_group_msg('ITNest', {'content': '正在安装系统', 'level': 2}) :param room_name: :param message: :return: """ channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( 'notice_{}'.format(room_name), # 构造Channels组名称 { "type": "system_message", "message": message, } ) ``` ## 前端页面连接WebSocket ```html <link href="{% static 'hadmin/css/plugins/toastr/toastr.min.css' %}" rel="stylesheet"> <script src="{% static 'hadmin/js/jquery.min.js' %}"></script> <script src="{% static 'hadmin/js/plugins/toastr/toastr.min.js' %}"></script> <script> let ws_scheme = window.location.protocol === "https:" ? "wss" : "ws"; let ws = new WebSocket(ws_scheme + '://' + window.location.host + '/ws/msg/ITNest/'); ws.onopen = function () { console.log('WebSocket建立连接'); }; ws.onmessage = function (e) { console.log('WebSocket接收消息:'); let data = JSON.parse(e.data); console.log(data); let message = data['message']; if (message.level === 1) { toastr.options = { // toastr配置 "closeButton": true, "debug": false, "progressBar": true, "positionClass": "toast-top-center", "showDuration": "400", "hideDuration": "1000", "timeOut": "30000", "extendedTimeOut": "1000", "showEasing": "swing", "hideEasing": "linear", "showMethod": "fadeIn", "hideMethod": "fadeOut" }; toastr.error(message.content, '警告'); } else { toastr.options = { // toastr配置 "closeButton": true, "debug": false, "progressBar": true, "positionClass": "toast-top-right", "showDuration": "400", "hideDuration": "1000", "timeOut": "7000", "extendedTimeOut": "1000", "showEasing": "swing", "hideEasing": "linear", "showMethod": "fadeIn", "hideMethod": "fadeOut" }; toastr.info(message.content, '信息') } }; ws.onclose = function (e) { console.error('WebSocket关闭连接'); }; </script> ``` ## 在shell中测试 ```bash >>> from pxectrl import consumers >>> consumers.send_group_msg('ITNest', {'content': '正在安装系统', 'level': 2}) >>> consumers.send_group_msg('ITNest', {'content': '这台电脑硬盘故障了', 'level': 1}) ``` ![BLOG_20190725_175527_63](/media/blog/images/2019/07/BLOG_20190725_175527_63.png "博客图集BLOG_20190725_175527_63.png") ![BLOG_20190725_175530_75](/media/blog/images/2019/07/BLOG_20190725_175530_75.png "博客图集BLOG_20190725_175530_75.png") 在后端,就可以通过调用该函数实现通知功能。 ## 服务器配置 ### 项目下添加 asgi.py 文件 ```python # 项目/settings和wsgi.py的同目录下创建asgi.py """ ASGI入口点,运行Django,然后运行在settings.py ASGI_APPLICATION 中定义的应用程序 安装:pip install daphne 运行:daphne -p 8001 ITNest.asgi:application """ import os import django from channels.routing import get_default_application os.environ.setdefault("DJANGO_SETTINGS_MODULE", "ITNest.settings") django.setup() application = get_default_application() ``` ### pip安装daphne ```bash pip install daphne ``` 运行 ```bash daphne -b 0.0.0.0 -p 8001 ITNest.asgi:application ``` 这里需要自行指定`-p`和`-b`参数 ### 修改nginx配置 ```nginx server { # 。。。 location /ws/ { proxy_pass http://127.0.0.1:8001; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_redirect off; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Host $server_name; } } ``` ### supervisor启动daphne ```bash pip install supervisor ``` 生成配置文件 ```bash (ITNest) root@PxeCtrlSys:/home/user/ITNest# mkdir Supervisor (ITNest) root@PxeCtrlSys:/home/user/ITNest# cd Supervisor/ (ITNest) root@PxeCtrlSys:/home/user/ITNest/Supervisor# echo_supervisord_conf > supervisord.conf (ITNest) root@PxeCtrlSys:/home/user/ITNest/Supervisor# ls supervisord.conf (ITNest) root@PxeCtrlSys:/home/user/ITNest/Supervisor# cat supervisord.conf ``` 添加启动配置 ```ini (ITNest) root@PxeCtrlSys:/home/user/ITNest/Supervisor# vim daphne.ini # 加入下面的内容 [program:daphne] directory=/home/user/ITNest command=daphne -b 127.0.0.1 -p 8001 --proxy-headers ITNest.asgi:application autostart=true autorestart=true stdout_logfile=/tmp/websocket.log redirect_stderr=true ``` 修改supervisor配置文件 ```python (ITNest) root@PxeCtrlSys:/home/user/ITNest/Supervisor# vim supervisord.conf # 最后加上 [include] ;files = relative/directory/*.ini files = /home/user/ITNest/Supervisor/*.ini ``` 启动supervisord ```bash (ITNest) root@PxeCtrlSys:/home/user/ITNest/Supervisor# supervisord -c supervisord.conf (ITNest) root@PxeCtrlSys:/home/user/ITNest/Supervisor# tail /tmp/websocket.log ``` 重启 ```bash (ITNest) root@PxeCtrlSys:/home/user/ITNest/Supervisor# supervisorctl -c supervisord.conf restart Error: restart requires a process name restart <name> Restart a process restart <gname>:* Restart all processes in a group restart <name> <name> Restart multiple processes or groups restart all Restart all processes Note: restart does not reread config files. For that, see reread and update. (ITNest) root@PxeCtrlSys:/home/user/ITNest/Supervisor# supervisorctl -c Supervisor/supervisord.conf restart all daphne: stopped daphne: started ```
很赞哦! (4)
相关文章
文章交流
- emoji