Hi Guys! Welcome to part 5 of this series of building a deep learning library from scratch. This post will cover the code of the automatic differentiation part of the library. Automatic Differentiation was discussed in the previous post, so do check it out if you don't know what Autodiff is.
The github repo for this series is....
ashwins-code / Zen-Deep-Learning-Library
Deep Learning library written in Python. Contains code for my blog series on building a deep learning library.
Zen - Deep Learning Library
A deep learning library written in Python.
Contains the code for my blog series where we build this library from scratch
- mnist.py contains an example for a MNIST digit recogniser using the library
- rnn.py contains an example of a Recurrent Neural Network which learns to fit to the graph sin(x) * cos(x)
Approach
Automatic Differentiation relies on a computation graph to calculate derivatives.
Ultimately, it just boils down to nodes with some connections and we traverse these nodes in some way to calculate derivatives.
For our library, we will build our computation graph on the fly, meaning any calculations performed would be recorded onto the computation graph.
Once we have the graph, we need to find a way to use it to calculate the derivatives of all the variables in that graph.
For example, say we produced the following graph...
This represents...
Now, using the graph, our aim is to find the derivative of e with respect all the variables in that graph (a,b,c,d,e)
For my implementation, I found it easiest to traverse the graph in a depth-first manner to calculate derivatives.
So firstly, we start at e and find with respect to e (which is just 1).
Then, we look at node c, meaning we now need to calculate . We can see that is the result of a multiplication between and , meaning (since we treat everything apart from the variable we are on as a constant).
Remembering we are traversing depth-first, the next node we move onto is node a, meaning we calculate . This is a bit more tricky, since a does not have a direct connection to e. However, using the chain rule, we know that . We just calculated , so all we need to calculate now is . We can see that c is an addition of a and b, so
Hopefully, you can now see how we would use a graph to find the derivatives of all the variables in that graph.
Tensor Class
Firstly, we need to create our tensor class, which would act as the variable nodes on our graph.
import numpy as np
import string
import random
def id_generator(size=10, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
np.seterr(invalid='ignore')
def is_matrix(o):
return type(o) == np.ndarray
def same_shape(s1, s2):
for a, b in zip(s1, s2):
if a != b:
return False
return True
class Tensor:
__array_priority__ = 1000
def __init__(self, value, trainable=True):
self.value = value
self.dependencies = []
self.grads = []
self.grad_value = None
self.shape = 0
self.matmul_product = False
self.gradient = 0
self.trainable = trainable
self.id = id_generator()
if is_matrix(value):
self.shape = value.shape
What's going on here?
def id_generator(size=10, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
Function that generates a unique id using random characters
def is_matrix(o):
return type(o) == np.ndarray
Simple function which checks if a value is a numpy array or not.
self.value = value
This line should be self-explanatory, it just holds the value that is given to the tensor.
self.dependencies = []
If the tensor was the result of any operation e.g add or divide, this property would hold the list of tensors that were involved in the operation, producing this tensor (this is how the computation graph is built). If the tensor is not a result of any operation, then this is empty.
self.grads = []
This property would hold the list of derivative of each of the tensor's dependencies with respect to the tensor.
self.shape = 0
...
if is_matrix(value):
self.shape = value.shape
self.shape holds the shape of the tensor's value. Only numpy arrays can have a shape, which is why it is 0 by default.
self.matmul_product = False
Specifies whether the tensor was a result of a matrix multiplicaton or not (this will help later since the chain rule works differently for matrix multiplacation).
self.gradient = np.ones_like(self.value)
After we use the computation graph to calculate gradients, this property would hold the gradient calculated for the tensor. It is initially set to a matrix of 1s, the same shape as its value.
self.trainable = trainable
Some nodes on the graph do not need their derivatives to be calculated, so this property specifies whether this is the case or not for this tensor.
self.id = id_generator()
Tensors will need to have something to uniquely identify themselves. We will see this coming in use when we reimplement our optimisers to use this automatic differentiation module in a later post.
Operations with Tensors
class Tensor:
__array_priority__ = 1000
def __init__(self, value, trainable=True):
self.value = value
self.dependencies = []
self.grads = []
self.grad_value = None
self.shape = 0
self.matmul_product = False
self.gradient = 0
self.trainable = trainable
self.id = id_generator()
if is_matrix(value):
self.shape = value.shape
def depends_on(self, target):
if self == target:
return True
dependencies = self.dependencies
for dependency in dependencies:
if dependency == target:
return True
elif dependency.depends_on(target):
return True
return False
def __mul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value * other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(other.value)
var.grads.append(self.value)
return var
def __rmul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value * other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(other.value)
var.grads.append(self.value)
return var
def __add__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value + other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(np.ones_like(self.value))
var.grads.append(np.ones_like(other.value))
return var
def __radd__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value + other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(np.ones_like(self.value))
var.grads.append(np.ones_like(other.value))
return var
def __sub__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other)
var = Tensor(self.value - other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(np.ones_like(self.value))
var.grads.append(-np.ones_like(other.value))
return var
def __rsub__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(other.value - self.value)
var.dependencies.append(other)
var.dependencies.append(self)
var.grads.append(np.ones_like(other.value))
var.grads.append(-np.one_like(self.value))
return var
def __pow__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value ** other.value)
var.dependencies.append(self)
var.dependencies.append(other)
grad_wrt_self = other.value * self.value ** (other.value - 1)
var.grads.append(grad_wrt_self)
grad_wrt_other = (self.value ** other.value) * np.log(self.value)
var.grads.append(grad_wrt_other)
return var
def __rpow__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(other.value ** self.value)
var.dependencies.append(other)
var.dependencies.append(self)
grad_wrt_other = self.value * other.value ** (self.value - 1)
var.grads.append(grad_wrt_other)
grad_wrt_self = (other.value ** self.value) * np.log(other.value)
var.grads.append(grad_wrt_self)
return var
def __truediv__(self, other):
return self * (other ** -1)
def __rtruediv__(self, other):
return other * (self ** -1)
def __matmul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value @ other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(other.value.T)
var.grads.append(self.value.T)
var.matmul_product = True
return var
def __rmatmul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(other.value @ self.value)
var.dependencies.append(other)
var.dependencies.append(self)
var.grads.append(self.value.T)
var.grads.append(other.value.T)
var.matmul_product = True
return var
To understand what is happening here, let's look at one of the methods
def __mul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value * other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(other.value)
var.grads.append(self.value)
return var
Firstly, __mul__ is an operator overloader. This just means that whenever we want to multiply a tensor with something else, this method would be called. You also see __rmul__, which is the same thing, but is called when the Tensor object is on the right hand side of the operation.
For example...
t = Tensor(10)
t * 5 #__mul__ is called
5 * t #__rmul__ is called
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
other represents the thing that this tensor is being multiplied with. If other is not a tensor, we want to convert it to a tensor, holding the value of other. Since other was not already a tensor, it means that it is a constant, so we do not need to calculate its derivative, since we only want to calculate the derivative of something if we need to change its value, usually when training a model. This is why trainable=False.
var = Tensor(self.value * other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var holds the resulting tensor of this operation. The second and third lines add the two tensors used in this operator to var's dependencies (look back up if you forgot what the dependencies property is used for)
var.grads.append(other.value) # dvar/dother
var.grads.append(self.value) # dvar/dself
return var
This now adds the derivates of the two operands with respect to var to var's grads. These lines would obviously be different in the other class methods, since the derivatives depend on the operation being applied (in this case it's multiplication). Note how the order of how they're added corresponds with the order of the tensors in the dependencies property. We don't store them as tensors, since it will be much quicker to calculate derivatives using raw values, instead of our tensor class.
Calculating gradients using the graph!
def get_gradients(self, grad = None):
grad = np.ones_like(self.value) if grad is None else grad
grad = np.float32(grad)
for dependency, _grad in zip(self.dependencies, self.grads):
if dependency.trainable:
local_grad = np.float32(_grad)
if self.matmul_product:
if dependency == self.dependencies[0]:
local_grad = grad @ local_grad
else:
local_grad = local_grad @ grad
else:
if dependency.shape != 0 and not same_shape(grad.shape, local_grad.shape):
ndims_added = grad.ndim - local_grad.ndim
for _ in range(ndims_added):
grad = grad.sum(axis=0)
for i, dim in enumerate(dependency.shape):
if dim == 1:
grad = grad.sum(axis=i, keepdims=True)
local_grad = local_grad * np.nan_to_num(grad)
dependency.gradient += local_grad
dependency.get_gradients(local_grad)
This method recursively implements the depth-first derivative calculating that was outlined earlier. This can be called for any tensor in the graph, not just the top one. It would result in all the derivatives of all the tensors, that were used to produce this tensor, being calculated.
grad holds the incoming gradients/the previously calculated gradient in the previous "level" of the graph.
for dependency, _grad in zip(self.dependencies, self.grads):
if dependency.trainable:
local_grad = np.float32(_grad)
if self.matmul_product:
if dependency == self.dependencies[0]:
local_grad = grad @ local_grad
else:
local_grad = local_grad @ grad
else:
if dependency.shape != 0 and not same_shape(grad.shape, local_grad.shape):
ndims_added = grad.ndim - local_grad.ndim
for _ in range(ndims_added):
grad = grad.sum(axis=0)
for i, dim in enumerate(dependency.shape):
if dim == 1:
grad = grad.sum(axis=i, keepdims=True)
local_grad = local_grad * np.nan_to_num(grad)
This part of the code goes through each of the tensor's dependencies along with the derivative the tensor with respect to that dependency (held in _grad). If the dependency is trainable, it then checks if it was a result of a matrix multiplication or not.
The chain rule is then accordingly applied, using the previously calculated gradient grad and the gradient of the current dependency _grad. local_grad stores the result after the chain rule is applied.
def same_shape(s1, s2):
for a, b in zip(s1, s2):
if a != b:
return False
return True
if dependency.shape != 0 and not same_shape(grad.shape, local_grad.shape):
ndims_added = grad.ndim - local_grad.ndim
for _ in range(ndims_added):
grad = grad.sum(axis=0)
for i, dim in enumerate(dependency.shape):
if dim == 1:
grad = grad.sum(axis=i, keepdims=True)
If we focus on this part of the code, this handles any case where local_grad and grad aren't the same shape (they need to be in order for chain rule to be applied). This shape mismatch arises if there was any broadcasting performed in any of the calculations. Broadcasting is term used to describe how numpy would perform operations involving arrays of different shapes. You can read more about it on the numpy docs. All this part of the code does is sum across the broadcasted axis of grad, in order to reduce its shape to match the shape of local_grad.
dependency.gradient += local_grad
dependency.get_gradients(local_grad)
The gradient is then recorded to the gradient property of the dependency. It then continues the depth-first traversal by calling the get_gradients method on the dependency, passing through the gradient that was just calculated.
Overall, our code for autodiff should look like this...
import numpy as np
import string
import random
def id_generator(size=10, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
np.seterr(invalid='ignore')
def is_matrix(o):
return type(o) == np.ndarray
def same_shape(s1, s2):
for a, b in zip(s1, s2):
if a != b:
return False
return True
class Tensor:
__array_priority__ = 1000
def __init__(self, value, trainable=True):
self.value = value
self.dependencies = []
self.grads = []
self.grad_value = None
self.shape = 0
self.matmul_product = False
self.gradient = 0
self.trainable = trainable
self.id = id_generator()
if is_matrix(value):
self.shape = value.shape
def depends_on(self, target):
if self == target:
return True
dependencies = self.dependencies
for dependency in dependencies:
if dependency == target:
return True
elif dependency.depends_on(target):
return True
return False
def __mul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value * other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(other.value)
var.grads.append(self.value)
return var
def __rmul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value * other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(other.value)
var.grads.append(self.value)
return var
def __add__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value + other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(np.ones_like(self.value))
var.grads.append(np.ones_like(other.value))
return var
def __radd__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value + other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(np.ones_like(self.value))
var.grads.append(np.ones_like(other.value))
return var
def __sub__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other)
var = Tensor(self.value - other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(np.ones_like(self.value))
var.grads.append(-np.ones_like(other.value))
return var
def __rsub__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(other.value - self.value)
var.dependencies.append(other)
var.dependencies.append(self)
var.grads.append(np.ones_like(other.value))
var.grads.append(-np.one_like(self.value))
return var
def __pow__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value ** other.value)
var.dependencies.append(self)
var.dependencies.append(other)
grad_wrt_self = other.value * self.value ** (other.value - 1)
var.grads.append(grad_wrt_self)
grad_wrt_other = (self.value ** other.value) * np.log(self.value)
var.grads.append(grad_wrt_other)
return var
def __rpow__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(other.value ** self.value)
var.dependencies.append(other)
var.dependencies.append(self)
grad_wrt_other = self.value * other.value ** (self.value - 1)
var.grads.append(grad_wrt_other)
grad_wrt_self = (other.value ** self.value) * np.log(other.value)
var.grads.append(grad_wrt_self)
return var
def __truediv__(self, other):
return self * (other ** -1)
def __rtruediv__(self, other):
return other * (self ** -1)
def __matmul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(self.value @ other.value)
var.dependencies.append(self)
var.dependencies.append(other)
var.grads.append(other.value.T)
var.grads.append(self.value.T)
var.matmul_product = True
return var
def __rmatmul__(self, other):
if not (isinstance(other, Tensor)):
other = Tensor(other, trainable=False)
var = Tensor(other.value @ self.value)
var.dependencies.append(other)
var.dependencies.append(self)
var.grads.append(self.value.T)
var.grads.append(other.value.T)
var.matmul_product = True
return var
def grad(self, target, grad = None):
grad = self.value / self.value if grad is None else grad
grad = np.float32(grad)
if not self.depends_on(target):
return 0
if self == target:
return grad
final_grad = 0
for dependency, _grad in zip(self.dependencies, self.grads):
local_grad = np.float32(_grad) if dependency.depends_on(target) else 0
if local_grad is not 0:
if self.matmul_product:
if dependency == self.dependencies[0]:
local_grad = grad @ local_grad
else:
local_grad = local_grad @ grad
else:
if dependency.shape != 0 and not same_shape(grad.shape, local_grad.shape):
ndims_added = grad.ndim - local_grad.ndim
for _ in range(ndims_added):
grad = grad.sum(axis=0)
for i, dim in enumerate(local_grad.shape):
if dim == 1:
grad = grad.sum(axis=i, keepdims=True)
local_grad *= grad
final_grad += dependency.grad(target, local_grad)
return final_grad
def get_gradients(self, grad = None):
grad = np.ones_like(self.value) if grad is None else grad
grad = np.float32(grad)
for dependency, _grad in zip(self.dependencies, self.grads):
if dependency.trainable:
local_grad = np.float32(_grad)
if self.matmul_product:
if dependency == self.dependencies[0]:
local_grad = grad @ local_grad
else:
local_grad = local_grad @ grad
else:
if dependency.shape != 0 and not same_shape(grad.shape, local_grad.shape):
ndims_added = grad.ndim - local_grad.ndim
for _ in range(ndims_added):
grad = grad.sum(axis=0)
for i, dim in enumerate(dependency.shape):
if dim == 1:
grad = grad.sum(axis=i, keepdims=True)
local_grad = local_grad * np.nan_to_num(grad)
dependency.gradient += local_grad
dependency.get_gradients(local_grad)
def __repr__(self):
return f"Tensor ({self.value})"
and it can be used like so...
a = Tensor(10)
b = Tensor(5)
c = 2
d = (a*b)**c
d.get_gradients()
print (a.gradient, b.gradient)
OUTPUT:
500.0 1000.0
Thank you
Thank you for reading through all of this post! The code of this post can be seen in the Github repo linked at the start in autodiff.py.
Top comments (0)