Catching exceptions with multi-processing


Catching exceptions with multi-processing

Fabien
Folks,

I am developing a tool which works on individual entities (glaciers) and
does a lot of operations on them. There are many tasks to do, one after
the other, and each task follows the same interface:

def task_1(path_to_glacier_dir):
     open file1 in path_to_glacier_dir
     do stuff
     if dont_work:
         raise RuntimeError("didnt work")
     write file2 in path_to_glacier_dir

This way, the tasks can be run in parallel very easily:

import multiprocessing as mp
pool = mp.Pool(4)

dirs = [list_of_dirs]
pool.map(task1, dirs, chunksize=1)
pool.map(task2, dirs, chunksize=1)
pool.map(task3, dirs, chunksize=1)

... and so forth. I tested the tool on about a hundred glaciers, but now
it has to run for thousands of them. There are going to be errors; some
of them are even expected for special outliers. What I would like the
tool to do in case of error is write the identifier of the problematic
glacier somewhere, along with the error encountered and more info if
possible. Because of multiprocessing, I can't write to a shared file, so
I thought that the individual processes should write a unique "error
file" in a dedicated directory.

What I don't know, however, is how to do this at minimal cost and
in a generic way for all tasks. Also, task2 should not be run if
task1 threw an error. Sometimes (for debugging), I'd rather keep the
normal behavior of raising an error and stopping the program.

Do I have to wrap all tasks with a "try: except:" block? How do I switch
between behaviors? All the solutions I could think of look quite ugly
to me. And it seems that this is a general problem that someone cleverer
than me has solved before ;-)

Thanks,

Fabien


Catching exceptions with multi-processing

Andres Riancho
Fabien,

   My recommendation is that you should pass some extra arguments to the task:
    * A unique task id
    * A result multiprocessing.Queue

    When an exception is raised you put (unique_id, exception) into the
queue. When it succeeds you put (unique_id, None). In the main process
you consume the queue and do your error handling.

    Note that some exceptions can't be serialized; that is where
tblib [0] comes in handy.

[0] https://pypi.python.org/pypi/tblib
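A minimal sketch of this pattern (the task body and ids here are hypothetical; note the use of a Manager queue, since a plain multiprocessing.Queue can't be passed through Pool.map):

```python
import multiprocessing as mp

def task_1(args):
    """Hypothetical task: report (unique_id, exception_or_None) via the queue."""
    glacier_id, q = args
    try:
        if glacier_id < 0:                  # stand-in for "dont_work"
            raise RuntimeError("didnt work")
        q.put((glacier_id, None))           # success
    except Exception as e:
        q.put((glacier_id, e))              # failure, handled in the main process

if __name__ == "__main__":
    q = mp.Manager().Queue()                # Manager queues are picklable proxies
    ids = [1, 2, -3]
    with mp.Pool(2) as pool:
        pool.map(task_1, [(i, q) for i in ids], chunksize=1)
    while not q.empty():
        uid, exc = q.get()
        if exc is not None:
            print("glacier %s failed: %s" % (uid, exc))
```

The main process is the only consumer, so it can collect all failures and decide what to log or re-raise.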

Regards,

On Fri, Jun 19, 2015 at 11:01 AM, Fabien <fabien.maussion at gmail.com> wrote:

> Folks,
>
> I am developing a tool which works on individual entities (glaciers) [...]



--
Andrés Riancho
Project Leader at w3af - http://w3af.org/
Web Application Attack and Audit Framework
Twitter: @w3af
GPG: 0x93C344F3


Catching exceptions with multi-processing

Jean-Michel Pichavant
In reply to this post by Fabien
----- Original Message -----

> From: "Fabien" <fabien.maussion at gmail.com>
> To: python-list at python.org
> Sent: Friday, 19 June, 2015 4:01:02 PM
> Subject: Catching exceptions with multi-processing
>
> Folks,
>
> I am developing a tool which works on individual entities (glaciers) [...]

https://azylstra.net/blog/post/exception-handling-with-python-processes

shows how subprocesses can send their exceptions to the main process.



-- IMPORTANT NOTICE:

The contents of this email and any attachments are confidential and may also be privileged. If you are not the intended recipient, please notify the sender immediately and do not disclose the contents to any other person, use it for any purpose, or store or copy the information in any medium. Thank you.


Catching exceptions with multi-processing

Oscar Benjamin-2
In reply to this post by Fabien
On 19 June 2015 at 15:01, Fabien <fabien.maussion at gmail.com> wrote:

> Folks,
>
> I am developing a tool which works on individual entities (glaciers) [...]
>
> Do I have to wrap all tasks with a "try: exept:" block? How to switch
> between behaviors? All the solutions I could think about look quite ugly to
> me. And it seems that this is a general problem that someone cleverer than
> me had solved before ;-)

A simple way to approach this could be something like:

#!/usr/bin/env python3

import math
import multiprocessing

def sqrt(x):
    if x < 0:
        return 'error', x
    else:
        return 'success', math.sqrt(x)

if __name__ == "__main__":
    numbers = [1, 2, 3, -1, -3]
    pool = multiprocessing.Pool()
    for ret, val in pool.imap(sqrt, numbers):
        if ret == 'error':
            raise ValueError(val)
        print(val)

Just replace the raise statement with whatever you want to do (write
to a file etc). Since all errors are handled in the master process
there are no issues with writing to a file.

--
Oscar


Catching exceptions with multi-processing

Jean-Michel Pichavant
----- Original Message -----

> From: "Oscar Benjamin" <oscar.j.benjamin at gmail.com>
> A simple way to approach this could be something like:
>
> [code snipped]
>
> Just replace the raise statement with whatever you want to do (write
> to a file etc). Since all errors are handled in the master process
> there are no issues with writing to a file.
>
> --
> Oscar

The main problem with this approach is that it does not handle unexpected exceptions within subprocesses.
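One way to close that gap, keeping Oscar's tag-and-value return convention, is to catch everything inside the worker function. A sketch:

```python
import math

def sqrt_safe(x):
    """Like Oscar's sqrt, but any unexpected exception is returned as data."""
    try:
        return 'success', math.sqrt(x)      # raises ValueError for x < 0
    except Exception as e:
        return 'error', e                   # the master decides what to do

# pool.imap(sqrt_safe, numbers) then yields ('error', ValueError(...))
# even for inputs nobody anticipated, instead of the exception
# propagating out of the map() call in the master process.
```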

JM




Catching exceptions with multi-processing

Steven D'Aprano-8
In reply to this post by Fabien
On Sat, 20 Jun 2015 12:01 am, Fabien wrote:

> Folks,
>
> I am developing a tool which works on individual entities (glaciers) and
> do a lot of operations on them. There are many tasks to do, one after
> each other, and each task follows the same interface:

I'm afraid your description is contradictory. Here you say the tasks run one
after another, but then you say:

> This way, the tasks can be run in parallel very easily:

and then later still you contradict this:

> Also, the task2 should not be run if task1 threw an error.


If task2 relies on task1, then you *cannot* run them in parallel. You have
to run them one after the other, sequentially.


You also ask:

> There are going to be errors, some
> of them are even expected for special outliers. What I would like the
> tool to do is that in case of error, it writes the identifier of the
> problematic glacier somewhere, the error encountered and more info if
> possible. Because of multiprocessing, I can't write in a shared file, so
> I thought that the individual processes should write a unique "error
> file" in a dedicated directory.

The documentation for the logging module has examples of using
multiprocessing to write to a single log file from multiple processes. It's a
bit complicated, since *directly* writing to a single log from multiple
processes is not supported, but it is possible.

https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
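The cookbook recipe boils down to something like this sketch: workers log through a shared queue, and a listener in the main process owns the single output handler (the logger name and task are illustrative):

```python
import logging
import logging.handlers
import multiprocessing as mp

def worker(args):
    q, glacier_id = args
    logger = logging.getLogger('glaciers')
    logger.addHandler(logging.handlers.QueueHandler(q))  # records go to the queue
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.info('processed %s', glacier_id)

if __name__ == '__main__':
    q = mp.Manager().Queue()
    # The listener drains the queue in the main process, so only one
    # process ever touches the underlying handler (file, stream, ...).
    listener = logging.handlers.QueueListener(q, logging.StreamHandler())
    listener.start()
    with mp.Pool(2) as pool:
        pool.map(worker, [(q, i) for i in range(4)], chunksize=1)
    listener.stop()
```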


Or if you are on a Unix or Linux system, you can log to syslog and let
syslog handle it.

Since your sample code appears to have a lot of file I/O, it may be that you
can use threads rather than multiprocessing. That would allow all the
threads to communicate with a single thread that handles logging.

Or use a lock file:

http://stackoverflow.com/questions/1444790/python-module-for-creating-pid-based-lockfile



--
Steven



Catching exceptions with multi-processing

Fabien
On 06/19/2015 05:41 PM, Steven D'Aprano wrote:

> On Sat, 20 Jun 2015 12:01 am, Fabien wrote:
>
>> Folks,
>>
>> I am developing a tool which works on individual entities (glaciers) and
>> do a lot of operations on them. There are many tasks to do, one after
>> each other, and each task follows the same interface:
>
> I'm afraid your description is contradictory. Here you say the tasks run one
> after another, but then you say:
>
>> This way, the tasks can be run in parallel very easily:
>
> and then later still you contradict this:
>
>> Also, the task2 should not be run if task1 threw an error.
>
> If task2 relies on task1, then you *cannot* run them in parallel. You have
> to run them one after the other, sequentially.

Hi Steve,

I meant: "for several glaciers in parallel" as shown by the code snippet:

import multiprocessing as mp
pool = mp.Pool(4)

dirs = [list_of_dirs]
pool.map(task1, dirs, chunksize=1)
pool.map(task2, dirs, chunksize=1)
pool.map(task3, dirs, chunksize=1)

which should be changed to something like (after some of the responses):

dirs = [list_of_dirs]
pool.map(task1, dirs, ...)
# handle exceptions
dirs_reduced = [dirs that did not fail]
pool.map(task2, dirs_reduced, ...)

this way the tasks are run sequentially for each glacier but in parallel
over a list of glaciers...
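That reduction step can be made generic. A sketch, assuming each task returns (dir, exception_or_None) instead of raising (run_stage and task1 are hypothetical names):

```python
import multiprocessing as mp

def task1(d):
    """Hypothetical stage: succeeds for every glacier dir except one."""
    try:
        if d == 'bad_glacier':
            raise RuntimeError("didnt work")
        return d, None
    except Exception as e:
        return d, e

def run_stage(pool, task, dirs):
    """Run one stage over all dirs; split results into successes/failures."""
    ok, failed = [], []
    for d, exc in pool.imap(task, dirs, chunksize=1):
        if exc is None:
            ok.append(d)
        else:
            failed.append((d, exc))         # log these wherever convenient
    return ok, failed

if __name__ == '__main__':
    with mp.Pool(2) as pool:
        ok, failed = run_stage(pool, task1, ['g1', 'bad_glacier', 'g2'])
        # only the glaciers that survived task1 go into the next stage:
        # ok2, failed2 = run_stage(pool, task2, ok)
    print(ok)   # -> ['g1', 'g2']
```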

Fabien



Catching exceptions with multi-processing

Fabien
In reply to this post by Fabien
On 06/19/2015 04:25 PM, Andres Riancho wrote:

> Fabien,
>
>     My recommendation is that you should pass some extra arguments to the task:
>      * A unique task id
>      * A result multiprocessing.Queue
>
>      When an exception is raised you put (unique_id, exception) to the
> queue. When it succeeds you put (unique_id, None). In the main process
> you consume the queue and do your error handling.
>
>      Note that some exceptions can't be serialized, there is where
> tblib [0] comes handy.
>
> [0]https://pypi.python.org/pypi/tblib
>
> Regards,

Thanks, I wasn't aware of the multiprocessing.Queue workflow. It seems
like it's going to require some changes in the actual code of the tasks,
though. Did I get it right that I should stop raising exceptions then?

Something like:

def task_1(path, q):
    # Do stuffs
    if dont_work:
        q.put(RuntimeError("didnt work"))
        return
    # finished
    q.put(None)
    return


Fabien







Catching exceptions with multi-processing

Chris Angelico
In reply to this post by Steven D'Aprano-8
On Sat, Jun 20, 2015 at 1:41 AM, Steven D'Aprano <steve at pearwood.info> wrote:

> On Sat, 20 Jun 2015 12:01 am, Fabien wrote:
>
>> Folks,
>>
>> I am developing a tool which works on individual entities (glaciers) and
>> do a lot of operations on them. There are many tasks to do, one after
>> each other, and each task follows the same interface:
>
> I'm afraid your description is contradictory. Here you say the tasks run one
> after another, but then you say:
>
>> This way, the tasks can be run in parallel very easily:
>
> and then later still you contradict this:
>
>> Also, the task2 should not be run if task1 threw an error.
>
>
> If task2 relies on task1, then you *cannot* run them in parallel. You have
> to run them one after the other, sequentially.

AIUI what he's doing is running all the subparts of task1 in parallel, then
all the subparts of task2:

pool.map(task1, dirs, chunksize=1)
pool.map(task2, dirs, chunksize=1)
pool.map(task3, dirs, chunksize=1)

task1 can be done on all of dirs in parallel, as no instance of task1
depends on any other instance of task1; but task2 should be started
only if all task1s finish successfully. OP, is this how it is? If not,
I apologize for the noise.

ChrisA


Catching exceptions with multi-processing

Cameron Simpson
In reply to this post by Fabien
On 19Jun2015 18:16, Fabien <fabien.maussion at gmail.com> wrote:

>On 06/19/2015 04:25 PM, Andres Riancho wrote:
>>    My recommendation is that you should pass some extra arguments to the task:
>>     * A unique task id
>>     * A result multiprocessing.Queue
>>
>>     When an exception is raised you put (unique_id, exception) to the
>>queue. When it succeeds you put (unique_id, None). In the main process
>>you consume the queue and do your error handling.
>>
>>     Note that some exceptions can't be serialized, there is where
>>tblib [0] comes handy.
>>
>>[0]https://pypi.python.org/pypi/tblib
>>
>>Regards,
>
>Thanks, I wasn't aware of the multiprocessing.Queue workflow. It seems
>like its going to require some changes in the actual code of the tasks
>though. Did I get it right that I should stop raising exceptions then?
>
>Something like:
>
>def task_1(path, q):
>    # Do stuffs
>    if dont_work:
>        q.put(RuntimeError("didnt work"))
>        return
>    # finished
>    q.put(None)
>    return

I would keep your core logic Pythonic, raise exceptions. But I would wrap each
task in something to catch any Exception subclass and report back to the queue.  
Untested example:

  def subwrapper(q, callable, *args, **kwargs):
    try:
      q.put( ('COMPLETED', callable(*args, **kwargs)) )
    except Exception as e:
      q.put( ('FAILED', e, callable, args, kwargs) )

then dispatch tasks like this:

  pool.map(subwrapper, q, task1, dirs, chunksize=1)

and have a thread (or main program) collect things from the queue for logging
and other handling. Obviously you might return something more sophisticated
than my simple tuple above, but I'm sure you get the idea.
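One wrinkle with the dispatch line above: Pool.map takes a single iterable, so the fixed arguments have to be bound first (and the queue needs to come from a Manager to survive pickling). A runnable sketch along the same lines, with a hypothetical task1:

```python
import functools
import multiprocessing as mp

def subwrapper(q, callable_, *args, **kwargs):
    """Run the real task, reporting success or failure through the queue."""
    try:
        q.put(('COMPLETED', callable_(*args, **kwargs)))
    except Exception as e:
        q.put(('FAILED', e, args, kwargs))  # the exception itself must pickle

def task1(d):
    """Hypothetical task that fails for one input."""
    if d == 'bad':
        raise RuntimeError("didnt work")
    return d.upper()

if __name__ == '__main__':
    q = mp.Manager().Queue()
    with mp.Pool(2) as pool:
        pool.map(functools.partial(subwrapper, q, task1),
                 ['a', 'bad', 'c'], chunksize=1)
    while not q.empty():
        print(q.get())   # e.g. ('COMPLETED', 'A') or ('FAILED', RuntimeError(...), ...)
```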

Cheers,
Cameron Simpson <cs at zip.com.au>

He's silly and he's ignorant, but he's got guts, and guts is enough.
        - Sgt. Hartmann


Catching exceptions with multi-processing

Fabien
In reply to this post by Steven D'Aprano-8
On 06/19/2015 10:58 PM, Chris Angelico wrote:

> AIUI what he's doing is all the subparts of task1 in parallel, then
> all the subparts of task2:
>
> pool.map(task1, dirs, chunksize=1)
> pool.map(task2, dirs, chunksize=1)
> pool.map(task3, dirs, chunksize=1)
>
> task1 can be done on all of dirs in parallel, as no instance of task1
> depends on any other instance of task1; but task2 should be started
> only if all task1s finish successfully. OP, is this how it is? If not,
> I apologize for the noise.

That's it! Thanks for clarifying, I might have trouble explaining myself
sometimes ;-)

Fabien


Catching exceptions with multi-processing

Fabien
In reply to this post by Fabien
On 06/20/2015 05:14 AM, Cameron Simpson wrote:

> I would keep your core logic Pythonic, raise exceptions. But I would
> wrap each task in something to catch any Exception subclass and report
> back to the queue. Untested example:
>
>   def subwrapper(q, callable, *args, **kwargs):
>     try:
>       q.put( ('COMPLETED', callable(*args, **kwargs)) )
>     except Exception as e:
>       q.put( ('FAILED', e, callable, args, kwargs) )
>
> then dispatch tasks like this:
>
>   pool.map(subwrapper, q, task1, dirs, chunksize=1)
>
> and have a thread (or main program) collect things from the queue for
> logging and other handling. Obviously you might return something more
> sophisticated than my simple tuple above, but I'm sure you get the idea.
>
> Cheers,
> Cameron Simpson

Perfect! Much more elegant and easier to implement on top of my existing
workflow based on raising exceptions.

thanks to all responses,

Fabien





Catching exceptions with multi-processing

Paul Rubin-7
In reply to this post by Fabien
Fabien <fabien.maussion at gmail.com> writes:
> I am developing a tool which works on individual entities (glaciers)
> and do a lot of operations on them. There are many tasks to do, one
> after each other, and each task follows the same interface: ...

If most of the resources will be spent on computation and the
communications overhead is fairly low, the path of least resistance may
be to:

1) write a script that computes just one glacier (no multiprocessing)
2) write a control script that runs the glacier script through something
   like os.popen(), so normally it will collect an answer, but it can
   also notice if the glacier script crashes, or kill it from a timeout
   if it takes too long
3) Track the glacier tasks in an external queue server: I've used Redis
   (redis.io) for this, since it's simple and powerful, but there are
   other tools like 0mq that might be a more precise fit.
4) The control script can read the queue server for tasks and update the
   queue server when results are ready
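Step 2 might look like this sketch (run_glacier_script, the script name and the timeout value are illustrative):

```python
import subprocess
import sys

def run_glacier_script(cmd, timeout=600):
    """Run one single-glacier computation as a child process.

    Returns ('ok', stdout) on success, ('crashed', stderr) on a nonzero
    exit, or ('timeout', None) if the child had to be killed.
    """
    try:
        proc = subprocess.run(cmd, capture_output=True, text=True,
                              timeout=timeout)
    except subprocess.TimeoutExpired:
        return 'timeout', None
    if proc.returncode != 0:
        return 'crashed', proc.stderr
    return 'ok', proc.stdout

# e.g. run_glacier_script([sys.executable, 'one_glacier.py', some_id]);
# the resulting status tuple is what gets written back to the queue server.
```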

The advantages of this over multiprocessing are:

1) Redis is a TCP server which means you can spread your compute scripts
over multiple computers easily, getting more parallelism.  You can write
values into it as JSON strings if they are compound values that are not
too large.  Otherwise you probably have to use files, but can pass the
filenames through Redis.  You can connect new clients whenever you want
through the publish/subscribe interface, etc.

2) by using a simple control script you don't have to worry too much
about the many ways that the computation script might fail, you can
restart it, you can put the whole thing under your favorite supervision
daemon (cron, upstart, systemd or whatever) so it can restart
automatically even if your whole computer reboots, etc.  Redis can even
mirror itself to a failover server in real time if you think you need
that, plus it can checkpoint its state to disk.