Django 2.0.2, Channels 2.0.2 and Celery 4.1 Issue

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Django 2.0.2, Channels 2.0.2 and Celery 4.1 Issue

G Broten
Hi All:
 I'm migrating a small application from Django 1.x/Channels 1.x to Django 2.0.2 and Channels 2.0. I've run into an issue whose cause I'm trying to determine. It could be due to a failure on my part to correctly implement the channel_layer or it could be due to an
incompatibility with Celery 4.1. The basics are:
- Run a periodic Celery task
- Use the channel_layer to perform a group_send
- Have the consumer receive the group_send event and push a json  message over the socket

Show below is my simple consumer.py module:
class mstatusMessage(AsyncJsonWebsocketConsumer):

    ##### WebSocket event handlers

    async def connect(self):
        """
        Called when the websocket is handshaking as part of initial connection.
        """
        logging.info("### Connected ###")
        # Accept the connection
        await self.accept()

        # Add to the group so they get messages
        await self.channel_layer.group_add(
            settings.CHANNEL_GROUP,
            self.channel_name,
        )

    async def disconnect(self, code):
        """
        Called when the WebSocket closes for any reason.
        """
        # Remove them from the group
        await self.channel_layer.group_discard(
            settings.CHANNEL_GROUP,
            self.channel_name,
        )

    ##### Handlers for messages sent over the channel layer

    # These helper methods are named by the types we send - so epics.join becomes epics_join
    async def epics_message(self, event):
        """
        Called when the Celery task queries Epics.
        """
        logging.error("### Received Msg ###")
        # Send a message down to the client
        await self.send_json(
            {
                "text": event["message"],
            },
        )

The routing is simple:
application = ProtocolTypeRouter({
    "websocket":  mstatusMessage
})

The Celery task is as follows:
@shared_task
def updateData(param):

    logger.error('##### updateData #####')

    # # Get an instance of the channel layer for
    # # inter task communications
    channel_layer = get_channel_layer()

    channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )

The results are promising as the websocket connect opens successfully and the Celery task run as show by the debugging output given below:
127.0.0.1:59818 - - [02/Mar/2018:09:32:11] "GET /" 200 100639
127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECTING /epics/" - -
2018-03-02 09:32:12,280 INFO     ### Connected ###
127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECT /epics/" - -
[2018-03-02 09:32:12,312: ERROR/ForkPoolWorker-2] mstatus.tasks.updateData[8d329e61-]: ##### updateData #####
[2018-03-02 09:32:13,310: ERROR/ForkPoolWorker-2] mstatus.tasks.updateData[786f51a6-]: ##### updateData #####

BUT ............... although the Celery task runs the consumer never receives a message via the channel layer. This could be due to an
implementation error or, maybe, a compatibility issue. The application doesn't crash but the following warning is issued:

[2018-03-02 09:32:02,105: WARNING/ForkPoolWorker-2] /mstatus/mstatus/tasks.py:33: RuntimeWarning: coroutine 'RedisChannelLayer.group_send' was never awaited
  {"type": "epics.message", "text": "Hello World"},

This warning appears related to the Python asyncio functionality. Under the Celery task module, the channel_layer.group_send
doesn't use the await directive as it's inclusion hangs the Celery task. Changing the Celery task to:
async def updateData(param):

    logger.error('##### updateData #####')

    # # Get an instance of the channel layer for
    # # inter task communications
    channel_layer = get_channel_layer()

    await channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )

This results in the following runtime warning and the Celery task fails to run (the debug message is never printed) :

[2018-03-02 09:45:19,804: WARNING/ForkPoolWorker-2] /home/broteng/.pyenv/versions/3.6.3/envs/djchannels2/lib/python3.6/site-packages/billiard/pool.py:358: RuntimeWarning: coroutine 'updateData' was never awaited
  result = (True, prepare_result(fun(*args, **kwargs)))

I'm sure these warnings are understood by someone who can provide guidance with respect to a solution.

Thanks,

G Broten

Reference:

The application has be tried under two OS versions:
CentOS 7.4
Alpine Linux 3.7

A partial pip list of the significant packages:
asgi-redis (1.4.3)
asgiref (2.1.6)
async-timeout (2.0.0)
billiard (3.5.0.3)
cached-property (1.4.0)
celery (4.1.0)
channels (2.0.2)
channels-redis (2.1.0)
daphne (2.0.4)
Django (2.0.2)
redis (2.10.6)





--
You received this message because you are subscribed to the Google Groups "Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [hidden email].
To post to this group, send email to [hidden email].
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/8bc30e1e-3a68-4eea-983a-937b85c05b59%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Reply | Threaded
Open this post in threaded view
|

Re: Django 2.0.2, Channels 2.0.2 and Celery 4.1 Issue

Ken Whitesell
Taking a stab at this - I believe the original problem may be here:
    channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )

Your updateData method is a synchronous method. However, channel_layer.group_send is an asynchronous method.

What you might try is wrapping the group_send method in the async_to_sync function.
See the documentation at http://channels.readthedocs.io/en/latest/topics/channel_layers.html#synchronous-functions

    async_to_sync(channel_layer.group_send)(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )


Your first solution to make updateData an asynchronous method might work with some other work involved in adding that task to the event loop - but that answer is beyond me at the moment.

Hope this helps,
     Ken


On Friday, March 2, 2018 at 1:36:08 PM UTC-5, G Broten wrote:
Hi All:
 I'm migrating a small application from Django 1.x/Channels 1.x to Django 2.0.2 and Channels 2.0. I've run into an issue whose cause I'm trying to determine. It could be due to a failure on my part to correctly implement the channel_layer or it could be due to an
incompatibility with Celery 4.1. The basics are:
- Run a periodic Celery task
- Use the channel_layer to perform a group_send
- Have the consumer receive the group_send event and push a json  message over the socket

Show below is my simple consumer.py module:
class mstatusMessage(AsyncJsonWebsocketConsumer):

    ##### WebSocket event handlers

    async def connect(self):
        """
        Called when the websocket is handshaking as part of initial connection.
        """
        <a href="http://logging.info" target="_blank" rel="nofollow" onmousedown="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2Flogging.info\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNGZET3wJsfsGtYfyJqZACr-DSOBBg&#39;;return true;" onclick="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2Flogging.info\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNGZET3wJsfsGtYfyJqZACr-DSOBBg&#39;;return true;">logging.info("### Connected ###")
        # Accept the connection
        await self.accept()

        # Add to the group so they get messages
        await self.channel_layer.group_add(
            settings.CHANNEL_GROUP,
            self.channel_name,
        )

    async def disconnect(self, code):
        """
        Called when the WebSocket closes for any reason.
        """
        # Remove them from the group
        await self.channel_layer.group_discard(
            settings.CHANNEL_GROUP,
            self.channel_name,
        )

    ##### Handlers for messages sent over the channel layer

    # These helper methods are named by the types we send - so epics.join becomes epics_join
    async def epics_message(self, event):
        """
        Called when the Celery task queries Epics.
        """
        logging.error("### Received Msg ###")
        # Send a message down to the client
        await self.send_json(
            {
                "text": event["message"],
            },
        )

The routing is simple:
application = ProtocolTypeRouter({
    "websocket":  mstatusMessage
})

The Celery task is as follows:
@shared_task
def updateData(param):

    logger.error('##### updateData #####')

    # # Get an instance of the channel layer for
    # # inter task communications
    channel_layer = get_channel_layer()

    channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )

The results are promising as the websocket connect opens successfully and the Celery task run as show by the debugging output given below:
<a href="http://127.0.0.1:59818" target="_blank" rel="nofollow" onmousedown="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59818\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNGrepd2oVn1rW9xTyH_oIn4z5A5DA&#39;;return true;" onclick="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59818\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNGrepd2oVn1rW9xTyH_oIn4z5A5DA&#39;;return true;">127.0.0.1:59818 - - [02/Mar/2018:09:32:11] "GET /" 200 100639
<a href="http://127.0.0.1:59844" target="_blank" rel="nofollow" onmousedown="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59844\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNHsA441vdE-vN1qw9_WHEQL5x7EEA&#39;;return true;" onclick="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59844\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNHsA441vdE-vN1qw9_WHEQL5x7EEA&#39;;return true;">127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECTING /epics/" - -
2018-03-02 09:32:12,280 INFO     ### Connected ###
<a href="http://127.0.0.1:59844" target="_blank" rel="nofollow" onmousedown="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59844\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNHsA441vdE-vN1qw9_WHEQL5x7EEA&#39;;return true;" onclick="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59844\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNHsA441vdE-vN1qw9_WHEQL5x7EEA&#39;;return true;">127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECT /epics/" - -
[2018-03-02 09:32:12,312: ERROR/ForkPoolWorker-2] mstatus.tasks.updateData[8d329e61-]: ##### updateData #####
[2018-03-02 09:32:13,310: ERROR/ForkPoolWorker-2] mstatus.tasks.updateData[786f51a6-]: ##### updateData #####

BUT ............... although the Celery task runs the consumer never receives a message via the channel layer. This could be due to an
implementation error or, maybe, a compatibility issue. The application doesn't crash but the following warning is issued:

[2018-03-02 09:32:02,105: WARNING/ForkPoolWorker-2] /mstatus/mstatus/tasks.py:33: RuntimeWarning: coroutine 'RedisChannelLayer.group_send' was never awaited
  {"type": "epics.message", "text": "Hello World"},

This warning appears related to the Python asyncio functionality. Under the Celery task module, the channel_layer.group_send
doesn't use the await directive as it's inclusion hangs the Celery task. Changing the Celery task to:
async def updateData(param):

    logger.error('##### updateData #####')

    # # Get an instance of the channel layer for
    # # inter task communications
    channel_layer = get_channel_layer()

    await channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )

This results in the following runtime warning and the Celery task fails to run (the debug message is never printed) :

[2018-03-02 09:45:19,804: WARNING/ForkPoolWorker-2] /home/broteng/.pyenv/versions/3.6.3/envs/djchannels2/lib/python3.6/site-packages/billiard/pool.py:358: RuntimeWarning: coroutine 'updateData' was never awaited
  result = (True, prepare_result(fun(*args, **kwargs)))

I'm sure these warnings are understood by someone who can provide guidance with respect to a solution.

Thanks,

G Broten

Reference:

The application has be tried under two OS versions:
CentOS 7.4
Alpine Linux 3.7

A partial pip list of the significant packages:
asgi-redis (1.4.3)
asgiref (2.1.6)
async-timeout (2.0.0)
billiard (3.5.0.3)
cached-property (1.4.0)
celery (4.1.0)
channels (2.0.2)
channels-redis (2.1.0)
daphne (2.0.4)
Django (2.0.2)
redis (2.10.6)





--
You received this message because you are subscribed to the Google Groups "Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [hidden email].
To post to this group, send email to [hidden email].
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/1199cf0d-bd71-422f-b148-5148eaa59d29%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Reply | Threaded
Open this post in threaded view
|

Re: Django 2.0.2, Channels 2.0.2 and Celery 4.1 Issue

G Broten
Two big thumbs up for Ken! 
His keen eyes spotted the problem, which was attempting an asynchronous send from the Celery task. I changed the code to use the synchronous send and, bingo, the consumer now receives events via the channel layer.

A big thank-you to Ken! And, I'm sure anyone else using Channels 2.0 with Celery will find this thread of use.

G Broten

On Friday, March 2, 2018 at 12:57:01 PM UTC-6, Ken Whitesell wrote:
Taking a stab at this - I believe the original problem may be here:
    channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )

Your updateData method is a synchronous method. However, channel_layer.group_send is an asynchronous method.

What you might try is wrapping the group_send method in the async_to_sync function.
See the documentation at <a href="http://channels.readthedocs.io/en/latest/topics/channel_layers.html#synchronous-functions" target="_blank" rel="nofollow" onmousedown="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2Fchannels.readthedocs.io%2Fen%2Flatest%2Ftopics%2Fchannel_layers.html%23synchronous-functions\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNGjru9EDtXIzdBUgkmvv_pG06CAPg&#39;;return true;" onclick="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2Fchannels.readthedocs.io%2Fen%2Flatest%2Ftopics%2Fchannel_layers.html%23synchronous-functions\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNGjru9EDtXIzdBUgkmvv_pG06CAPg&#39;;return true;">http://channels.readthedocs.io/en/latest/topics/channel_layers.html#synchronous-functions

    async_to_sync(channel_layer.group_send)(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )


Your first solution to make updateData an asynchronous method might work with some other work involved in adding that task to the event loop - but that answer is beyond me at the moment.

Hope this helps,
     Ken


On Friday, March 2, 2018 at 1:36:08 PM UTC-5, G Broten wrote:
Hi All:
 I'm migrating a small application from Django 1.x/Channels 1.x to Django 2.0.2 and Channels 2.0. I've run into an issue whose cause I'm trying to determine. It could be due to a failure on my part to correctly implement the channel_layer or it could be due to an
incompatibility with Celery 4.1. The basics are:
- Run a periodic Celery task
- Use the channel_layer to perform a group_send
- Have the consumer receive the group_send event and push a json  message over the socket

Show below is my simple consumer.py module:
class mstatusMessage(AsyncJsonWebsocketConsumer):

    ##### WebSocket event handlers

    async def connect(self):
        """
        Called when the websocket is handshaking as part of initial connection.
        """
        <a href="http://logging.info" rel="nofollow" target="_blank" onmousedown="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2Flogging.info\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNGZET3wJsfsGtYfyJqZACr-DSOBBg&#39;;return true;" onclick="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2Flogging.info\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNGZET3wJsfsGtYfyJqZACr-DSOBBg&#39;;return true;">logging.info("### Connected ###")
        # Accept the connection
        await self.accept()

        # Add to the group so they get messages
        await self.channel_layer.group_add(
            settings.CHANNEL_GROUP,
            self.channel_name,
        )

    async def disconnect(self, code):
        """
        Called when the WebSocket closes for any reason.
        """
        # Remove them from the group
        await self.channel_layer.group_discard(
            settings.CHANNEL_GROUP,
            self.channel_name,
        )

    ##### Handlers for messages sent over the channel layer

    # These helper methods are named by the types we send - so epics.join becomes epics_join
    async def epics_message(self, event):
        """
        Called when the Celery task queries Epics.
        """
        logging.error("### Received Msg ###")
        # Send a message down to the client
        await self.send_json(
            {
                "text": event["message"],
            },
        )

The routing is simple:
application = ProtocolTypeRouter({
    "websocket":  mstatusMessage
})

The Celery task is as follows:
@shared_task
def updateData(param):

    logger.error('##### updateData #####')

    # # Get an instance of the channel layer for
    # # inter task communications
    channel_layer = get_channel_layer()

    channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )

The results are promising as the websocket connect opens successfully and the Celery task run as show by the debugging output given below:
<a href="http://127.0.0.1:59818" rel="nofollow" target="_blank" onmousedown="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59818\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNGrepd2oVn1rW9xTyH_oIn4z5A5DA&#39;;return true;" onclick="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59818\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNGrepd2oVn1rW9xTyH_oIn4z5A5DA&#39;;return true;">127.0.0.1:59818 - - [02/Mar/2018:09:32:11] "GET /" 200 100639
<a href="http://127.0.0.1:59844" rel="nofollow" target="_blank" onmousedown="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59844\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNHsA441vdE-vN1qw9_WHEQL5x7EEA&#39;;return true;" onclick="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59844\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNHsA441vdE-vN1qw9_WHEQL5x7EEA&#39;;return true;">127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECTING /epics/" - -
2018-03-02 09:32:12,280 INFO     ### Connected ###
<a href="http://127.0.0.1:59844" rel="nofollow" target="_blank" onmousedown="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59844\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNHsA441vdE-vN1qw9_WHEQL5x7EEA&#39;;return true;" onclick="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59844\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNHsA441vdE-vN1qw9_WHEQL5x7EEA&#39;;return true;">127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECT /epics/" - -
[2018-03-02 09:32:12,312: ERROR/ForkPoolWorker-2] mstatus.tasks.updateData[8d329e61-]: ##### updateData #####
[2018-03-02 09:32:13,310: ERROR/ForkPoolWorker-2] mstatus.tasks.updateData[786f51a6-]: ##### updateData #####

BUT ............... although the Celery task runs the consumer never receives a message via the channel layer. This could be due to an
implementation error or, maybe, a compatibility issue. The application doesn't crash but the following warning is issued:

[2018-03-02 09:32:02,105: WARNING/ForkPoolWorker-2] /mstatus/mstatus/tasks.py:33: RuntimeWarning: coroutine 'RedisChannelLayer.group_send' was never awaited
  {"type": "epics.message", "text": "Hello World"},

This warning appears related to the Python asyncio functionality. Under the Celery task module, the channel_layer.group_send
doesn't use the await directive as it's inclusion hangs the Celery task. Changing the Celery task to:
async def updateData(param):

    logger.error('##### updateData #####')

    # # Get an instance of the channel layer for
    # # inter task communications
    channel_layer = get_channel_layer()

    await channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )

This results in the following runtime warning and the Celery task fails to run (the debug message is never printed) :

[2018-03-02 09:45:19,804: WARNING/ForkPoolWorker-2] /home/broteng/.pyenv/versions/3.6.3/envs/djchannels2/lib/python3.6/site-packages/billiard/pool.py:358: RuntimeWarning: coroutine 'updateData' was never awaited
  result = (True, prepare_result(fun(*args, **kwargs)))

I'm sure these warnings are understood by someone who can provide guidance with respect to a solution.

Thanks,

G Broten

Reference:

The application has be tried under two OS versions:
CentOS 7.4
Alpine Linux 3.7

A partial pip list of the significant packages:
asgi-redis (1.4.3)
asgiref (2.1.6)
async-timeout (2.0.0)
billiard (3.5.0.3)
cached-property (1.4.0)
celery (4.1.0)
channels (2.0.2)
channels-redis (2.1.0)
daphne (2.0.4)
Django (2.0.2)
redis (2.10.6)





--
You received this message because you are subscribed to the Google Groups "Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [hidden email].
To post to this group, send email to [hidden email].
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/09c2b5c2-1926-45be-9139-927db807c703%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Reply | Threaded
Open this post in threaded view
|

Re: Django 2.0.2, Channels 2.0.2 and Celery 4.1 Issue

Sergio Lopez
In reply to this post by G Broten
Plese, Can you send us a basic example of celery 4 and channels 2?

El viernes, 2 de marzo de 2018, 19:36:08 (UTC+1), G Broten escribió:
Hi All:
 I'm migrating a small application from Django 1.x/Channels 1.x to Django 2.0.2 and Channels 2.0. I've run into an issue whose cause I'm trying to determine. It could be due to a failure on my part to correctly implement the channel_layer or it could be due to an
incompatibility with Celery 4.1. The basics are:
- Run a periodic Celery task
- Use the channel_layer to perform a group_send
- Have the consumer receive the group_send event and push a json  message over the socket

Show below is my simple consumer.py module:
class mstatusMessage(AsyncJsonWebsocketConsumer):

    ##### WebSocket event handlers

    async def connect(self):
        """
        Called when the websocket is handshaking as part of initial connection.
        """
        <a href="http://logging.info" target="_blank" rel="nofollow" onmousedown="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2Flogging.info\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNGZET3wJsfsGtYfyJqZACr-DSOBBg&#39;;return true;" onclick="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2Flogging.info\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNGZET3wJsfsGtYfyJqZACr-DSOBBg&#39;;return true;">logging.info("### Connected ###")
        # Accept the connection
        await self.accept()

        # Add to the group so they get messages
        await self.channel_layer.group_add(
            settings.CHANNEL_GROUP,
            self.channel_name,
        )

    async def disconnect(self, code):
        """
        Called when the WebSocket closes for any reason.
        """
        # Remove them from the group
        await self.channel_layer.group_discard(
            settings.CHANNEL_GROUP,
            self.channel_name,
        )

    ##### Handlers for messages sent over the channel layer

    # These helper methods are named by the types we send - so epics.join becomes epics_join
    async def epics_message(self, event):
        """
        Called when the Celery task queries Epics.
        """
        logging.error("### Received Msg ###")
        # Send a message down to the client
        await self.send_json(
            {
                "text": event["message"],
            },
        )

The routing is simple:
application = ProtocolTypeRouter({
    "websocket":  mstatusMessage
})

The Celery task is as follows:
@shared_task
def updateData(param):

    logger.error('##### updateData #####')

    # # Get an instance of the channel layer for
    # # inter task communications
    channel_layer = get_channel_layer()

    channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )

The results are promising as the websocket connect opens successfully and the Celery task run as show by the debugging output given below:
<a href="http://127.0.0.1:59818" target="_blank" rel="nofollow" onmousedown="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59818\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNGrepd2oVn1rW9xTyH_oIn4z5A5DA&#39;;return true;" onclick="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59818\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNGrepd2oVn1rW9xTyH_oIn4z5A5DA&#39;;return true;">127.0.0.1:59818 - - [02/Mar/2018:09:32:11] "GET /" 200 100639
<a href="http://127.0.0.1:59844" target="_blank" rel="nofollow" onmousedown="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59844\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNHsA441vdE-vN1qw9_WHEQL5x7EEA&#39;;return true;" onclick="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59844\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNHsA441vdE-vN1qw9_WHEQL5x7EEA&#39;;return true;">127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECTING /epics/" - -
2018-03-02 09:32:12,280 INFO     ### Connected ###
<a href="http://127.0.0.1:59844" target="_blank" rel="nofollow" onmousedown="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59844\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNHsA441vdE-vN1qw9_WHEQL5x7EEA&#39;;return true;" onclick="this.href=&#39;http://www.google.com/url?q\x3dhttp%3A%2F%2F127.0.0.1%3A59844\x26sa\x3dD\x26sntz\x3d1\x26usg\x3dAFQjCNHsA441vdE-vN1qw9_WHEQL5x7EEA&#39;;return true;">127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECT /epics/" - -
[2018-03-02 09:32:12,312: ERROR/ForkPoolWorker-2] mstatus.tasks.updateData[8d329e61-]: ##### updateData #####
[2018-03-02 09:32:13,310: ERROR/ForkPoolWorker-2] mstatus.tasks.updateData[786f51a6-]: ##### updateData #####

BUT ............... although the Celery task runs the consumer never receives a message via the channel layer. This could be due to an
implementation error or, maybe, a compatibility issue. The application doesn't crash but the following warning is issued:

[2018-03-02 09:32:02,105: WARNING/ForkPoolWorker-2] /mstatus/mstatus/tasks.py:33: RuntimeWarning: coroutine 'RedisChannelLayer.group_send' was never awaited
  {"type": "epics.message", "text": "Hello World"},

This warning appears related to the Python asyncio functionality. Under the Celery task module, the channel_layer.group_send
doesn't use the await directive as it's inclusion hangs the Celery task. Changing the Celery task to:
async def updateData(param):

    logger.error('##### updateData #####')

    # # Get an instance of the channel layer for
    # # inter task communications
    channel_layer = get_channel_layer()

    await channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )

This results in the following runtime warning and the Celery task fails to run (the debug message is never printed) :

[2018-03-02 09:45:19,804: WARNING/ForkPoolWorker-2] /home/broteng/.pyenv/versions/3.6.3/envs/djchannels2/lib/python3.6/site-packages/billiard/pool.py:358: RuntimeWarning: coroutine 'updateData' was never awaited
  result = (True, prepare_result(fun(*args, **kwargs)))

I'm sure these warnings are understood by someone who can provide guidance with respect to a solution.

Thanks,

G Broten

Reference:

The application has be tried under two OS versions:
CentOS 7.4
Alpine Linux 3.7

A partial pip list of the significant packages:
asgi-redis (1.4.3)
asgiref (2.1.6)
async-timeout (2.0.0)
billiard (3.5.0.3)
cached-property (1.4.0)
celery (4.1.0)
channels (2.0.2)
channels-redis (2.1.0)
daphne (2.0.4)
Django (2.0.2)
redis (2.10.6)





--
You received this message because you are subscribed to the Google Groups "Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [hidden email].
To post to this group, send email to [hidden email].
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/a03ce4f9-cebc-4035-903f-a40d6aaf52cb%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.