[IPython-User] Problem using ipython-parallel in conjunction with custom zmq cache server

Adrien Gaidon
Hello,

tl;dr: Is there a caveat to using zmq and ipython-parallel in the same code that would explain weird blocking behaviour when doing `dview.map`?

I am using ipython-parallel to do `dview.map(func, tasks)` on our SGE-managed cluster to distribute an embarrassingly parallel set of tasks (image and video processing), with something like (in pseudo-code):

def func(task):
    preprocessed_task = heavy_preprocessing(task)
    some_small_result = compute(preprocessed_task)
    return some_small_result
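
For reference, the driver side is roughly the following (a simplified sketch with the default profile and illustrative variable names, not my exact code):

from IPython.parallel import Client

rc = Client()          # connects to the ipcontroller of the SGE-started cluster
dview = rc[:]          # direct view on all engines
dview.block = True     # wait for map results

results = dview.map(func, tasks)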

Everything works like a charm, even faster than MPI! (Thanks to the non-copying numpy array sends I guess?)

However, things get much slower when I use a "cached" version of `func` like:

def func_with_cache(task):
    preprocessed_task = read_preprocessing_from_cache(task)
    some_small_result = compute(preprocessed_task)
    return some_small_result

where `read_preprocessing_from_cache` hits an in-house zmq-based cache server (simple REQ-REP architecture over tcp on port 5555 of the same machine as the ipython-parallel client).

Using `func_with_cache` sequentially (built-in `map`) or using `multiprocessing.Pool.map` works perfectly (no blocking, fast as expected), but using it with `dview.map` makes things block for a looong time on every "packet" of tasks like so:
- each engine hits the cache and processes one task in the "normal" amount of time,
- all engines are idle for several dozen seconds (as shown by `top` on an engine's host),
- the client is just waiting (not polling or anything, just time.sleep),
- after a while, each engine proceeds to its next task.

I also tried replacing `dview.map` with a "scatter-execute-gather" pattern and a load-balanced view map, but this did not change anything.
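
For concreteness, those two alternatives look roughly like this (a sketch with illustrative names, not my exact code; it reuses the `rc` and `dview` from the sketch above and assumes `func_with_cache` is importable on the engines):

# 1) scatter / execute / gather on the direct view
dview.scatter('task_chunk', tasks)
dview.execute('chunk_results = [func_with_cache(t) for t in task_chunk]')
results = dview.gather('chunk_results', block=True)

# 2) load-balanced view
lview = rc.load_balanced_view()
results = lview.map(func_with_cache, tasks, block=True)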

I don't know how to track down what is causing this: profiling the client-side code (where the client and dview live) just shows that almost all the time is spent in `time.sleep` (as expected), and I'm struggling to log what is happening on the engines (any hints on that would be appreciated, btw).

I know this is a very specific problem, and there is not much one can do without seeing the actual code, but I hope that someone can at least "teach me how to fish" ;-)

FWIW, my vague intuition is that some global internal state of zmq gets changed either by my `read_preprocessing_from_cache` code or by ipython-parallel (like a default timeout or something), but this is a shot in the dark made by a blind man with a toy gun, so...
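
To make that slightly less vague: by "a default timeout or something" I mean the kind of per-socket options one could inspect along these lines (just a sketch of what I would check, not something I have actually verified):

import zmq

ctx = zmq.Context()            # same pattern as the global context in my cache client
sock = ctx.socket(zmq.REQ)
# defaults are -1, i.e. "block forever"; a changed value would be suspicious
print sock.getsockopt(zmq.RCVTIMEO), sock.getsockopt(zmq.SNDTIMEO), sock.getsockopt(zmq.LINGER)
sock.close()
ctx.term()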

FYI, I am on Fedora Core 20 with Anaconda using:
python 2.7.8
pyzmq 14.3.1
ipython 2.1.0 (with default profile and options)

Any help on this would be very much appreciated!

By the way, thanks for all the hard work on ipython :-)

--- Adrien

Re: Problem using ipython-parallel in conjunction with custom zmq cache server

MinRK



On Sat, Aug 23, 2014 at 2:17 AM, Adrien Gaidon <[hidden email]> wrote:
Hello,

tl;dr: Is there a caveat to using zmq and ipython-parallel in the same code that would explain weird blocking behaviour when doing `dview.map`?

Not that I am aware of. Does your code use the zmq IOLoop infrastructure, or plain sockets?
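
(By "IOLoop infrastructure" I mean the event-loop style of pyzmq usage, i.e. something along these lines, as opposed to plain blocking send()/recv() calls:)

from zmq.eventloop import ioloop, zmqstream

stream = zmqstream.ZMQStream(socket)      # wrap an existing zmq socket
stream.on_recv(handle_parts)              # handle_parts: your callback
ioloop.IOLoop.instance().start()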

-MinRK
 

I am using ipython-parallel to do `dview.map(func, tasks)` on our SGE-managed cluster to distribute an embarrassingly parallel set of tasks (image and video processing), with something like (in pseudo-code):

def func(task):
    preprocessed_task = heavy_preprocessing(task)
    some_small_result = compute(preprocessed_task)
    return some_small_result

Everything works like a charm, even faster than MPI! (Thanks to the non-copying numpy array sends I guess?)

However, things get much slower when I use a "cached" version of `func` like:

def func_with_cache(task):
    preprocessed_task = read_preprocessing_from_cache(task)
    some_small_result = compute(preprocessed_task)
    return some_small_result

where `read_preprocessing_from_cache` hits an in-house zmq-based cache server (simple REQ-REP architecture over tcp on port 5555 of the same machine as the ipython-parallel client).

Using `func_with_cache` sequentially (built-in `map`) or using `multiprocessing.Pool.map` works perfectly (no blocking, fast as expected), but using it with `dview.map` makes things block for a looong time on every "packet" of tasks like so:
- each engine hits the cache and processes one task in the "normal" amount of time,
- all engines are idle for several dozen seconds (as shown by `top` on an engine's host),
- the client is just waiting (not polling or anything, just time.sleep),
- after a while, each engine proceeds to its next task.

That is strange. Can you identify what call the engines and/or controller are blocking in? Does your zmq code use IOLoop and/or threads?

-MinRK
 

Re: Problem using ipython-parallel in conjunction with custom zmq cache server

Adrien Gaidon
Hello,

Thanks for your reply MinRK!

I don't have access to our cluster right now, so I cannot re-run the experiments, sorry. I can nonetheless add some elements.

1) My zmq code uses neither IOLoop nor threads (at least not explicitly, as far as I know, but I'm new to zmq).

However, I use a global variable FC_CONTEXT = zmq.Context() to avoid creating a context upon each request. I do create and connect a socket for every request though, as sockets are not thread-safe in contrast to contexts. See below for some parts of the code.

Maybe this REQ-REP architecture is not the best choice, but, as I said, I'm new to zmq, so it seemed like the simplest option, and it worked (almost) straightforwardly in sequential, parallel, and MPI mode. (I still have to play with the zmq.NOBLOCK flag.)
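
By "playing with zmq.NOBLOCK" I mean moving from blocking receives to a poll-then-receive pattern, roughly like this (an untested sketch; `socket` is the per-request REQ socket from the code below):

import zmq

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
if poller.poll(timeout=5000):          # wait up to 5 seconds for the reply
    reply_header = socket.recv(flags=zmq.NOBLOCK)
else:
    raise RuntimeError("cache server did not answer in time")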

2) How can I use logging on the engines? I've tried a couple of things (including this answer of yours: http://stackoverflow.com/a/15146477), but to no avail. I also used the verbose mode of ipcontroller and the ipengines and checked the SGE job logs to hunt down the time.sleep block, but I could only conclude what I described in my previous mail (blocking in "packets").

3) I also tried to dive into the ipython code, but it will take me a bit more time to wrap my head around it (it's quite big, as illustrated by the beautiful snakefood visualization at the bottom of this post: http://grokcode.com/864/snakefooding-python-code-for-complexity-visualization/ ).

I'm also considering switching from ipython-parallel to Celery or the like, but I'd rather stick with ipython-parallel for now: it's just really easy, fast, and practical... when it works (this is only the first real issue I'm having with it though).

I copy-pasted (and edited) some of the code and related profiling below, in case it helps.

Thanks a lot for your help!

Best,

Adrien


The `read_preprocessing_from_cache` function (actually named `read_from_feat_cache`) looks like:

import zmq
import numpy as np

class FeatCacheMiss(Exception):
    pass

FC_HOST = "some_hostname"
FC_PORT = 5555
FC_CONTEXT = zmq.Context()
# XXX contexts are thread-safe (sockets are not),
#     but is using a global context the issue here?

def read_from_feat_cache(img_name, bbs_hash):
    """Read features from the feature cache

    Parameters
    ----------
    img_name: str
        an image name (should be alpha-numeric)

    bbs_hash: str
        hash of the iterable on bounding boxes to get features from

    Returns
    -------
    feats: (n_bbs, dim) np.ndarray
        the extracted features of each bounding box (stored row-wise)

    """
    global FC_CONTEXT, FC_HOST, FC_PORT
    socket = FC_CONTEXT.socket(zmq.REQ)
    socket.connect('tcp://{}:{}'.format(FC_HOST, FC_PORT))
    socket.send_multipart(("GET", img_name, bbs_hash))
    reply_header = socket.recv()
    if reply_header == "FeatCacheMiss":
        raise FeatCacheMiss
    assert reply_header == "OK"
    dtype = socket.recv()
    shape_s = socket.recv()
    feats_msg = socket.recv(copy=False)
    # reconstruct numpy array
    shape = tuple(map(int, shape_s.strip('()').split(', ')))
    feats = np.frombuffer(buffer(feats_msg), dtype=dtype).reshape(shape)
    return feats

Some bits from the server-side code (re-arranged, inlined, and simplified):

import zmq
import tables as tb
# I use pytables for the disk-based cache behind the server

h = tb.open_file("a_feature_cache_file_path.h5", mode="a")

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('tcp://*:5555')

while True:
    req_header = socket.recv()
    img_name = socket.recv()
    bbs_hash = socket.recv()
    if req_header == "GET":
        # ("GET", img_name, bbs_hash) multi-part request
        # serve the feature if it exists
        try:
            img_group = getattr(h.root, img_name)
            feats = getattr(img_group, bbs_hash)[:]  # force load to get a numpy array
        except tb.NoSuchNodeError:
            # entry does not exist: notify cache miss
            socket.send("FeatCacheMiss")
            continue  # serve the next request (this was a `return` before inlining)
        # send the meta-data
        for _msg in ("OK", str(feats.dtype), str(feats.shape)):
            socket.send(_msg, flags=zmq.SNDMORE)
        # non-copying send of the data
        socket.send(feats, copy=False)
    # else: other request types than GET (with more socket.recv calls)

And here is some profiling info when I benchmark a small set of 16 tasks on a 4-engine SGE-based ipcluster (the top-level function is called `detect_objects_multi`):

1) When not using the cache at all:

         1273389 function calls (1270265 primitive calls) in 29.730 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000   29.732   29.732 _utils.py:45(detect_objects_multi)
    31/27    0.000    0.000   29.656    1.098 view.py:59(sync_results)
    31/27    0.000    0.000   29.654    1.098 view.py:46(save_ids)
       47    0.000    0.000   29.579    0.629 asyncresult.py:135(wait)
       47    0.216    0.005   29.574    0.629 client.py:1050(wait)
       23    0.000    0.000   29.572    1.286 asyncresult.py:103(get)
       17    0.000    0.000   24.543    1.444 <string>:1(execute)
       17    0.000    0.000   24.542    1.444 view.py:626(execute)
    19990   22.699    0.001   22.699    0.001 {time.sleep}
    20037    0.516    0.000    6.595    0.000 client.py:1033(spin)
       13    0.000    0.000    5.088    0.391 <string>:1(_really_apply)
       13    0.000    0.000    5.087    0.391 view.py:523(_really_apply)
        7    0.000    0.000    5.068    0.724 view.py:692(push)
   100501    0.765    0.000    5.032    0.000 session.py:684(recv)
   100501    0.266    0.000    3.994    0.000 socket.py:262(recv_multipart)
   102262    2.189    0.000    3.726    0.000 {method 'recv' of 'zmq.backend.cython.socket.Socket' objects}


2) When reading from the cache:

         3570385 function calls (3567466 primitive calls) in 87.855 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000   87.856   87.856 _utils.py:45(detect_objects_multi)
    31/27    0.000    0.000   87.810    3.252 view.py:59(sync_results)
    31/27    0.000    0.000   87.807    3.252 view.py:46(save_ids)
       46    0.000    0.000   87.745    1.908 asyncresult.py:135(wait)
       46    0.626    0.014   87.740    1.907 client.py:1050(wait)
       23    0.000    0.000   87.740    3.815 asyncresult.py:103(get)
       17    0.000    0.000   87.679    5.158 <string>:1(execute)
       17    0.000    0.000   87.677    5.157 view.py:626(execute)
    59937   67.662    0.001   67.662    0.001 {time.sleep}
    59983    1.533    0.000   19.272    0.000 client.py:1033(spin)
   300231    2.283    0.000   14.707    0.000 session.py:684(recv)
   300231    0.786    0.000   11.798    0.000 socket.py:262(recv_multipart)
   301992    6.467    0.000   11.011    0.000 {method 'recv' of 'zmq.backend.cython.socket.Socket' objects}
   119966    0.684    0.000    5.902    0.000 client.py:819(_flush_results)
    59986    0.564    0.000    4.741    0.000 client.py:804(_flush_notifications)
   299942    3.534    0.000    4.544    0.000 error.py:24(__init__)
    60007    0.376    0.000    3.188    0.000 client.py:858(_flush_iopub)
    59983    0.395    0.000    3.024    0.000 client.py:853(_flush_ignored_hub_replies)


