Implementing a simple RNN

In this post we will implement two simple Recurrent Neural Networks (RNNs): one for a classification task and the other for a regression task.

Classification using RNN

The model takes in a binary sequence and, at each timestep, outputs the XOR of all the bits seen so far (a running parity). For example, if the input is 10110 then it should output 11011, since

0 XOR 1 is 1
1 XOR 0 is 1
1 XOR 1 is 0
0 XOR 1 is 1
1 XOR 0 is 1
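
In other words, each output bit is the XOR of all input bits seen so far. A tiny helper (just for illustration, not part of the model) that computes this target sequence:

def running_xor(bits):
    # cumulative XOR (parity) of the bits seen so far
    acc, out = 0, []
    for b in bits:
        acc ^= b
        out.append(acc)
    return out

print(running_xor([1,0,1,1,0]))    # [1, 1, 0, 1, 1]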

If you are not familiar with RNNs then you should definitely look into this post. I will be using tensorflow==1.0.0 and suggest you use the same version to reproduce the results and to avoid an ImportError, as some of the RNN components have been moved to contrib. So let’s dive into the code.

from __future__ import print_function
import numpy as np
import tensorflow as tf
from numpy.random import shuffle
from tensorflow.contrib.rnn import LSTMCell
import matplotlib.pyplot as plt

We start off by importing the necessary packages. Note that we import LSTMCell from tensorflow.contrib.rnn, whereas if you are using an earlier version like 0.11 you should import it from tensorflow.nn.rnn_cell. I would suggest you have a look at this excellent post on LSTMs.

#create dataset
def create_dataset(num_samples):
    # Each sample is the 12-bit binary representation of a number in [0, num_samples)
    data = ["{0:012b}".format(i) for i in range(num_samples)]
    shuffle(data)
    data = [list(map(int,i)) for i in data]
    data = np.array(data)
    data = data.reshape(num_samples,12,1)

    # Target at timestep c is the running XOR (parity) of the bits seen so far
    output = np.zeros([num_samples,12],dtype=int)
    for sample,out in zip(data,output):
        count = 0
        for c,bit in enumerate(sample):
            if bit[0]==1:
                count += 1
            out[c] = count % 2
    return data,output

The create_dataset function builds our dataset. Each sequence has length 12, so the input is of shape num_samples x 12 x 1 and the output is of shape num_samples x 12.
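
A quick sanity check (a throwaway snippet of mine, not part of the training script) confirms those shapes and the running-XOR targets:

x, y = create_dataset(8)
print(x.shape)               # (8, 12, 1)
print(y.shape)               # (8, 12)
print(x[0].flatten(), y[0])  # one input sequence and its running XOR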

lstm_units = 64

#Input shape: (num_samples,seq_length,input_dimension)
#Output shape: (num_samples, target)
input_data = tf.placeholder(tf.float32,shape=[None,None,1])
output_data = tf.placeholder(tf.int64,shape=[None,None])
cell = LSTMCell(lstm_units,num_proj=2,state_is_tuple=True)
out,_ = tf.nn.dynamic_rnn(cell,input_data,dtype=tf.float32)    #shape: (None, 12, 2)

pred = tf.argmax(out,axis=2)    #shape: (None, 12)

We create placeholders for the inputs and targets and an LSTM cell with 64 units. We pass num_proj=2 so that the output of the LSTM cell is projected down to 2 dimensions (one logit per class). We then use tf.nn.dynamic_rnn, which builds a dynamic RNN and lets us feed batches of variable-length sequences. We keep the outputs and discard the final state. The outputs have shape num_samples x 12 x 2 since we are feeding in sequences of length 12. Finally, we compute tf.argmax over the last axis of the output to get the predicted label at each timestep for every sample in the batch.
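
As a side note, tf.nn.dynamic_rnn also accepts a per-sample sequence_length argument. Here is a minimal sketch of how a padded, variable-length batch could be handled (the placeholder and extra cell below are my own additions, not part of this post's model):

#Sketch only: pass the true length of every sample so the RNN ignores the
#padded timesteps at the end of each sequence
seq_len = tf.placeholder(tf.int32,shape=[None])
varlen_cell = LSTMCell(lstm_units,num_proj=2,state_is_tuple=True)
varlen_out,_ = tf.nn.dynamic_rnn(varlen_cell,input_data,sequence_length=seq_len,
                                 dtype=tf.float32,scope="varlen_rnn")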

cost = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(labels=output_data,logits=out))
optimizer = tf.train.AdamOptimizer(learning_rate=0.1).minimize(cost)

correct = tf.equal(output_data,pred)
accuracy = tf.reduce_mean(tf.cast(correct,tf.float32))

init_op = tf.global_variables_initializer()    #initialize_all_variables is deprecated as of TF 1.0
sess = tf.Session()
sess.run(init_op)
costs = []

We define our cost function using tf.nn.sparse_softmax_cross_entropy_with_logits. Note that we pass logits=out and not pred, since the op applies the softmax internally and expects logits of shape num_samples x num_time_steps x n_classes. We use AdamOptimizer as our optimizer. We then count the correctly predicted timesteps and compute the accuracy.
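
If it is unclear how the sparse op relates to the regular softmax cross-entropy, the following sketch (for illustration only, not part of the training graph) shows an equivalent formulation using one-hot targets:

#Illustration only: one-hot encode the integer targets and use the dense op;
#this computes the same cost as the sparse version above
onehot_labels = tf.one_hot(output_data,depth=2)    #shape: (None, 12, 2)
alt_cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=onehot_labels,logits=out))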

for epoch in range(100):
    inp_data,out_data = create_dataset(4096)
    _,c,acc = sess.run([optimizer,cost,accuracy],feed_dict={input_data: inp_data, output_data: out_data})
    print("Epoch: {}, Cost: {}, Accuracy: {}%".format(epoch,c,acc*100))
    costs.append(c)

We train for 100 epochs, generating a fresh batch each epoch and printing the cost and accuracy at every training step. Now that the model is trained, it’s time to test it.

inp_data = [[[1],[1],[0],[0],[1],[1],[1],[1],[1],[0],[0],[1],[1],[1],[0],[1]]]

print("Input data:", inp_data)
print("Predicted: ", sess.run(pred,feed_dict={input_data: inp_data}))

plt.grid(False)
plt.plot(costs,label="Cost Function")
plt.xlabel("Epoch")
plt.ylabel("Cost")
plt.legend()
plt.show()

sess.close()

We create an input with num_time_steps > 12 (here 16) to test the model on an input it has never seen; because we used a dynamic RNN, the model also works on sequences longer or shorter than the ones it was trained on. We run the prediction, print it to the console, and plot the cost function.

The output looks like this, and from it we can say that our model performs well and is able to learn the dependence on the past bits of the sequence.

And the cost function looks like this.

Regression using RNN

Let’s now implement the same idea for a regression task: predicting a curve over time. The implementation is pretty much the same as before; the only changes are the dataset we use and the cost function we minimize, which turn it into a regression problem. So let’s first rewrite our create_dataset function as follows.

#create dataset
def create_dataset(num_samples, seq_length=10):
    # Sample the curve x*sin(x) + x*cos(2x) on [0, 20] and split it into
    # chunks of seq_length points (num_samples must be divisible by seq_length)
    x = np.linspace(0,20,num_samples)
    X = x*np.sin(x) + x*np.cos(2*x)
    data = np.split(X,int(num_samples/seq_length))
    output = []
    for i, chunk in enumerate(data):
        # The target is the input chunk shifted one step into the future
        o = np.roll(chunk, -1)
        try:
            o[-1] = data[i+1][0]
        except IndexError:
            o[-1] = o[-2]
        output.append(o)
    return np.array(data).reshape(-1,seq_length,1), np.array(output).reshape(-1,seq_length)

We simply create a dataset with num_time_steps=10, where each target is the input sequence shifted one step into the future. The actual dataset can be visualized as below.
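
If you want to plot the curve yourself, a few lines like these will do it (my own sketch, not from the original script):

#Sketch: plot the curve x*sin(x) + x*cos(2x) that the dataset samples
xs = np.linspace(0,20,1000)
plt.plot(xs, xs*np.sin(xs) + xs*np.cos(2*xs))
plt.xlabel("x")
plt.ylabel("x*sin(x) + x*cos(2x)")
plt.show()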

Then we need to change num_proj to 1 and rewrite our cost function as follows.

#Input shape: (num_samples,seq_length,input_dimension)
#Output shape: (num_samples, target)

input_data = tf.placeholder(tf.float32,shape=[None,None,1])
output_data = tf.placeholder(tf.float32,shape=[None,None])
cell = LSTMCell(lstm_units,num_proj=1,state_is_tuple=True)
out,_ = tf.nn.dynamic_rnn(cell,input_data,dtype=tf.float32)   #shape: (None, 10, 1)

pred = tf.squeeze(out,axis=2)    #shape: (None, 10); squeeze only the last axis so a batch of one sample is not collapsed

cost = tf.reduce_mean(tf.square(pred - output_data))
optimizer = tf.train.AdamOptimizer(learning_rate=0.1).minimize(cost)

Everything else works as before with this minor change. The cost over training can be visualized as below, which shows that the cost function has been minimized as required.

Plotting the actual and predicted outputs on a single plot shows that our model has learned the curve well.
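
Such a comparison plot can be produced along these lines (a sketch, assuming the regression graph above has been trained in a session sess just like the classification model):

#Sketch: run the trained regression model on the dataset and overlay the
#predictions on the true curve
test_inp, test_out = create_dataset(1000)
predicted = sess.run(pred, feed_dict={input_data: test_inp})
plt.plot(test_out.flatten(), label="Actual")
plt.plot(predicted.flatten(), label="Predicted")
plt.legend()
plt.show()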

You can download the code used for this project from my GitHub profile.

Thank you, Have a nice day.