Apalis

Examples

[1]:
import apalis
import time

Basics

Single Handler

[2]:
class A:
    def __init__(self, y):
        self.y = y

    def expensive(self, x=1):
        cum = 0
        for i in range(10 ** 5):
            cum += i
        return x * self.y

    def fast(self, x=1):
        cum = 0
        for i in range(x):
            cum += i
        return 0

    def cheap(self, x=1):
        return x * self.y

Wrap an object in a Handler to send it to another process:

[3]:
obj = apalis.Handler(A(2))
[4]:
%%time
token = obj.expensive(5)
print(token()) # Calling the token yields the result of the operation
10
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 14.4 ms
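
The token behaves much like a future: the method call returns immediately, and calling the token blocks until the result is available. The sketch below imitates that behaviour with the standard library only. Token and submit are hypothetical names, and a thread pool stands in for the child process purely to keep the example self-contained; it is not a description of how Apalis is implemented.

```python
from concurrent.futures import ThreadPoolExecutor


class Token:
    """Hypothetical stand-in for an Apalis token: call it to get the result."""

    def __init__(self, future):
        self._future = future

    def __call__(self):
        # Blocks until the submitted work has finished.
        return self._future.result()


def submit(executor, func, *args, **kwargs):
    """Start func in the background and return immediately with a Token."""
    return Token(executor.submit(func, *args, **kwargs))


def times_two(x):
    return 2 * x


with ThreadPoolExecutor(max_workers=1) as executor:
    token = submit(executor, times_two, 5)  # returns without waiting
    result = token()                        # waits for the result

print(result)
```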

Several Handlers

[5]:
objsH = [apalis.Handler(A(_)) for _ in range(16)]
[6]:
%%time
tokens = [obj.expensive(5) for obj in objsH]
print(apalis.get(tokens)) # Gets the results of the operations.
[0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75]
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 25.2 ms

This is equivalent to:

[7]:
print([token() for token in tokens])
[0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75]

Extra Features

Set Attribute

[8]:
obj = apalis.Handler(A(2))
obj.expensive(1)()
[8]:
2
[9]:
obj.y = 5
obj.expensive(1)()
[9]:
5

New attributes can also be set:

[10]:
obj.a = A(1)
[11]:
obj.a.expensive()()
[11]:
1

Attributes of attributes can be set as well:

[12]:
obj.a.y = 3
obj.a.expensive()()
[12]:
3

Get Attribute

Use the get() function to obtain a token for the value of an attribute:

[13]:
obj.y.get()()
[13]:
5

Initialize the object directly in the child process

[14]:
@apalis.RemoteClass
class G:
    def __init__(self, y):
        # Expensive initialization
        self.y = 0
        for i in range(y):
            self.y += i
[15]:
g = G(5)
g
[15]:
<apalis.core.Handler at 0x7f2e2f2e82b0>

The Group Handler

With the GroupHandler a list of objects can be parallelized on a given number of processes.

[16]:
objs = [A(_) for _ in range(64)]
objsGH = apalis.GroupHandler(objs, threads=16)

The ideal way to use the GroupHandler is to first collect all the tasks that the objects need to perform and then run them together. Calling a method on an element of the GroupHandler no longer executes it; it returns a task instead.

[17]:
objsGH[2].cheap(5)
[17]:
{'i': 2, 'name': 'cheap', 'args': (5,), 'kwargs': {}, 'mode': 'run'}
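
A task is just a dictionary that describes the call. As a rough illustration of the idea, the sketch below uses a hypothetical TaskRecorder proxy whose method calls are recorded instead of executed. The class name is an assumption made for this example, and the dictionary layout simply mirrors the output above.

```python
class TaskRecorder:
    """Hypothetical proxy: method calls are recorded as task dicts, not executed."""

    def __init__(self, index):
        self._index = index

    def __getattr__(self, name):
        # Only reached for names that are not defined on the recorder itself.
        def record(*args, **kwargs):
            return {"i": self._index, "name": name, "args": args,
                    "kwargs": kwargs, "mode": "run"}
        return record


task = TaskRecorder(2).cheap(5)
print(task)
```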

A list of tasks can be executed with multiple_run(), which returns a single token for all results:

[18]:
tasks = [objsGH[i].cheap(5) for i in range(2)]
token = objsGH.multiple_run(tasks)
print(tasks)
token()
[{'i': 0, 'name': 'cheap', 'args': (5,), 'kwargs': {}, 'mode': 'run', 'task_id': 0, 'item_number': 0}, {'i': 1, 'name': 'cheap', 'args': (5,), 'kwargs': {}, 'mode': 'run', 'task_id': 0, 'item_number': 1}]
[18]:
[0, 5]

Let us time the function:

[19]:
%%timeit
objsGH.multiple_run([obj.cheap(5) for obj in objsGH])()
455 µs ± 14.2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

If more speed is needed, the task dictionaries can be created manually:

[20]:
%%timeit
objsGH.multiple_run([{'i': i, 'name': "cheap", 'args': (5,), "mode":"run"} for i, obj in enumerate(objs)])()
412 µs ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

Running the same workload through a list of plain Handlers shows that the GroupHandler is considerably faster:

[21]:
objsH = [apalis.Handler(obj) for i, obj in enumerate(objs)]
[22]:
%%timeit
apalis.get([obj.cheap(5) for obj in objsH])
1.79 ms ± 53.2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

Note that several tasks can also be sent to a single Handler with the multiple_run syntax:

[23]:
objsH[0].multiple_run([{'name': "cheap", 'args': (i,), "mode":"run"} for i in range(10)])()
[23]:
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

run

A bit more speed can be gained by not generating a token and instead waiting directly for the response of the child processes:

[24]:
%%timeit
objsGH.run([{'i': i, 'name': "cheap", 'args': (5,), "mode":"run"} for i, obj in enumerate(objs)])
358 µs ± 9.37 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

The same syntax is also valid for the Handler:

[25]:
objsH[0].run({'name': "cheap", 'args': (1,), "mode":"run"})
[25]:
0

More Functionality

single_run

The function single_run will execute just one task. It works for both the GroupHandler and Handler.

[26]:
objsGH.single_run({'i':1, 'name': "cheap", 'args': (1,), "mode": "run"})()
[26]:
1
[27]:
objsH[1].single_run({'name': "cheap", 'args': (1,), "mode": "run"})()
[27]:
1

It is marginally faster than multiple_run with a single task:

[28]:
%%timeit
objsGH.single_run({'i':1, 'name': "cheap", 'args': (1,), "mode": "run"})()
43.3 µs ± 1.33 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
[29]:
%%timeit
objsGH.multiple_run([{'i':1, 'name': "cheap", 'args': (1,), "mode": "run"}])()
44.5 µs ± 955 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

eval, exec, copy, get_attr, set_attr

Evaluate the code. Equivalent to eval().

[30]:
objsH[1].single_run({'code': "self.cheap(1) * 5", "mode": "eval"})()
[30]:
5

Execute the code. Equivalent to exec().

[31]:
objsH[1].single_run({'code': "self.a = self.cheap(1) * 5", "mode": "exec"})()
objsH[1].a.get()()
[31]:
5

Return a copy of the object. The code is executed before the copy is returned, with the copy available as self_copy. This is useful when the object has unpicklable attributes, since they can be deleted from the copy first.

[32]:
class B:
    def __init__(self):
        self.lambda_function = lambda x: x

obj = apalis.Handler(B())
[33]:
b_copy = obj.single_run({'code': "del self_copy.lambda_function", "mode": "copy"})()
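
To see why deleting the attribute helps, the following standard-library sketch checks which attributes of an object cannot be pickled and removes them from a shallow copy. The helper is_picklable and the extra data attribute are assumptions added for illustration; Apalis may perform the copy differently.

```python
import copy
import pickle


def is_picklable(value):
    """Return True if pickle can serialize value."""
    try:
        pickle.dumps(value)
        return True
    except Exception:
        return False


class B:
    def __init__(self):
        self.data = [1, 2, 3]               # hypothetical picklable attribute
        self.lambda_function = lambda x: x  # lambdas cannot be pickled


b = B()
blocked = [name for name, value in vars(b).items() if not is_picklable(value)]

# Work on a shallow copy so the original object keeps its lambda.
b_copy = copy.copy(b)
for name in blocked:
    delattr(b_copy, name)

print(blocked)
print(all(is_picklable(value) for value in vars(b_copy).values()))
```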

Set an attribute. Equivalent to obj.y = 5.

[34]:
objsH[1].single_run({'name': "y", "value": 5, "mode": "set_attr"})()

Get an attribute. Equivalent to obj.y.get().

[35]:
objsH[1].single_run({'name': "y", "mode": "get_attr"})()
[35]:
5
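
The modes above can be pictured as branches of a small dispatcher that runs inside the child process. The function execute_task below is a hypothetical sketch written for this page, not Apalis code; it covers the run, eval, exec, get_attr and set_attr modes and leaves out copy.

```python
def execute_task(obj, task):
    """Apply one task dict to obj (hypothetical dispatcher, not Apalis code)."""
    mode = task["mode"]
    if mode == "run":
        method = getattr(obj, task["name"])
        return method(*task.get("args", ()), **task.get("kwargs", {}))
    if mode == "eval":
        # The object is exposed to the code under the name "self".
        return eval(task["code"], {}, {"self": obj})
    if mode == "exec":
        exec(task["code"], {}, {"self": obj})
        return None
    if mode == "get_attr":
        return getattr(obj, task["name"])
    if mode == "set_attr":
        setattr(obj, task["name"], task["value"])
        return None
    raise ValueError(f"unknown mode: {mode!r}")


class A:
    def __init__(self, y):
        self.y = y

    def cheap(self, x=1):
        return x * self.y


a = A(1)
results = [
    execute_task(a, {"name": "cheap", "args": (1,), "mode": "run"}),
    execute_task(a, {"code": "self.cheap(1) * 5", "mode": "eval"}),
    execute_task(a, {"code": "self.a = self.cheap(1) * 5", "mode": "exec"}),
    execute_task(a, {"name": "a", "mode": "get_attr"}),
    execute_task(a, {"name": "y", "value": 5, "mode": "set_attr"}),
    execute_task(a, {"name": "y", "mode": "get_attr"}),
]
print(results)
```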

Initialize directly in the child process (Group Handler)

The first argument is a list of classes to be instantiated in the child processes. The second, params, is a list with one entry per class holding the parameters used to instantiate it, each in the format [args, kwargs].

[36]:
classes = [A for i in range(16)]
params = [[(i,), {}] for i in range(16)]
gh = apalis.GroupHandler(classes, params=params)
gh
[36]:
<apalis.core.GroupHandler at 0x7f2e2f2a3a30>
[37]:
gh.single_run(gh[2].cheap(1))()
[37]:
2
[38]:
g
[38]:
<apalis.core.Handler at 0x7f2e2f2e82b0>
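
The [args, kwargs] format maps directly onto Python's argument unpacking. The sketch below shows the instantiation in plain Python, without any child process; the scale keyword is a hypothetical parameter added only to show the kwargs half of each entry.

```python
class A:
    def __init__(self, y, scale=1):  # scale is hypothetical, for illustration
        self.y = y * scale


classes = [A for _ in range(4)]
# One [args, kwargs] entry per class, in the same format as params above.
params = [[(i,), {"scale": 10}] for i in range(4)]

# Each class is called with its own positional and keyword arguments.
instances = [cls(*args, **kwargs) for cls, (args, kwargs) in zip(classes, params)]
print([obj.y for obj in instances])
```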

CPU Affinity

For short workloads (t < 1 ms), pinning each process to a CPU by setting its affinity can reduce the run time:

[39]:
objs = [A(_) for _ in range(16)]

objsGH = apalis.GroupHandler(objs, threads=16, affinity=False)
objsGH_affinity = apalis.GroupHandler(objs, threads=16, affinity=True)
[40]:
%%timeit
objsGH.multiple_run([{'i': i, 'name': "fast", 'args': (10 ** 4,), "mode":"run"} for i in range(16)])()
1.61 ms ± 25.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
[41]:
%%timeit
objsGH_affinity.multiple_run([{'i': i, 'name': "fast", 'args': (10**4,), "mode":"run"} for i in range(16)])()
1.09 ms ± 33.9 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
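
For reference, this is what setting the CPU affinity of a process looks like with the standard library on Linux. Whether Apalis uses os.sched_setaffinity internally is an assumption; the helper pin_to_one_cpu is a hypothetical name, and the call is not available on macOS or Windows.

```python
import os


def pin_to_one_cpu():
    """Pin the current process to a single CPU and return the new CPU set.

    Returns None on platforms without os.sched_setaffinity.
    """
    if not hasattr(os, "sched_setaffinity"):
        return None
    allowed = os.sched_getaffinity(0)   # CPUs this process may run on
    cpu = min(allowed)                  # arbitrary choice for the sketch
    os.sched_setaffinity(0, {cpu})      # pid 0 means "the calling process"
    return os.sched_getaffinity(0)


original = os.sched_getaffinity(0) if hasattr(os, "sched_getaffinity") else None
pinned = pin_to_one_cpu()
print(pinned)

# Restore the original affinity so the rest of the program is unaffected.
if original is not None:
    os.sched_setaffinity(0, original)
```

A pinned process is not migrated between cores by the scheduler, which tends to matter most when each task is very short.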

Sending Large Objects with Plasma

[42]:
import numpy as np

Create a Plasma object store. As the command printed below shows, mem is given in megabytes, so mem=2000 reserves 2 GB:

[43]:
apalis.init_plasma(mem=2000)
/home/danielalcalde/anaconda3/envs/python3.8/bin/plasma_store -m 2000000000 -s /tmp/plasma_auBfeZvrVU
[43]:
<pyarrow._plasma.PlasmaClient at 0x7f2e2d659670>

Store in plasma:

[44]:
oid = apalis.put(np.zeros(10))
oid
[44]:
ObjectID(92d6aacd66fa94adb1f6bf85be06d54be721a670)

Retrieve it from Plasma:

[45]:
apalis.plasma_get(oid)
[45]:
array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

Let’s test it out.

[46]:
class C:
    def __init__(self):
        self.v = np.random.randn(3000)

    def multi(self, m):
        return m.dot(self.v)
[47]:
objs = [apalis.Handler(C()) for _ in range(16)]
objs2 = [C() for _ in range(16)]

Without using Plasma:

[48]:
%%timeit
m = np.random.randn(3000, 3000)
apalis.get([obj.multi(m) for obj in objs])
3.72 s ± 74 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

With Plasma:

[49]:
%%timeit
m = np.random.randn(3000, 3000)
m_id = apalis.put(m)
apalis.get([obj.multi(m_id) for obj in objs])
691 ms ± 32.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

With shared arrays:

[50]:
%%timeit
m = apalis.SharedArray(np.random.randn(3000, 3000))
apalis.get([obj.multi(m) for obj in objs])
m.unlink()
732 ms ± 43.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Shared Arrays (Experimental)

[51]:
class C:
    def __init__(self, x):
        self.x = x

    def multi(self, m):
        m[0] = self.x
[52]:
obj = apalis.Handler(C(10))

Create mutable shared array:

[53]:
a = apalis.SharedArray(np.zeros(10))
a
[53]:
SharedArray([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

The array is modified in place by multi, and the modification is visible in the parent process:

[54]:
obj.multi(a)()
a
[54]:
SharedArray([10.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.])

To release the shared memory:

[55]:
a.unlink()
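
The same create, modify and unlink life cycle can be reproduced with the standard library's multiprocessing.shared_memory module (Python 3.8+). This is only a sketch of the general technique, under the assumption that SharedArray works along similar lines. To stay self-contained it attaches a second handle within the same process instead of starting a child, and it uses struct rather than NumPy.

```python
import struct
from multiprocessing import shared_memory

N = 10  # number of float64 values in the array

# "Parent" side: create the block and initialise it with zeros.
owner = shared_memory.SharedMemory(create=True, size=N * 8)
struct.pack_into(f"{N}d", owner.buf, 0, *([0.0] * N))

# "Child" side: attach to the same block by name, as another process would.
worker = shared_memory.SharedMemory(name=owner.name)
struct.pack_into("d", worker.buf, 0, 10.0)  # counterpart of m[0] = self.x

# The write is visible through the owner's handle; nothing is copied back.
values = struct.unpack_from(f"{N}d", owner.buf, 0)
print(values)

worker.close()
owner.close()
owner.unlink()  # frees the block, like a.unlink() above
```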