Python Threading

We will not go over threading basics, such as what a process is and what a thread of a process is. Instead, we will talk specifically about how Python achieves threading: first the basics, then using a pool.

Basics



In Python you can create a simple thread in two basic ways: either by creating a Thread object and giving it a task, or by inheriting from Thread.
The Thread class has one method, namely run, that does all the work. So all you really need to do in the simple case is create a Thread object, give it a task, and then call start().
You cannot restart a thread once you have called start(); the semantics here are similar to Java.
A simple example starting a Thread:


from threading import Thread

def task(msg):
    print(f'task {msg}')

# args is a tuple
thread = Thread(target=task, args=('Some string',))
thread.start()
thread.join()


We simply created a thread, passed it some args, and then called start().
We called join() in the main thread so that it waits for the task thread to finish before continuing.
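As noted above, a thread cannot be restarted once start() has been called; a second call raises a RuntimeError. A quick sketch demonstrating this:

```python
from threading import Thread

def task():
    print('running')

thread = Thread(target=task)
thread.start()
thread.join()

try:
    thread.start()  # a Thread object can only be started once
except RuntimeError as e:
    print(f'cannot restart: {e}')
```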

If we wanted to run this via inheritance, the code is similar, but we create a class that inherits from Thread and then pass it some args.



import threading
from threading import Thread

class MyThread(Thread):
    # constructor
    def __init__(self, value):
        # execute the base constructor
        Thread.__init__(self)
        # store the value
        self.value = value

    # override run
    def run(self):
        print(f'mythread task {self.value}')


# custom exception hook
def custom_hook(args):
    # report the failure
    print(f'Thread failed: {args.exc_value}')

# set the exception hook
threading.excepthook = custom_hook

thread = MyThread('value data')
thread.start()
thread.join()




Here we created a class MyThread with a constructor that takes one arg, and we wrote a simple custom_hook function to report any exception raised inside the thread, installing it via threading.excepthook.
We then constructed the thread with its argument, started it, and joined the main thread to it.

Race and Timing Conditions


No discussion of threading would be complete without discussing race and timing conditions. By race conditions I mean that shared variables live on the heap, and during the processing of a thread the CPU can context switch, so other threads can write to a shared variable at any time; you have no control over when. Consider this trivial line of code:



var = var + 1


This one line is actually three actions, and a context switch could happen at any point between them, causing inconsistency.
The three actions are:
var is loaded from memory
1 is added to the loaded value
the result is stored back into var
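We can see these three steps directly in the bytecode CPython generates, using the standard dis module (exact opcode names vary between Python versions, but the separate load, add, and store instructions are always visible):

```python
import dis

def increment(var):
    var = var + 1
    return var

# disassemble to show the separate load / add / store steps
dis.dis(increment)
```

A context switch between any two of these instructions is what makes the unguarded increment unsafe.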

So it is important that we guard against such conditions. In C++ we would use critical sections and mutual-exclusion locks; in Python we have a handy Lock class that does something in that vein.

We would create a lock, acquire() it, and release() it when done; while we hold the lock, other threads must wait. The with statement handles the acquire and release for us:


# Locking semantics, races and critical sections
import threading

# create a lock
lock = threading.Lock()
variable = 0

# acquire the lock
with lock:
    # add to the variable
    variable = variable + 10
# the lock is released automatically when the with block exits



With regard to timing, we need to consider how threads notify each other that they are ready, i.e. a producer needs to tell a consumer it has produced something that needs consuming. A simple way to do this is via a shared Condition:




# Timing issues and Conditions
import threading

# create a new condition, which wraps a reentrant mutex lock
condition = threading.Condition()

# wait for a state change (in the waiting thread)
with condition:
    condition.wait()

# alert the waiting thread (in the notifying thread)
with condition:
    condition.notify()

# This is not perfect: notify() only wakes threads that are already
# waiting, so we need some signal that the waiter is ready.
# We can combine this with an Event.



In order to signal that the waiting thread is ready, we can use a shared Event to provide this information, like so:




# create a shared event
event = threading.Event()

with condition:
    # indicate we are ready to be notified
    event.set()
    # wait to be notified
    condition.wait()
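Putting these ideas together, here is a minimal runnable producer/consumer sketch (the names items, producer, and consumer are illustrative). Using wait_for with a predicate sidesteps the lost-notify problem in a different way: the waiter re-checks shared state under the lock rather than relying on the notify arriving while it is already waiting.

```python
import threading

items = []
condition = threading.Condition()

def producer():
    with condition:
        items.append('data')
        condition.notify()

def consumer():
    with condition:
        # wait_for re-checks the predicate, so a notify sent
        # before we started waiting is not lost
        condition.wait_for(lambda: items)
        print('consumed', items.pop())

c = threading.Thread(target=consumer)
p = threading.Thread(target=producer)
c.start()
p.start()
c.join()
p.join()
```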







multiprocessing.pool


Now that we have talked about threads and races, we may want to run a lot of workers on a specific task; here is one way to accomplish this.

The simplest way to do pool processing is to import multiprocessing. This module gives you access to a simple API that lets you run tasks either in order (one after the other) or in an async way.

One of the easier tools in this library is the Pool class. Note that Pool gives you a pool of worker processes rather than threads (a ThreadPool with the same API also lives in multiprocessing.pool), and it can be sized based on the number of CPUs your machine has. To determine the number of CPUs we merely do:


import multiprocessing as mp

n_cpu = mp.cpu_count()
print(f'We have {n_cpu} cpus on this machine')



This information is useful as we pass it to the pool so it knows how many workers to set up to take tasks. We do that setup like this:



import multiprocessing as mp

pool = mp.Pool(processes=n_cpu)




Now that we have the pool configured, we can give it the tasks we want to run. The usual way is with the map or map_async methods, or the apply or apply_async methods.
The apply method blocks until a single call has completed and then returns its result; map works like the built-in map function, applying a task to each item of an iterable and blocking until all the results are ready.
The async versions return immediately and deliver results later via an AsyncResult, so there is no wait.
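A small sketch contrasting the three calls (square is an illustrative task; the expected results are shown in the comments):

```python
from multiprocessing.pool import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # map blocks until all results are ready, in input order
        print(pool.map(square, [1, 2, 3]))   # [1, 4, 9]

        # apply blocks for a single call
        print(pool.apply(square, (4,)))      # 16

        # map_async returns immediately; get() collects the results later
        async_result = pool.map_async(square, [5, 6])
        print(async_result.get())            # [25, 36]
```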

Here is a simple Monte Carlo multiprocessing example that uses pool.map:




import numpy as np
import math
import matplotlib.pyplot as plt
import multiprocessing as mp
import time

# Define variables
S = 100    # starting price
T = 250    # number of trading days
mu = .05   # return, or compound annual growth rate
vol = .2   # volatility, or sigma

def do_walk(id):
    # create a list of daily returns using a random normal distribution
    # np.random.normal parameters: mean ("centre") of the distribution,
    # standard deviation (spread or "width"), and size
    daily_returns = np.random.normal(mu / T, vol / math.sqrt(T), T) + 1

    # set the starting price and create the price series generated
    # by the random daily returns above
    price_list = [S]
    for x in daily_returns:
        price_list.append(price_list[-1] * x)

    return price_list[-1]


if __name__ == '__main__':
    t0 = time.time()
    n_cpu = mp.cpu_count()
    print(f'We have {n_cpu} cpus on this machine')

    with mp.Pool(processes=n_cpu) as pool:
        result = pool.map(do_walk, range(1000000))
    t1 = time.time()
    print(f'The process took {t1 - t0} sec to run')

    print("Mean is ", np.mean(result))
    print("5% quantile =", np.percentile(result, 5))
    print("95% quantile =", np.percentile(result, 95))

    # distribution with the 5% and 95% quantiles marked
    plt.hist(result, bins=100)
    plt.axvline(np.percentile(result, 5), color='r', linestyle='dashed', linewidth=2)
    plt.axvline(np.percentile(result, 95), color='r', linestyle='dashed', linewidth=2)
    plt.show()




We have 12 cpus on this machine
The process took 3.8599071502685547 sec to run
Mean is 105.13720131628156
5% quantile = 74.17963327697234
95% quantile = 143.26626397922047





People who enjoyed this article also enjoyed the following:


Python Decorator Semantics
Python Threading
Python Numpy
Python Pandas
Equity Derivatives tutorial
Fixed Income tutorial


And the following Trails:

C++
Java
Python
Scala
Investment Banking tutorials

