Discriminative learning for discrete MNIST data using randomly structured SPNs

This notebook shows how to build a randomly structured SPN and train it to classify digits using a TensorFlow optimizer on binarized MNIST data.

Setting up the imports and preparing the data

We load the data from tf.keras.datasets. Preprocessing consists of flattening and binarization of the data.

In [ ]:
%matplotlib inline
import libspn as spn
import tensorflow as tf
import numpy as np
from libspn.examples.utils.dataiterator import DataIterator

# Load
(train_x, train_y), (test_x, test_y) = tf.keras.datasets.mnist.load_data()

def binarize(x):
    return np.where(np.greater(x / 255., 0.25), 1.0, 0.0)

def flatten(x):
    return x.reshape(-1, np.prod(x.shape[1:]))

def preprocess(x, y):
    return binarize(flatten(x)).astype(int), np.expand_dims(y, axis=1)

# Preprocess
train_x, train_y = preprocess(train_x, train_y)
test_x, test_y = preprocess(test_x, test_y)

Defining the hyperparameters

Some hyperparameters for the SPN.

  • num_subsets is used for the DenseSPNGenerator. This corresponds to the number of variable subsets joined by product nodes in the SPN.
  • num_mixtures is used for the DenseSPNGenerator. This corresponds to the number of sum nodes per scope.
  • num_decomps is used for the DenseSPNGenerator. This corresponds to the number of decompositions generated at each level of products from top-down.
  • num_vars corresponds to the number of input variables (the number of pixels in the case of MNIST).
  • balanced is used for the DenseSPNGenerator. If true, then the generated SPN will have balanced subsets and will consequently be a balanced tree.
  • input_dist is the input distribution (the first product/sum layer in the SPN). spn.DenseSPNGenerator.InputDist.RAW corresponds to raw indicators being joined (so first layer is a product layer). spn.DenseSPNGenerator.InputDist.MIXTURE would correspond to a sums on top of each indicator.
  • num_leaf_values is the number of unique discrete values in the leaf distribution (2 since data is binary).
  • inference_type determines the kind of forward inference where spn.InferenceType.MARGINAL corresponds to sum nodes marginalizing their inputs. spn.InferenceType.MPE would correspond to having max nodes instead.
  • beta1 corresponds to the \beta_1 parameter of the Adam optimizer
  • beta2 corresponds to the \beta_2 parameter of the Adam optimizer
  • learning_rate is the learning rate for the Adam optimizer
  • num_classes, batch_size and num_epochs should be obvious:)
In [ ]:
# Number of variable subsets that a product joins
num_subsets = 2
# Number of sums per scope
num_mixtures = 4
# Number of decompositions per product layer
num_decomps = 1
# Generate balanced subsets -> balanced tree
balanced = True
# Number of variables
num_vars = train_x.shape[1]
# Input distribution. Raw corresponds to first layer being product that 
# takes raw indicators
input_dist = spn.DenseSPNGenerator.InputDist.RAW
# Number of different values at leaf (binary here, so 2)
num_leaf_values = 2
# Initial value for path count accumulators
initial_accum_value = 0.1
# Inference type (can also be spn.InferenceType.MPE) where 
# sum nodes are turned into max nodes
inference_type = spn.InferenceType.MARGINAL
# Adam optimizer parameters
beta1 = 0.9
beta2 = 0.9
learning_rate = 5e-4
# Other params
num_classes = 10
batch_size = 32
num_epochs = 50

Building the SPN

Our SPN consists of binary leaf indicators, a dense SPN per class and a root node connecting the 10 class-wise sub-SPNs. We also add an indicator node to the root node to model the latent class variable. Finally, we generate Weight nodes for the full SPN by using spn.generate_weights.

In [ ]:
# Leaf nodes
leaf_indicators = spn.IndicatorLeaf(num_vals=num_leaf_values, num_vars=num_vars)

# Generates densely connected random SPNs
dense_generator = spn.DenseSPNGenerator(
    num_subsets=num_subsets, num_mixtures=num_mixtures, num_decomps=num_decomps, 
    balanced=balanced, input_dist=input_dist, 
    node_type=spn.DenseSPNGenerator.NodeType.BLOCK)

# Generate a dense SPN for each class
class_roots = [dense_generator.generate(leaf_indicators) for _ in range(num_classes)]

# Connect sub-SPNs to a root
root = spn.convert_to_layer_nodes(spn.Sum(*class_roots, name="RootSum"))

# Add an IVs node to the root as a latent class variable
class_indicators = root.generate_latent_indicators()

# Generate the weights for the SPN rooted at `root`
spn.generate_weights(root)

print("SPN depth: {}".format(root.get_depth()))
print("Number of products layers: {}".format(root.get_num_nodes(node_type=spn.ProductsLayer)))
print("Number of sums layers: {}".format(root.get_num_nodes(node_type=spn.SumsLayer)))

Defining the TensorFlow graph

Now that we have defined the SPN graph we can declare the TensorFlow operations needed for training and evaluation. The MPEState class can be used to find the MPE state of any node in the graph. In this case we might be interested in finding the most likely class based on the evidence elsewhere. This corresponds to the MPE state of class_indicators.

In [ ]:
# Op for initializing all weights
weight_init_op = spn.initialize_weights(root)
# Op for getting the log probability of the root
root_log_prob = root.get_log_value(inference_type=inference_type)

# Set up ops for discriminative GD learning
gd_learning = spn.GDLearning(
    root=root, learning_task_type=spn.LearningTaskType.SUPERVISED,
    learning_method=spn.LearningMethodType.DISCRIMINATIVE)
optimizer = tf.train.AdamOptimizer(beta1=0.95, beta2=0.95)

# Use post_gradients_ops = True to also normalize weights (and clip Gaussian variance)
gd_update_op = gd_learning.learn(optimizer=optimizer, post_gradient_ops=True)

# Compute predictions and matches
mpe_state = spn.MPEState()
root_marginalized = spn.Sum(*root.values, weights=root.weights)
marginalized_ivs = root_marginalized.generate_latent_indicators(
    feed=-tf.ones_like(class_indicators.feed)) 
predictions, = mpe_state.get_state(root_marginalized, marginalized_ivs)
with tf.name_scope("MatchPredictionsAndTarget"):
    match_op = tf.equal(tf.to_int64(predictions), tf.to_int64(class_indicators.feed))

Training the SPN

In [ ]:
# Set up some convenient iterators
train_iterator = DataIterator([train_x, train_y], batch_size=batch_size)
test_iterator = DataIterator([test_x, test_y], batch_size=batch_size)

def fd(x, y):
    return {leaf_indicators: x, class_indicators: y}

with tf.Session() as sess:
    # Initialize things
    sess.run([tf.global_variables_initializer(), weight_init_op])
    
    # Do one run for test likelihoods
    matches = []
    for batch_x, batch_y in test_iterator.iter_epoch("Testing"):
        batch_matches = sess.run(match_op, fd(batch_x, batch_y))
        matches.extend(batch_matches.ravel())
        test_iterator.display_progress(Accuracy="{:.2f}".format(np.mean(batch_matches)))
    mean_test_accuracy = np.mean(matches)
    
    print("Before training test accuracy = {:.2f}".format(mean_test_accuracy))                              
    for epoch in range(num_epochs):
        
        # Train
        matches = []
        for batch_x, batch_y in train_iterator.iter_epoch("Training"):
            batch_matches, _ = sess.run(
                [match_op, gd_update_op], fd(batch_x, batch_y))
            matches.extend(batch_matches.ravel())
            train_iterator.display_progress(Accuracy="{:.2f}".format(np.mean(batch_matches)))
        mean_train_accuracy = np.mean(matches)
        
        # Test
        matches = []
        for batch_x, batch_y in test_iterator.iter_epoch("Testing"):
            batch_matches = sess.run(match_op, fd(batch_x, batch_y))
            matches.extend(batch_matches.ravel())
            test_iterator.display_progress(Accuracy="{:.2f}".format(np.mean(batch_matches)))
        mean_test_accuracy = np.mean(matches)
        
        # Report
        print("Epoch {}, train accuracy = {:.2f}, test accuracy = {:.2f}".format(
            epoch, mean_train_accuracy, mean_test_accuracy))