Hi All:
-- I'm migrating a small application from Django 1.x/Channels 1.x to Django 2.0.2 and Channels 2.0. I've run into an issue whose cause I'm trying to determine. It could be due to a failure on my part to correctly implement the channel_layer or it could be due to an incompatibility with Celery 4.1. The basics are: - Run a periodic Celery task - Use the channel_layer to perform a group_send - Have the consumer receive the group_send event and push a json message over the socket Show below is my simple consumer.py module: class mstatusMessage(AsyncJsonWebsocketConsumer): ##### WebSocket event handlers async def connect(self): """ Called when the websocket is handshaking as part of initial connection. """ logging.info("### Connected ###") # Accept the connection await self.accept() # Add to the group so they get messages await self.channel_layer.group_add( settings.CHANNEL_GROUP, self.channel_name, ) async def disconnect(self, code): """ Called when the WebSocket closes for any reason. """ # Remove them from the group await self.channel_layer.group_discard( settings.CHANNEL_GROUP, self.channel_name, ) ##### Handlers for messages sent over the channel layer # These helper methods are named by the types we send - so epics.join becomes epics_join async def epics_message(self, event): """ Called when the Celery task queries Epics. """ logging.error("### Received Msg ###") # Send a message down to the client await self.send_json( { "text": event["message"], }, ) The routing is simple: application = ProtocolTypeRouter({ "websocket": mstatusMessage }) The Celery task is as follows: @shared_task def updateData(param): logger.error('##### updateData #####') # # Get an instance of the channel layer for # # inter task communications channel_layer = get_channel_layer() channel_layer.group_send( settings.CHANNEL_GROUP, {"type": "epics.message", "text": "Hello World"}, ) The results are promising as the websocket connect opens successfully and the Celery task run as show by the debugging output given below: 127.0.0.1:59818 - - [02/Mar/2018:09:32:11] "GET /" 200 100639 127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECTING /epics/" - - 2018-03-02 09:32:12,280 INFO ### Connected ### 127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECT /epics/" - - [2018-03-02 09:32:12,312: ERROR/ForkPoolWorker-2] mstatus.tasks.updateData[8d329e61-]: ##### updateData ##### [2018-03-02 09:32:13,310: ERROR/ForkPoolWorker-2] mstatus.tasks.updateData[786f51a6-]: ##### updateData ##### BUT ............... although the Celery task runs the consumer never receives a message via the channel layer. This could be due to an implementation error or, maybe, a compatibility issue. The application doesn't crash but the following warning is issued: [2018-03-02 09:32:02,105: WARNING/ForkPoolWorker-2] /mstatus/mstatus/tasks.py:33: RuntimeWarning: coroutine 'RedisChannelLayer.group_send' was never awaited {"type": "epics.message", "text": "Hello World"}, This warning appears related to the Python asyncio functionality. Under the Celery task module, the channel_layer.group_send doesn't use the await directive as it's inclusion hangs the Celery task. Changing the Celery task to: async def updateData(param): logger.error('##### updateData #####') # # Get an instance of the channel layer for # # inter task communications channel_layer = get_channel_layer() await channel_layer.group_send( settings.CHANNEL_GROUP, {"type": "epics.message", "text": "Hello World"}, ) This results in the following runtime warning and the Celery task fails to run (the debug message is never printed) : [2018-03-02 09:45:19,804: WARNING/ForkPoolWorker-2] /home/broteng/.pyenv/versions/3.6.3/envs/djchannels2/lib/python3.6/site-packages/billiard/pool.py:358: RuntimeWarning: coroutine 'updateData' was never awaited result = (True, prepare_result(fun(*args, **kwargs))) I'm sure these warnings are understood by someone who can provide guidance with respect to a solution. Thanks, G Broten Reference: The application has be tried under two OS versions: CentOS 7.4 Alpine Linux 3.7 A partial pip list of the significant packages: asgi-redis (1.4.3) asgiref (2.1.6) async-timeout (2.0.0) billiard (3.5.0.3) cached-property (1.4.0) celery (4.1.0) channels (2.0.2) channels-redis (2.1.0) daphne (2.0.4) Django (2.0.2) redis (2.10.6) You received this message because you are subscribed to the Google Groups "Django users" group. To unsubscribe from this group and stop receiving emails from it, send an email to [hidden email]. To post to this group, send email to [hidden email]. Visit this group at https://groups.google.com/group/django-users. To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/8bc30e1e-3a68-4eea-983a-937b85c05b59%40googlegroups.com. For more options, visit https://groups.google.com/d/optout. |
Taking a stab at this - I believe the original problem may be here:
-- channel_layer.group_send( settings.CHANNEL_GROUP, {"type": "epics.message", "text": "Hello World"}, ) Your updateData method is a synchronous method. However, channel_layer.group_send is an asynchronous method. What you might try is wrapping the group_send method in the async_to_sync function. See the documentation at http://channels.readthedocs.io/en/latest/topics/channel_layers.html#synchronous-functions async_to_sync(channel_layer.group_send)( settings.CHANNEL_GROUP, {"type": "epics.message", "text": "Hello World"}, ) Your first solution to make updateData an asynchronous method might work with some other work involved in adding that task to the event loop - but that answer is beyond me at the moment. Hope this helps, Ken On Friday, March 2, 2018 at 1:36:08 PM UTC-5, G Broten wrote:
You received this message because you are subscribed to the Google Groups "Django users" group. To unsubscribe from this group and stop receiving emails from it, send an email to [hidden email]. To post to this group, send email to [hidden email]. Visit this group at https://groups.google.com/group/django-users. To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/1199cf0d-bd71-422f-b148-5148eaa59d29%40googlegroups.com. For more options, visit https://groups.google.com/d/optout. |
Two big thumbs up for Ken!
-- His keen eyes spotted the problem, which was attempting an asynchronous send from the Celery task. I changed the code to use the synchronous send and, bingo, the consumer now receives events via the channel layer. A big thank-you to Ken! And, I'm sure anyone else using Channels 2.0 with Celery will find this thread of use. G Broten On Friday, March 2, 2018 at 12:57:01 PM UTC-6, Ken Whitesell wrote:
You received this message because you are subscribed to the Google Groups "Django users" group. To unsubscribe from this group and stop receiving emails from it, send an email to [hidden email]. To post to this group, send email to [hidden email]. Visit this group at https://groups.google.com/group/django-users. To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/09c2b5c2-1926-45be-9139-927db807c703%40googlegroups.com. For more options, visit https://groups.google.com/d/optout. |
In reply to this post by G Broten
Plese, Can you send us a basic example of celery 4 and channels 2?
-- El viernes, 2 de marzo de 2018, 19:36:08 (UTC+1), G Broten escribió:
You received this message because you are subscribed to the Google Groups "Django users" group. To unsubscribe from this group and stop receiving emails from it, send an email to [hidden email]. To post to this group, send email to [hidden email]. Visit this group at https://groups.google.com/group/django-users. To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/a03ce4f9-cebc-4035-903f-a40d6aaf52cb%40googlegroups.com. For more options, visit https://groups.google.com/d/optout. |
Free forum by Nabble | Edit this page |