Channels: launching a custom co-routine from SyncConsumer

Dan Merillat

I have an application that is 99% ORM and database work, so it uses SyncConsumer.
However, one minor part needs a timer, and for the life of me I cannot get it to work.

In an AsyncConsumer, I can use asyncio.ensure_future(self.coroutine()) to start a timer.

In a SyncConsumer, nothing I've tried works:

import asyncio

from asgiref.sync import async_to_sync
from channels.consumer import SyncConsumer


class TestConsumer(SyncConsumer):

    async def timer_task(self):
        print(f'timer task: {self.counter}')
        await asyncio.sleep(1)  # must be awaited, or the sleep never happens
        self.counter += 1
        print(f'timer expired: {self.counter}')

    def connect(self):
        self.counter = 0

        # Attempt 1: schedule the coroutine directly from the sync thread.
        self.timer = asyncio.ensure_future(self.timer_task())
        # ERROR:daphne.server:Exception inside application: There is no current event loop in thread 'ThreadPoolExecutor-0_20'.

        # Attempt 2: wrap ensure_future with async_to_sync.
        self.timer = async_to_sync(asyncio.ensure_future)(self.timer_task())
        # ERROR:daphne.server:Exception inside application: You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.

I thought SyncConsumer was async under the hood, but I have no idea how to leverage that.
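
The first error can be reproduced outside Channels. The following is a minimal sketch using only the standard library; the names timer_task and try_schedule are made up for illustration. It assumes nothing about Channels internals and only shows that asyncio.ensure_future() has no event loop to use when it is called from a thread-pool worker thread.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def timer_task():
    await asyncio.sleep(1)


def try_schedule():
    """Runs in a worker thread that has no event loop of its own."""
    coro = timer_task()
    try:
        asyncio.ensure_future(coro)
    except RuntimeError as exc:
        # No event loop belongs to this thread, so scheduling fails.
        return f"RuntimeError: {exc}"
    finally:
        coro.close()  # avoid a "never awaited" warning in this demo
    return "scheduled"


with ThreadPoolExecutor(max_workers=1) as pool:
    result = pool.submit(try_schedule).result()
print(result)
```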

--
You received this message because you are subscribed to the Google Groups "Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to [hidden email].
To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/6fe7ad95-1bdb-4de2-9ed2-8c566ba27939%40googlegroups.com.

Re: Channels: launching a custom co-routine from SyncConsumer

Andrew Godwin-3
SyncConsumer isn't async - it runs inside a separate synchronous thread. You'll need to get the event loop from the other thread and use call_soon_threadsafe instead!
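
As a rough illustration of that advice, here is a standard-library-only sketch. It is not Channels code: sync_handler is a hypothetical stand-in for a synchronous consumer method, and the loop is passed to it explicitly, whereas in Channels it would have to be looked up some other way.

```python
import asyncio

results = []


async def timer_task(delay):
    await asyncio.sleep(delay)
    results.append("timer expired")


def sync_handler(loop):
    """Synchronous code running in a worker thread."""
    # create_task must run on the loop's own thread, so hand it over
    # with call_soon_threadsafe rather than calling it directly here.
    loop.call_soon_threadsafe(loop.create_task, timer_task(0.05))
    results.append("handler returned")


async def main():
    loop = asyncio.get_running_loop()
    # Run the synchronous handler in the default thread pool.
    await loop.run_in_executor(None, sync_handler, loop)
    await asyncio.sleep(0.2)  # keep the loop alive long enough for the timer


asyncio.run(main())
print(results)
```

The handler returns immediately, and the timer fires later on the loop thread.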

Andrew

On Wed, Aug 14, 2019 at 6:59 PM Dan Merillat <[hidden email]> wrote:

Re: Channels: launching a custom co-routine from SyncConsumer

Dan Merillat
On 8/15/19 11:10 PM, Andrew Godwin wrote:
> SyncConsumer isn't async - it runs inside a separate synchronous thread.
> You'll need to get the event loop from the other thread and use
> call_soon_threadsafe instead!

Sorry for the delay in getting back to you. It took me a while to go
through the different layers to find the appropriate event loop.  Thank
you for the pointer; it got me headed down what I hope is the right path.

SyncConsumer is based on AsyncConsumer, with dispatch() ultimately
wrapped in SyncToAsync via DatabaseSyncToAsync.  SyncConsumer should
have a guaranteed SyncToAsync instance with threadlocal on the executor
thread, because it is running inside a SyncToAsync to begin with.  Is
that correct?

The following "works" as part of my sync consumer but I'm not positive
what the sharp edges are going to be:

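     import asyncio
     from asgiref.sync import SyncToAsync

     def start_coroutine(self, coroutine):
         # Loop recorded by SyncToAsync for this executor thread, if any.
         loop = getattr(SyncToAsync.threadlocal, "main_event_loop", None)
         if not (loop and loop.is_running()):
             # Caveat: a freshly created loop is not running, so the task
             # scheduled below will not execute until something runs it.
             loop = asyncio.new_event_loop()
             asyncio.set_event_loop(loop)
         loop.call_soon_threadsafe(loop.create_task, coroutine())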

Here coroutine is a coroutine function: it is called, and the resulting coroutine object is handed to create_task.
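
One possible sharp edge is that call_soon_threadsafe(loop.create_task, ...) gives the calling thread no handle on the task, so it cannot be cancelled or checked for exceptions from the sync side. A hedged alternative, sketched with the standard library only, is asyncio.run_coroutine_threadsafe(), which returns a concurrent.futures.Future. As before, sync_handler is a hypothetical stand-in for consumer code and the loop is passed in explicitly.

```python
import asyncio


async def timer_task(delay):
    await asyncio.sleep(delay)
    return "timer expired"


def sync_handler(loop):
    """Stand-in for synchronous consumer code running in a worker thread."""
    # Returns a concurrent.futures.Future tied to the task on the loop.
    future = asyncio.run_coroutine_threadsafe(timer_task(0.05), loop)
    # The handle can be kept (to cancel later) or waited on, as here.
    return future.result(timeout=2)


async def main():
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, sync_handler, loop)


outcome = asyncio.run(main())
print(outcome)
```

Blocking on future.result() is only for demonstration. A timer would more likely store the future and call future.cancel() when it is no longer needed.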

The documentation at
https://channels.readthedocs.io/en/latest/topics/consumers.html covers
the opposite scenario, calling synchronous functions such as the Django
ORM from an AsyncConsumer, but it would really benefit from an example of
something like an idle timeout implementation for a SyncConsumer.
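
For what it's worth, an idle timeout along those lines might look roughly like the sketch below. This is plain asyncio, not an official Channels pattern: IdleTimer, sync_handler and the on_idle callback are hypothetical names, and the event loop is assumed to be available to the synchronous code by some means.

```python
import asyncio


class IdleTimer:
    """Hypothetical helper: calls on_idle after `timeout` seconds without reset()."""

    def __init__(self, loop, timeout, on_idle):
        self._loop = loop
        self._timeout = timeout
        self._on_idle = on_idle
        self._handle = None

    def reset(self):
        """Safe to call from any thread; the real work happens on the loop."""
        self._loop.call_soon_threadsafe(self._restart)

    def _restart(self):
        # Runs on the loop thread: drop the old deadline, set a new one.
        if self._handle is not None:
            self._handle.cancel()
        self._handle = self._loop.call_later(self._timeout, self._on_idle)


events = []


def sync_handler(timer):
    # Stand-in for a synchronous receive handler: activity resets the timer.
    timer.reset()
    events.append("activity")


async def main():
    loop = asyncio.get_running_loop()
    timer = IdleTimer(loop, 0.1, lambda: events.append("idle timeout"))
    await loop.run_in_executor(None, sync_handler, timer)
    await asyncio.sleep(0.3)  # no further activity, so the timer fires


asyncio.run(main())
print(events)
```

Each call to reset() pushes the deadline back. In a real consumer the on_idle callback would presumably close the connection rather than append to a list.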

Re: Channels: launching a custom co-routine from SyncConsumer

Ezequias Rocha
Sorry for my delay as well. My problem was that I was looking at the wrong PostgreSQL sequence. My fault, totally my fault.

Django rocks!

Sincerely
Ezequias Rocha

On Wednesday, August 28, 2019 at 11:26:25 AM UTC-3, Dan Merillat wrote: