Examples
[1]:
import apalis
import time
Basics
Single Handler
[2]:
class A:
    def __init__(self, y):
        self.y = y

    def expensive(self, x=1):
        # Busy loop that simulates a costly computation
        cum = 0
        for i in range(10 ** 5):
            cum += i
        return x * self.y

    def fast(self, x=1):
        # Short loop whose cost is controlled by x
        cum = 0
        for i in range(x):
            cum += i
        return 0

    def cheap(self, x=1):
        # Returns immediately
        return x * self.y
Wrap an object in a Handler to send it to another process. Method calls on the Handler return a token instead of the result.
[3]:
obj = apalis.Handler(A(2))
[4]:
%%time
token = obj.expensive(5)
print(token()) # Calling the token yields the result of the operation
10
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 14.4 ms
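To make the token idea concrete, here is a minimal sketch of a handler-like proxy. It is an assumption for illustration, not apalis's implementation: the hypothetical MiniHandler runs methods on a single worker thread (concurrent.futures.ThreadPoolExecutor) instead of a separate process, and the token it returns is simply the bound Future.result method.

```python
from concurrent.futures import ThreadPoolExecutor


class MiniHandler:
    """Hypothetical handler-like proxy (not apalis's code).

    Method calls are submitted to one worker thread and return a "token":
    a callable that blocks until the result is available.
    """

    def __init__(self, obj):
        self._obj = obj
        self._pool = ThreadPoolExecutor(max_workers=1)

    def __getattr__(self, name):
        # Only called for names not found on the proxy itself,
        # i.e. for the wrapped object's methods.
        method = getattr(self._obj, name)

        def submit(*args, **kwargs):
            future = self._pool.submit(method, *args, **kwargs)
            return future.result  # the token: call it to get the result

        return submit


class Scaler:
    def __init__(self, y):
        self.y = y

    def cheap(self, x=1):
        return x * self.y


handler = MiniHandler(Scaler(2))
token = handler.cheap(5)  # returns immediately
print(token())            # blocks until the result is ready, prints 10
```

A process-based version would additionally have to serialize each call and its result, which is one source of overhead compared with a direct method call.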
Several Handlers
[5]:
objsH = [apalis.Handler(A(_)) for _ in range(16)]
[6]:
%%time
tokens = [obj.expensive(5) for obj in objsH]
print(apalis.get(tokens)) # Gets the results of the operations.
[0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75]
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 25.2 ms
This is equivalent to:
[7]:
print([token() for token in tokens])
[0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75]
Extra Features
Set Attribute
[8]:
obj = apalis.Handler(A(2))
obj.expensive(1)()
[8]:
2
[9]:
obj.y = 5
obj.expensive(1)()
[9]:
5
New attributes can also be set.
[10]:
obj.a = A(1)
[11]:
obj.a.expensive()()
[11]:
1
Attributes of attributes can be set as well.
[12]:
obj.a.y = 3
obj.a.expensive()()
[12]:
3
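A proxy can forward attribute writes such as obj.a.y = 3 by overriding __setattr__ and remembering the attribute path that led to it. The sketch below is hypothetical (AttrPath is our name, not part of apalis) and works on an in-process object; unlike apalis, calls return results directly instead of tokens.

```python
class AttrPath:
    """Hypothetical proxy that records an attribute path such as ("a", "y")
    and applies reads, writes and calls to the object at the end of it."""

    def __init__(self, root, path=()):
        # Bypass our own __setattr__ to store the proxy's internal state.
        object.__setattr__(self, "_root", root)
        object.__setattr__(self, "_path", path)

    def _resolve(self):
        obj = self._root
        for name in self._path:
            obj = getattr(obj, name)
        return obj

    def __getattr__(self, name):
        # obj.a returns a proxy for the path ("a",) instead of the value.
        return AttrPath(self._root, self._path + (name,))

    def __setattr__(self, name, value):
        # obj.a.y = 3 becomes setattr(root.a, "y", 3).
        setattr(self._resolve(), name, value)

    def __call__(self, *args, **kwargs):
        # Calling the proxy calls the method at the end of the path.
        return self._resolve()(*args, **kwargs)


class A:
    def __init__(self, y):
        self.y = y

    def cheap(self, x=1):
        return x * self.y


obj = AttrPath(A(2))
obj.y = 5
print(obj.cheap(1))    # 5
obj.a = A(1)           # new attribute on the wrapped object
obj.a.y = 3            # attribute of an attribute
print(obj.a.cheap(1))  # 3
```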
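Initialize the object directly in the child process
Decorating a class with @apalis.RemoteClass makes its constructor run in the child process; calling the class returns a Handler.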
[14]:
@apalis.RemoteClass
class G:
    def __init__(self, y):
        # Expensive initialization
        self.y = 0
        for i in range(y):
            self.y += i
[15]:
g = G(5)
g
[15]:
<apalis.core.Handler at 0x7f2e2f2e82b0>
The Group Handler
With the GroupHandler, a list of objects can be distributed over a given number of processes (set with the threads argument).
[16]:
objs = [A(_) for _ in range(64)]
objsGH = apalis.GroupHandler(objs, threads=16)
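One simple way to share many objects among fewer processes is to split the object indices into contiguous chunks, one per worker, and route each task by its index. The sketch below is a hypothetical scheme for illustration (partition and worker_for are our names); the source does not say how the GroupHandler actually assigns objects to processes.

```python
def partition(n_objects, n_workers):
    """Split indices 0..n_objects-1 into n_workers contiguous chunks
    whose sizes differ by at most one."""
    base, extra = divmod(n_objects, n_workers)
    chunks, start = [], 0
    for w in range(n_workers):
        size = base + (1 if w < extra else 0)
        chunks.append(range(start, start + size))
        start += size
    return chunks


def worker_for(i, chunks):
    """Return the worker that owns object index i."""
    for w, chunk in enumerate(chunks):
        if i in chunk:
            return w
    raise IndexError(i)


chunks = partition(64, 16)
print([len(c) for c in chunks])  # sixteen chunks of 4 objects each
print(worker_for(5, chunks))     # 1
```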
The ideal way to use the GroupHandler is to first collect all the tasks that the objects need to perform and then execute them all at once with multiple_run() or run(). Calling a method through the GroupHandler does not execute it; it returns a task instead.
[17]:
objsGH[2].cheap(5)
[17]:
{'i': 2, 'name': 'cheap', 'args': (5,), 'kwargs': {}, 'mode': 'run'}
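The task is a plain dict. A hypothetical helper (make_task is our name, not part of apalis) that builds the same structure from an object index, a method name and its arguments could look like this:

```python
def make_task(i, name, *args, **kwargs):
    """Build a task dict in the format printed above (hypothetical helper)."""
    return {"i": i, "name": name, "args": args, "kwargs": kwargs, "mode": "run"}


print(make_task(2, "cheap", 5))
# {'i': 2, 'name': 'cheap', 'args': (5,), 'kwargs': {}, 'mode': 'run'}
```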
All the tasks can be executed at once with multiple_run(), which returns a token.
[18]:
tasks = [objsGH[i].cheap(5) for i in range(2)]
token = objsGH.multiple_run(tasks)
print(tasks)
token()
[{'i': 0, 'name': 'cheap', 'args': (5,), 'kwargs': {}, 'mode': 'run', 'task_id': 0, 'item_number': 0}, {'i': 1, 'name': 'cheap', 'args': (5,), 'kwargs': {}, 'mode': 'run', 'task_id': 0, 'item_number': 1}]
[18]:
[0, 5]
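When debugging, it can help to run the same task dicts serially on plain objects. The sketch below is a hypothetical reference interpreter for "run" tasks (run_local is our name), not apalis code; it treats 'kwargs' as optional because the manually built tasks further down omit that key.

```python
def run_local(objs, tasks):
    """Execute "run" tasks in order on a list of plain objects (hypothetical)."""
    results = []
    for task in tasks:
        if task.get("mode", "run") != "run":
            raise ValueError(f"unsupported mode: {task.get('mode')!r}")
        # 'i' selects the object, 'name' the method to call on it.
        method = getattr(objs[task["i"]], task["name"])
        results.append(method(*task.get("args", ()), **task.get("kwargs", {})))
    return results


class A:
    def __init__(self, y):
        self.y = y

    def cheap(self, x=1):
        return x * self.y


objs = [A(y) for y in range(64)]
tasks = [{"i": i, "name": "cheap", "args": (5,), "mode": "run"} for i in range(2)]
print(run_local(objs, tasks))  # [0, 5]
```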
Let us time the function:
[19]:
%%timeit
objsGH.multiple_run([obj.cheap(5) for obj in objsGH])()
455 µs ± 14.2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
If more speed is needed, the tasks can be created manually:
[20]:
%%timeit
objsGH.multiple_run([{'i': i, 'name': "cheap", 'args': (5,), "mode":"run"} for i, obj in enumerate(objs)])()
412 µs ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
Compared with one normal Handler per object, the GroupHandler is faster (455 µs versus 1.79 ms for the 64 objects here):
[21]:
objsH = [apalis.Handler(obj) for obj in objs]
[22]:
%%timeit
apalis.get([obj.cheap(5) for obj in objsH])
1.79 ms ± 53.2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
Note that several tasks can also be sent to one Handler with the multiple_run syntax; the tasks then have no 'i' key. Since objsH[0] wraps A(0), every result is 0:
[23]:
objsH[0].multiple_run([{'name': "cheap", 'args': (i,), "mode":"run"} for i in range(10)])()
[23]:
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
run
A bit more speed can be achieved with run(), which does not generate a token and instead waits directly for the responses of the child processes:
[24]:
%%timeit
objsGH.run([{'i': i, 'name': "cheap", 'args': (5,), "mode":"run"} for i, obj in enumerate(objs)])
358 µs ± 9.37 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
The same syntax also works for the Handler, here with a single task:
[25]:
objsH[0].run({'name': "cheap", 'args': (1,), "mode":"run"})
[25]:
0
More Functionality
single_run
single_run() executes just one task and returns a token. It works for both the GroupHandler and the Handler (for the Handler, the task has no 'i' key).
[26]:
objsGH.single_run({'i':1, 'name': "cheap", 'args': (1,), "mode": "run"})()
[26]:
1
[27]:
objsH[1].single_run({'name': "cheap", 'args': (1,), "mode": "run"})()
[27]:
1
It is marginally faster than multiple_run() with a one-element list:
[28]:
%%timeit
objsGH.single_run({'i':1, 'name': "cheap", 'args': (1,), "mode": "run"})()
43.3 µs ± 1.33 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
[29]:
%%timeit
objsGH.multiple_run([{'i':1, 'name': "cheap", 'args': (1,), "mode": "run"}])()
44.5 µs ± 955 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
eval, exec, copy, get_attr, set_attr
Evaluate an expression in the child process, with the wrapped object available as self. Equivalent to eval().
[30]:
objsH[1].single_run({'code': "self.cheap(1) * 5", "mode": "eval"})()
[30]:
5
Execute a statement in the child process, again with the object available as self. Equivalent to exec().
[31]:
objsH[1].single_run({'code': "self.a = self.cheap(1) * 5", "mode": "exec"})()
objsH[1].a.get()()
[31]:
5
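A minimal sketch of how "eval" and "exec" tasks could be interpreted, assuming (as the examples suggest) that the code sees the wrapped object under the name self. run_code is a hypothetical name; this is not apalis's implementation.

```python
def run_code(obj, task):
    """Interpret an "eval" or "exec" task against obj (hypothetical)."""
    namespace = {"self": obj}  # the code refers to the object as `self`
    if task["mode"] == "eval":
        return eval(task["code"], namespace)
    if task["mode"] == "exec":
        exec(task["code"], namespace)
        return None
    raise ValueError(f"unsupported mode: {task['mode']!r}")


class A:
    def __init__(self, y):
        self.y = y

    def cheap(self, x=1):
        return x * self.y


obj = A(1)
print(run_code(obj, {"code": "self.cheap(1) * 5", "mode": "eval"}))  # 5
run_code(obj, {"code": "self.a = self.cheap(1) * 5", "mode": "exec"})
print(obj.a)  # 5
```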
Return a copy of the object. Before the copy is returned, the code is executed on it, with the copy available as self_copy. This is useful when the object has unpicklable attributes, since they can be deleted from the copy before it is sent back.
[32]:
class B:
    def __init__(self):
        # Lambdas cannot be pickled by the standard pickle module
        self.lambda_function = lambda x: x

obj = apalis.Handler(B())
[33]:
b_copy = obj.single_run({'code': "del self_copy.lambda_function", "mode": "copy"})()
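The "copy" mode can be sketched with copy.copy and pickle. The function copy_for_transfer is hypothetical; it assumes a shallow copy is enough, so deleting an attribute from the copy leaves the original untouched, and it uses a pickle round trip to stand in for sending the copy back to the parent process.

```python
import copy
import pickle


def copy_for_transfer(obj, code):
    """Copy obj, run `code` on the copy (named self_copy), then pickle it."""
    self_copy = copy.copy(obj)  # shallow copy with its own instance __dict__
    exec(code, {"self_copy": self_copy})
    return pickle.loads(pickle.dumps(self_copy))  # stands in for the transfer


class B:
    def __init__(self):
        self.lambda_function = lambda x: x  # not picklable
        self.data = [1, 2, 3]


b = B()
b_copy = copy_for_transfer(b, "del self_copy.lambda_function")
print(b_copy.data)                          # [1, 2, 3]
print(hasattr(b_copy, "lambda_function"))   # False
print(hasattr(b, "lambda_function"))        # True: the original is unchanged
```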
Set an attribute. Equivalent to objsH[1].y = 5.
[34]:
objsH[1].single_run({'name': "y", "value": 5, "mode": "set_attr"})()
Get an attribute. Equivalent to objsH[1].y.get().
[35]:
objsH[1].single_run({'name': "y", "mode": "get_attr"})()
[35]:
5
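The "set_attr" and "get_attr" modes map directly onto Python's setattr and getattr. A hypothetical interpreter for them (attr_task is our name, not part of apalis) might be:

```python
def attr_task(obj, task):
    """Interpret a "set_attr" or "get_attr" task against obj (hypothetical)."""
    if task["mode"] == "set_attr":
        setattr(obj, task["name"], task["value"])
        return None
    if task["mode"] == "get_attr":
        return getattr(obj, task["name"])
    raise ValueError(f"unsupported mode: {task['mode']!r}")


class A:
    def __init__(self, y):
        self.y = y


obj = A(1)
attr_task(obj, {"name": "y", "value": 5, "mode": "set_attr"})
print(attr_task(obj, {"name": "y", "mode": "get_attr"}))  # 5
```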
Initialize directly in the child process (Group Handler)
Instead of objects, a list of classes can be passed as the first argument; each class is sent to a child process and instantiated there. The second argument, params, is a list with one entry per class holding the constructor arguments in the format [args, kwargs].
[36]:
classes = [A for i in range(16)]
params = [[(i,), {}] for i in range(16)]
gh = apalis.GroupHandler(classes, params=params)
gh
[36]:
<apalis.core.GroupHandler at 0x7f2e2f2a3a30>
[37]:
gh.single_run(gh[2].cheap(1))()
[37]:
2
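The [args, kwargs] format maps onto an ordinary constructor call, cls(*args, **kwargs). A hypothetical helper showing that mapping in a single process (instantiate_all is our name; in apalis the instantiation happens in the child processes):

```python
def instantiate_all(classes, params):
    """Create one object per class from its [args, kwargs] entry (hypothetical)."""
    if len(classes) != len(params):
        raise ValueError("classes and params must have the same length")
    return [cls(*args, **kwargs) for cls, (args, kwargs) in zip(classes, params)]


class A:
    def __init__(self, y):
        self.y = y

    def cheap(self, x=1):
        return x * self.y


classes = [A for i in range(16)]
params = [[(i,), {}] for i in range(16)]
objs = instantiate_all(classes, params)
print(objs[2].cheap(1))  # 2
```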
[38]:
g
[38]:
<apalis.core.Handler at 0x7f2e2f2e82b0>
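CPU Affinity
For fast workloads (on the order of milliseconds), setting the CPU affinity of the processes with affinity=True can make things faster. In the benchmark below it reduces the time per call from 1.61 ms to 1.09 ms.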
[39]:
objs = [A(_) for _ in range(16)]
objsGH = apalis.GroupHandler(objs, threads=16, affinity=False)
objsGH_affinity = apalis.GroupHandler(objs, threads=16, affinity=True)
[40]:
%%timeit
objsGH.multiple_run([{'i': i, 'name': "fast", 'args': (10 ** 4,), "mode":"run"} for i in range(16)])()
1.61 ms ± 25.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
[41]:
%%timeit
objsGH_affinity.multiple_run([{'i': i, 'name': "fast", 'args': (10**4,), "mode":"run"} for i in range(16)])()
1.09 ms ± 33.9 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
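On Linux, a process can restrict itself to a single core with os.sched_setaffinity. The sketch below is an assumption about how workers could be pinned round-robin (cpu_for_worker and pin_current_process are hypothetical names), not a description of what affinity=True does internally; os.sched_getaffinity and os.sched_setaffinity are not available on macOS or Windows.

```python
import os


def cpu_for_worker(worker_index, available_cpus):
    """Pick a CPU for a worker, cycling through the allowed CPUs (hypothetical)."""
    cpus = sorted(available_cpus)
    return cpus[worker_index % len(cpus)]


def pin_current_process(cpu):
    """Restrict the calling process to one CPU and return its previous CPU set."""
    previous = os.sched_getaffinity(0)
    os.sched_setaffinity(0, {cpu})
    return previous


if hasattr(os, "sched_getaffinity"):  # Linux only
    allowed = os.sched_getaffinity(0)
    # CPU that each of the first four workers would be pinned to
    print([cpu_for_worker(i, allowed) for i in range(4)])
```

A worker would call pin_current_process(cpu_for_worker(i, allowed)) once at start-up, so that each process keeps its caches on one core.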
Sending Large Objects with Plasma
[42]:
import numpy as np
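Create a Plasma object store. The mem argument is in megabytes: mem=2000 starts plasma_store with -m 2000000000 bytes, as the log below shows.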
[43]:
apalis.init_plasma(mem=2000)
/home/danielalcalde/anaconda3/envs/python3.8/bin/plasma_store -m 2000000000 -s /tmp/plasma_auBfeZvrVU
[43]:
<pyarrow._plasma.PlasmaClient at 0x7f2e2d659670>
Store an object in Plasma; apalis.put returns its ObjectID:
[44]:
oid = apalis.put(np.zeros(10))
oid
[44]:
ObjectID(92d6aacd66fa94adb1f6bf85be06d54be721a670)
Retrieve it from Plasma:
[45]:
apalis.plasma_get(oid)
[45]:
array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])
Let’s test it out.
[46]:
class C:
    def __init__(self):
        self.v = np.random.randn(3000)

    def multi(self, m):
        # Product of the large matrix m with the stored vector
        return m.dot(self.v)
[47]:
objs = [apalis.Handler(C()) for _ in range(16)]
objs2 = [C() for _ in range(16)]
Without using Plasma:
[48]:
%%timeit
m = np.random.randn(3000, 3000)
apalis.get([obj.multi(m) for obj in objs])
3.72 s ± 74 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
With Plasma:
[49]:
%%timeit
m = np.random.randn(3000, 3000)
m_id = apalis.put(m)
apalis.get([obj.multi(m_id) for obj in objs])
691 ms ± 32.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
With shared arrays:
[50]:
%%timeit
m = apalis.SharedArray(np.random.randn(3000, 3000))
apalis.get([obj.multi(m) for obj in objs])
m.unlink()
732 ms ± 43.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
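Presumably most of the 3.72 s without Plasma is spent serializing the 3000 × 3000 matrix for each of the 16 Handlers. The sketch below shows the underlying idea with the standard library's multiprocessing.shared_memory (Python 3.8+): the data is written once, only the block's name is passed around, and the reader attaches to the same memory. It is an assumption that apalis.SharedArray works this way; the "child" is simulated in the same process, and create_shared_floats and read_shared_floats are hypothetical names.

```python
from multiprocessing import shared_memory


def create_shared_floats(values):
    """Copy floats into a new shared-memory block; return the block (hypothetical)."""
    shm = shared_memory.SharedMemory(create=True, size=8 * len(values))
    view = shm.buf.cast("d")  # interpret the raw bytes as C doubles
    for k, value in enumerate(values):
        view[k] = value
    view.release()  # exported views must be released before close()
    return shm


def read_shared_floats(name, n):
    """Attach to an existing block by name and read n floats (hypothetical)."""
    shm = shared_memory.SharedMemory(name=name)
    view = shm.buf.cast("d")
    values = view[:n].tolist()
    view.release()
    shm.close()  # detach; the block itself still exists
    return values


owner = create_shared_floats([1.0, 2.5, -3.0])
# Only the short name would need to be sent to another process.
print(read_shared_floats(owner.name, 3))  # [1.0, 2.5, -3.0]
owner.close()
owner.unlink()  # free the block, like m.unlink() above
```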