Jython threading with thread -> queue -> thread

Chuck Bearden
(Cross-posted at StackOverflow:
http://stackoverflow.com/questions/23683382/jython-threading-with-thread-queue-thread)

I'm running Jython 2.5.3 on Ubuntu 12.04 with the OpenJDK 64-bit 1.7.0_55 JVM.

I'm trying to create a simple threaded application to optimize data
processing and loading. I have populator threads that read records
from a database and mangle them a bit before putting them onto a
queue. The queue is read by consumer threads that store the data in a
different database. Here is the outline of my code:


    import sys
    import time
    import threading
    import Queue

    class PopulatorThread(threading.Thread):
        def __init__(self, mod, mods, queue):
            super(PopulatorThread, self).__init__()
            self.mod = mod
            self.mods = mods
            self.queue = queue

        def run(self):
            # Create db connection
            # ...
            try:
                # Select one segment of records using 'id % mods = mod'
                # Process these records & slap them onto the queue
                # ...
            except:
                con.rollback()
                raise
            finally:
                print "Made it to 'finally' in populator %d" % self.mod
                con.close()


    class ConsumerThread(threading.Thread):
        def __init__(self, mod, queue):
            super(ConsumerThread, self).__init__()
            self.mod = mod
            self.queue = queue

        def run(self):
            # Create db connection
            # ...
            try:
                # Pull records off the queue and put them into
                # a different database
                # ...
            except:
                con.rollback()
                raise
            finally:
                print "Made it to 'finally' in consumer %d" % self.mod
                con.close()


    def main(argv):
        tread1Count = 3
        tread2Count = 4
        # This is the notefactsselector data queue
        nfsQueue = Queue.Queue()

        # Start consumer/writer threads
        j = 0
        treads2 = []
        while j < tread2Count:
            treads2.append(ConsumerThread(j, nfsQueue))
            treads2[-1].start()
            j += 1

        # Start reader/populator threads
        i = 0
        treads1 = []
        while i < tread1Count:
            treads1.append(PopulatorThread(i, tread1Count, nfsQueue))
            treads1[-1].start()
            i += 1

        # Wait for reader/populator threads
        print "Waiting to join %d populator threads" % len(treads1)
        i = 0
        for tread in treads1:
            "Waiting to join a populator thread %d" % i
            tread.join()
            i += 1

        #Add one sentinel value to queue for each write thread
        print "Adding sentinel values to end of queue"
        for tread in treads2:
            nfsQueue.put(None)

        # Wait for consumer/writer threads
        print "Waiting to join consumer/writer threads"
        for tread in treads2:
            print "Waiting on a consumer/writer"
            tread.join()

        # Wait for Queue
        print "Waiting to join queue with %d items" % nfsQueue.qsize()
        nfsQueue.join()
        print "Queue has been joined"


    if __name__ == '__main__':
        main(sys.argv)


I have simplified the database implementation somewhat to save space.
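The populator comment 'id % mods = mod' splits the source rows among the threads by the remainder of each id. Here is a minimal sketch of that partitioning. It assumes integer ids and uses a plain Python list in place of the database. The segment() helper and the sample ids are hypothetical. The segments come out disjoint, and together they cover every record:

```python
def segment(ids, mod, mods):
    """Return the ids that populator number `mod` (of `mods` total) selects.

    Mirrors a SQL filter like WHERE id % mods = mod; `ids` is a
    hypothetical in-memory stand-in for the source table.
    """
    return [i for i in ids if i % mods == mod]


ids = list(range(1, 11))   # hypothetical record ids 1..10
mods = 3                   # one segment per populator thread (tread1Count = 3)
segments = [segment(ids, mod, mods) for mod in range(mods)]
# segments == [[3, 6, 9], [1, 4, 7, 10], [2, 5, 8]]
```

Every integer has exactly one remainder modulo mods. As a result, no record is queued twice and none is skipped, as long as all mods populators run.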

When I run the code, the populator and consumer threads seem to reach
the end, since I get the "Made it to 'finally' in ..." messages.
I get the "Waiting to join n populator threads" message, but no
"Waiting to join a populator thread n" messages.
I get the "Waiting to join consumer/writer threads" message as well as
each of the "Waiting on a consumer/writer" messages I expect.
I get the "Waiting to join queue with 0 items" message I expect, but
not the "Queue has been joined" message; apparently the program is
blocking while waiting for the queue, and it never terminates.
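These two observations are consistent with each other. qsize() counts items still sitting in the queue, while join() waits until every put() has been matched by a task_done() call. The sketch below assumes CPython 2.7 or 3; the module is named Queue in Python 2 and queue in Python 3. It takes an item off the queue without calling task_done(). It then runs join() in a helper thread so the demo itself cannot hang, and shows that join() stays blocked although the queue is empty:

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2 / Jython 2.x

q = queue.Queue()
q.put("record")
q.get()                     # the queue is now empty...
empty_after_get = q.qsize() == 0

# ...but join() still blocks: this get() has no matching task_done() yet.
joiner = threading.Thread(target=q.join)
joiner.daemon = True        # don't keep the interpreter alive if join() never returns
joiner.start()
joiner.join(0.5)            # give q.join() half a second to return
blocked_while_unfinished = joiner.is_alive()

q.task_done()               # balance the earlier get()
joiner.join(5)
returned_after_task_done = not joiner.is_alive()
```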

I suspect I have my thread initializations or thread joinings in the
wrong order somehow, but I have little experience with concurrent
programming, so my intuitions about how to do things aren't well
developed. I find plenty of Python/Jython examples of queues populated
by while loops and read by threads, but none so far about queues
populated by one set of threads and read by a different set.
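The start/join order in main() looks sound for a queue filled by one group of threads and drained by another:

1. Start the consumers.
2. Start the populators.
3. Join the populators.
4. Put one None sentinel per consumer.
5. Join the consumers.

Below is a self-contained sketch of that order. It assumes CPython 2.7 or 3, uses in-memory lists in place of both databases, and replaces the Thread subclasses with hypothetical populate() and consume() functions. It runs to completion, provided each consumer calls task_done() exactly once for every get(), the sentinel included:

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2 / Jython 2.x

SOURCE_IDS = list(range(100))   # hypothetical stand-in for the source table
written = []                    # hypothetical stand-in for the target database
written_lock = threading.Lock()


def populate(mod, mods, q):
    # Select this thread's segment, "mangle" each record, and enqueue it.
    for rec_id in SOURCE_IDS:
        if rec_id % mods == mod:
            q.put(rec_id * 10)


def consume(q):
    while True:
        elem = q.get()
        try:
            if elem is None:    # sentinel: no more work for this consumer
                break
            with written_lock:
                written.append(elem)
        finally:
            q.task_done()       # runs for records AND for the sentinel


def main(n_populators=3, n_consumers=4):
    q = queue.Queue()
    consumers = [threading.Thread(target=consume, args=(q,))
                 for _ in range(n_consumers)]
    for t in consumers:
        t.start()
    populators = [threading.Thread(target=populate, args=(i, n_populators, q))
                  for i in range(n_populators)]
    for t in populators:
        t.start()
    for t in populators:
        t.join()
    for _ in consumers:
        q.put(None)             # one sentinel per consumer
    for t in consumers:
        t.join()
    q.join()                    # returns at once: every put() has a task_done()
    return q


final_queue = main()
```

Once the consumers have been joined, the final q.join() is redundant but harmless: it only confirms that every task was accounted for.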

The populator and consumer threads appear to finish, but I'm puzzled
by the fact that I don't see the "Waiting to join a populator thread
n" messages, even though the array containing the threads is populated
and the program continues beyond that point.

In the end, the program seems to block forever in the final
nfsQueue.join() call.

Thanks to any who have suggestions and lessons for me!
Chuck

------------------------------------------------------------------------------
"Accelerate Dev Cycles with Automated Cross-Browser Testing - For FREE
Instantly run your Selenium tests across 300+ browser/OS combos.
Get unparalleled scalability from the best Selenium testing platform available
Simple to use. Nothing to install. Get started now for free."
http://p.sf.net/sfu/SauceLabs
_______________________________________________
Jython-users mailing list
[hidden email]
https://lists.sourceforge.net/lists/listinfo/jython-users

Re: Jython threading with thread -> queue -> thread

Chuck Bearden
A friend pointed out that I was missing the 'print' statement before
"Waiting to join a populator thread %d" % i. Once I added it, I did
get the "Waiting to join a populator thread n" messages after all, so
the only remaining mystery is why the queue is never joined.

I get the same behavior with Jython 2.7b2, BTW.
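The friend's diagnosis is easy to confirm. Without print, that line is an expression statement: Python builds the formatted string and immediately discards it, so nothing is written. The small sketch below compares both forms. Its capture() helper, which temporarily swaps out sys.stdout, is hypothetical and exists only for this check:

```python
import sys

try:
    from StringIO import StringIO   # Python 2
except ImportError:
    from io import StringIO         # Python 3


def capture(func):
    """Run func() and return whatever it wrote to sys.stdout."""
    old_stdout, sys.stdout = sys.stdout, StringIO()
    try:
        func()
        return sys.stdout.getvalue()
    finally:
        sys.stdout = old_stdout


def without_print():
    i = 0
    "Waiting to join a populator thread %d" % i   # evaluated, then discarded


def with_print():
    i = 0
    print("Waiting to join a populator thread %d" % i)


silent = capture(without_print)   # ""
loud = capture(with_print)        # "Waiting to join a populator thread 0\n"
```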

Thanks,
Chuck


Re: Jython threading with thread -> queue -> thread

Jim Baker
With some minor changes (see https://gist.github.com/jimbaker/169945e9ae42ce85409e), I made your code runnable. The results are the same for CPython 2.7 and Jython 2.7.

I didn't have a chance to go into details, but clearly the problem is in your logic. For what it's worth, Jython actually uses the same Queue implementation as CPython, since it's written in regular Python. Here's what we wrote about it in the Jython book: http://www.jython.org/jythonbook/en/1.0/Concurrency.html#other-synchronization-objects
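Because that Queue is ordinary Python, its bookkeeping is easy to model. The TaskCounter class below is a hypothetical, simplified illustration rather than the library's code. A counter guarded by a threading.Condition is incremented by put() and decremented by task_done(), and join() waits for it to reach zero. The timeout argument on join() exists only so the demo cannot hang; the real Queue.join() takes no timeout.

```python
import threading
import time


class TaskCounter(object):
    """Hypothetical, simplified model of Queue's unfinished-task bookkeeping."""

    def __init__(self):
        self.all_tasks_done = threading.Condition(threading.Lock())
        self.unfinished_tasks = 0

    def put(self):
        # Every put() creates one task that must later be marked done.
        with self.all_tasks_done:
            self.unfinished_tasks += 1

    def task_done(self):
        with self.all_tasks_done:
            if self.unfinished_tasks <= 0:
                raise ValueError("task_done() called too many times")
            self.unfinished_tasks -= 1
            if self.unfinished_tasks == 0:
                self.all_tasks_done.notify_all()   # wake every waiting join()

    def join(self, timeout=None):
        """Block until all tasks are done; return False if timeout expires first."""
        deadline = None if timeout is None else time.time() + timeout
        with self.all_tasks_done:
            while self.unfinished_tasks:
                if deadline is None:
                    self.all_tasks_done.wait()
                else:
                    remaining = deadline - time.time()
                    if remaining <= 0:
                        return False
                    self.all_tasks_done.wait(remaining)
            return True


counter = TaskCounter()
for _ in range(3):                      # three items put on the "queue"
    counter.put()
for _ in range(2):                      # ...but only two marked done
    counter.task_done()
stuck = counter.join(timeout=0.2)       # False: one task is still open
counter.task_done()
finished = counter.join(timeout=0.2)    # True: every put() is now matched
```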

- Jim


Re: Jython threading with thread -> queue -> thread

Chuck Bearden
Thanks for the response. The problem turned out to be in the looping
logic of the consumer thread, which I didn't include in my sample code
(my oversight!). It looked like this:

    while True:
        elem = queue.get()
        if not elem:
            break
        # Do stuff ...
        queue.task_done()

A friend pointed out that when queue.get() returned the None sentinel
value, I wasn't marking that task as done. That left one task per
consumer thread nominally open, and that is what caused the program
to block. An obvious semantic error on my part: I wasn't thinking of
the None sentinel value as a task, but in the context of the queue it
plainly is one, and it needs to be marked as done.

    while True:
        elem = queue.get()
        if not elem:
            queue.task_done()
            break
        # Do stuff ...
        queue.task_done()
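The effect is easy to measure. The sketch below assumes CPython 2.7 or 3; the helper names buggy_consumer, fixed_consumer and open_tasks_after are hypothetical. It runs each version of the loop with two consumers and one None sentinel apiece. It then reports the queue's internal unfinished_tasks counter, which is what join() waits on in the standard pure-Python Queue:

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2 / Jython 2.x


def buggy_consumer(q):
    while True:
        elem = q.get()
        if not elem:
            break                   # sentinel taken but never marked done
        # Do stuff ...
        q.task_done()


def fixed_consumer(q):
    while True:
        elem = q.get()
        if not elem:
            q.task_done()           # the sentinel is a task too
            break
        # Do stuff ...
        q.task_done()


def open_tasks_after(consumer, n_consumers=2, n_records=5):
    """Drain n_records (1..n, all truthy) plus one sentinel per consumer."""
    q = queue.Queue()
    for rec in range(1, n_records + 1):
        q.put(rec)
    threads = [threading.Thread(target=consumer, args=(q,))
               for _ in range(n_consumers)]
    for t in threads:
        t.start()
    for _ in threads:
        q.put(None)                 # one sentinel per consumer
    for t in threads:
        t.join()
    return q.unfinished_tasks       # what q.join() is waiting on


buggy_open = open_tasks_after(buggy_consumer)   # 2: one per consumer
fixed_open = open_tasks_after(fixed_consumer)   # 0: q.join() would return
```

With the original loop, the leftover count equals the number of consumer threads. Both loops keep the thread's 'if not elem:' test. 'if elem is None:' would be safer if a record could ever be falsy, such as 0 or an empty string.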

Thanks,
Chuck
