Source code for qiskit.aqua.components.neural_networks.numpy_discriminator

# -*- coding: utf-8 -*-

# This code is part of Qiskit.
#
# (C) Copyright IBM 2019, 2020.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
Discriminator

The neural network is based on the plain-NumPy network introduced in:
https://towardsdatascience.com/lets-code-a-neural-network-in-plain-numpy-ae7e74410795
"""

import os
import logging
import numpy as np
from qiskit.aqua.components.optimizers import ADAM
from .discriminative_network import DiscriminativeNetwork

logger = logging.getLogger(__name__)


# pylint: disable=invalid-name


class DiscriminatorNet():
    """
    Discriminator

    The neural network is based on the plain-NumPy network introduced in:
    https://towardsdatascience.com/lets-code-a-neural-network-in-plain-numpy-ae7e74410795
    """

    def __init__(self, n_features=1, n_out=1):
        """
        Initialize the discriminator network.

        Args:
            n_features (int): Dimension of input data samples.
            n_out (int): Dimension of the discriminator's output vector.
        """
        self.architecture = [
            {"input_dim": n_features, "output_dim": 50, "activation": "leaky_relu"},
            {"input_dim": 50, "output_dim": 20, "activation": "leaky_relu"},
            {"input_dim": 20, "output_dim": n_out, "activation": "sigmoid"},
        ]
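        # A three-layer fully connected network (n_features -> 50 -> 20 -> n_out)
        # with leaky-ReLU hidden activations, a sigmoid output and no bias terms.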

        self.parameters = []
        self.memory = {}

        for layer in self.architecture:
            activ_function_curr = layer["activation"]
            layer_input_size = layer["input_dim"]
            layer_output_size = layer["output_dim"]
            params_layer = np.random.rand(layer_output_size * layer_input_size)
            if activ_function_curr == "leaky_relu":
                params_layer = (params_layer * 2 - np.ones(np.shape(params_layer))) * 0.7
            elif activ_function_curr == "sigmoid":
                params_layer = (params_layer * 2 - np.ones(np.shape(params_layer))) * 0.2
            else:
                params_layer = params_layer * 2 - np.ones(np.shape(params_layer))
            self.parameters = np.append(self.parameters, params_layer)
            self.parameters = self.parameters.flatten()
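        # For the default sizes (n_features=1, n_out=1) this flat vector holds
        # 1 * 50 + 50 * 20 + 20 * 1 = 1070 weights, one row-major block per layer.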

    def forward(self, x):
        """
        Forward propagation.

        Args:
            x (numpy.ndarray): Discriminator input, i.e. data sample.

        Returns:
            numpy.ndarray: Discriminator output, i.e. data label.
        """

        def sigmoid(z):
            sig = 1 / (1 + np.exp(-z))
            return sig

        def leaky_relu(z, slope=0.2):
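            # leaky ReLU: f(z) = z for z >= 0 and f(z) = slope * z for z < 0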
            return np.maximum(
                np.zeros(np.shape(z)), z) + slope * np.minimum(np.zeros(np.shape(z)), z)

        def single_layer_forward_propagation(x_old, w_new, activation="leaky_relu"):
            z_curr = np.dot(w_new, x_old)

            if activation == "leaky_relu":
                activation_func = leaky_relu
            elif activation == "sigmoid":
                activation_func = sigmoid
            else:
                raise Exception('Non-supported activation function')

            return activation_func(z_curr), z_curr

        x_new = x
        pointer = 0
        for idx, layer in enumerate(self.architecture):
            layer_idx = idx + 1
            activ_function_curr = layer["activation"]
            layer_input_size = layer["input_dim"]
            layer_output_size = layer["output_dim"]
            if idx == 0:
                x_old = np.reshape(x_new, (layer_input_size, len(x_new)))
            else:
                x_old = x_new
            pointer_next = pointer + (layer_output_size * layer_input_size)
            w_curr = self.parameters[pointer:pointer_next]
            w_curr = np.reshape(w_curr, (layer_output_size, layer_input_size))
            pointer = pointer_next
            x_new, z_curr = single_layer_forward_propagation(x_old, w_curr, activ_function_curr)

            self.memory["a" + str(idx)] = x_old
            self.memory["z" + str(layer_idx)] = z_curr

        return x_new

    def backward(self, x, y, weights=None):
        """
        Backward propagation.

        Args:
           x (numpy.ndarray): predicted labels, i.e. the discriminator output
           y (numpy.ndarray): target labels
           weights (numpy.ndarray): customized scaling for each sample (optional)

        Returns:
            numpy.ndarray: parameter gradients
        """

        def sigmoid_backward(da, z):
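            # chain rule with sigmoid'(z) = sigmoid(z) * (1 - sigmoid(z))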
            sig = 1 / (1 + np.exp(-z))
            return da * sig * (1 - sig)

        def leaky_relu_backward(da, z, slope=0.2):
            dz = np.array(da, copy=True)
            dz[z < 0] *= slope
            return dz

        def single_layer_backward_propagation(da_curr,
                                              w_curr, z_curr, a_prev, activation="leaky_relu"):
            # m = a_prev.shape[1]
            if activation == "leaky_relu":
                backward_activation_func = leaky_relu_backward
            elif activation == "sigmoid":
                backward_activation_func = sigmoid_backward
            else:
                raise Exception('Non-supported activation function')

            dz_curr = backward_activation_func(da_curr, z_curr)
            dw_curr = np.dot(dz_curr, a_prev.T)
            da_prev = np.dot(w_curr.T, dz_curr)

            return da_prev, dw_curr

        grads_values = np.array([])
        m = y.shape[1]
        y = y.reshape(np.shape(x))
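        # Derivative of the (clipped) binary cross-entropy loss w.r.t. the
        # discriminator output; the 1e-4 floor avoids division by zero.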
        if weights is not None:
            da_prev = - np.multiply(
                weights,
                np.divide(y, np.maximum(np.ones(np.shape(x)) * 1e-4, x))
                - np.divide(1 - y, np.maximum(np.ones(np.shape(x)) * 1e-4, 1 - x)))
        else:
            da_prev = - (np.divide(y, np.maximum(np.ones(np.shape(x)) * 1e-4, x))
                         - np.divide(1 - y, np.maximum(np.ones(np.shape(x)) * 1e-4, 1 - x))) / m

        pointer = 0

        for layer_idx_prev, layer in reversed(list(enumerate(self.architecture))):
            layer_idx_curr = layer_idx_prev + 1
            activ_function_curr = layer["activation"]

            da_curr = da_prev

            a_prev = self.memory["a" + str(layer_idx_prev)]
            z_curr = self.memory["z" + str(layer_idx_curr)]

            layer_input_size = layer["input_dim"]
            layer_output_size = layer["output_dim"]
            pointer_prev = pointer - (layer_output_size * layer_input_size)
            if pointer == 0:
                w_curr = self.parameters[pointer_prev:]
            else:
                w_curr = self.parameters[pointer_prev:pointer]
            w_curr = np.reshape(w_curr, (layer_output_size, layer_input_size))
            pointer = pointer_prev

            da_prev, dw_curr = single_layer_backward_propagation(da_curr,
                                                                 np.array(w_curr), z_curr, a_prev,
                                                                 activ_function_curr)

            grads_values = np.append([dw_curr], grads_values)

        return grads_values


class NumPyDiscriminator(DiscriminativeNetwork):
    """
    Discriminator based on NumPy
    """

    def __init__(self, n_features: int = 1, n_out: int = 1) -> None:
        """
        Args:
            n_features: Dimension of input data vector.
            n_out: Dimension of the discriminator's output vector.
        """
        super().__init__()
        self._n_features = n_features
        self._n_out = n_out
        self._discriminator = DiscriminatorNet(self._n_features, self._n_out)
        self._optimizer = ADAM(maxiter=1, tol=1e-6, lr=1e-3, beta_1=0.7, beta_2=0.99,
                               noise_factor=1e-4, eps=1e-6, amsgrad=True)

        self._ret = {}

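    # Note: the optimizer is configured with maxiter=1, so each call to
    # ``train`` performs exactly one ADAM update of the discriminator weights;
    # the surrounding (Q)GAN training loop is expected to call it repeatedly.
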
    def set_seed(self, seed):
        """
        Set seed.

        Args:
            seed (int): seed
        """
        # Seed NumPy's global random number generator, which is used for the
        # parameter initialization in DiscriminatorNet.
        np.random.seed(seed)

    def save_model(self, snapshot_dir):
        """
        Save discriminator model

        Args:
            snapshot_dir (str): directory path for saving the model
        """
        # save self._discriminator.parameters
        np.save(os.path.join(snapshot_dir, 'np_discriminator_architecture.csv'),
                self._discriminator.architecture)
        np.save(os.path.join(snapshot_dir, 'np_discriminator_memory.csv'),
                self._discriminator.memory)
        np.save(os.path.join(snapshot_dir, 'np_discriminator_params.csv'),
                self._discriminator.parameters)
        self._optimizer.save_params(snapshot_dir)

    def load_model(self, load_dir):
        """
        Load discriminator model

        Args:
            load_dir (str): directory with the stored NumPy discriminator model to be loaded
        """
        self._discriminator.architecture = \
            np.load(os.path.join(load_dir, 'np_discriminator_architecture.csv'))
        self._discriminator.memory = np.load(os.path.join(load_dir,
                                                          'np_discriminator_memory.csv'))
        self._discriminator.parameters = np.load(os.path.join(load_dir,
                                                              'np_discriminator_params.csv'))
        self._optimizer.load_params(load_dir)

    @property
    def discriminator_net(self):
        """
        Get discriminator

        Returns:
            DiscriminatorNet: discriminator object
        """
        return self._discriminator

    @discriminator_net.setter
    def discriminator_net(self, net):
        self._discriminator = net

    def get_label(self, x, detach=False):  # pylint: disable=arguments-differ,unused-argument
        """
        Get data sample labels, i.e. true or fake.

        Args:
            x (numpy.ndarray): Discriminator input, i.e. data sample.
            detach (bool): Unused for the NumPy network.

        Returns:
            numpy.ndarray: Discriminator output, i.e. data label
        """
        return self._discriminator.forward(x)

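    # The loss below is an (optionally sample-weighted) binary cross-entropy,
    # -[y * log(x) + (1 - y) * log(1 - x)], where x is clipped from below at
    # 1e-4 to avoid taking the logarithm of zero.
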
    def loss(self, x, y, weights=None):
        """
        Loss function

        Args:
            x (numpy.ndarray): sample label (equivalent to discriminator output)
            y (numpy.ndarray): target label
            weights (numpy.ndarray): customized scaling for each sample (optional)

        Returns:
            float: loss function
        """
        if weights is not None:
            # Use weights as scaling factors for the samples and compute the sum
            return (-1) * np.dot(
                np.multiply(y, np.log(np.maximum(np.ones(np.shape(x)) * 1e-4, x)))
                + np.multiply(np.ones(np.shape(y)) - y,
                              np.log(np.maximum(np.ones(np.shape(x)) * 1e-4,
                                                np.ones(np.shape(x)) - x))),
                weights)
        else:
            # Compute the mean
            return (-1) * np.mean(
                np.multiply(y, np.log(np.maximum(np.ones(np.shape(x)) * 1e-4, x)))
                + np.multiply(np.ones(np.shape(y)) - y,
                              np.log(np.maximum(np.ones(np.shape(x)) * 1e-4,
                                                np.ones(np.shape(x)) - x))))

    def _get_objective_function(self, data, weights):
        """
        Get the objective function

        Args:
            data (tuple): training and generated data
            weights (numpy.ndarray): weights corresponding to training resp. generated data

        Returns:
            objective_function: objective function for the optimization
        """
        real_batch = data[0]
        real_prob = weights[0]
        generated_batch = data[1]
        generated_prob = weights[1]

        def objective_function(params):
            self._discriminator.parameters = params
            # Train on Real Data
            prediction_real = self.get_label(real_batch)
            loss_real = self.loss(prediction_real, np.ones(np.shape(prediction_real)), real_prob)
            prediction_fake = self.get_label(generated_batch)
            loss_fake = self.loss(prediction_fake, np.zeros(np.shape(prediction_fake)),
                                  generated_prob)
            return 0.5 * (loss_real[0] + loss_fake[0])

        return objective_function

    def _get_gradient_function(self, data, weights):
        """
        Get the gradient function

        Args:
            data (tuple): training and generated data
            weights (numpy.ndarray): weights corresponding to training resp. generated data

        Returns:
            gradient_function: Gradient function for the optimization
        """
        real_batch = data[0]
        real_prob = weights[0]
        generated_batch = data[1]
        generated_prob = weights[1]

        def gradient_function(params):
            self._discriminator.parameters = params
            prediction_real = self.get_label(real_batch)
            grad_real = self._discriminator.backward(prediction_real,
                                                     np.ones(np.shape(prediction_real)),
                                                     real_prob)
            prediction_generated = self.get_label(generated_batch)
            grad_generated = self._discriminator.backward(
                prediction_generated, np.zeros(np.shape(prediction_generated)), generated_prob)
            return np.add(grad_real, grad_generated)

        return gradient_function

    def train(self, data, weights, penalty=False, quantum_instance=None, shots=None):
        """
        Perform one training step w.r.t. the discriminator's parameters

        Args:
            data (tuple(numpy.ndarray, numpy.ndarray)):
                real_batch: array, Training data batch.
                generated_batch: array, Generated data batch.
            weights (tuple): weights for the real resp. the generated data batch
            penalty (bool): Unused for classical networks.
            quantum_instance (QuantumInstance): Unused for classical networks.
            shots (int): Number of shots for hardware or qasm execution.
                Ignored for classical networks.

        Returns:
            dict: with Discriminator loss and updated parameters.
        """
        # Force a single optimization iteration per training step
        self._optimizer._maxiter = 1
        self._optimizer._t = 0
        objective = self._get_objective_function(data, weights)
        gradient = self._get_gradient_function(data, weights)
        self._discriminator.parameters, loss, _ = \
            self._optimizer.optimize(num_vars=len(self._discriminator.parameters),
                                     objective_function=objective,
                                     initial_point=np.array(self._discriminator.parameters),
                                     gradient_function=gradient)

        self._ret['loss'] = loss
        self._ret['params'] = self._discriminator.parameters

        return self._ret
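

# Illustrative usage sketch (not part of the original module): one discriminator
# training step on toy data. The batch size, toy distributions and uniform
# per-sample weights below are assumptions chosen only to show the expected
# call signature and array shapes.
if __name__ == '__main__':
    _disc = NumPyDiscriminator(n_features=1, n_out=1)
    _disc.set_seed(42)
    _real_batch = np.random.normal(0.5, 0.1, (8, 1))     # toy "real" samples
    _generated_batch = np.random.rand(8, 1)              # toy "generated" samples
    _sample_weights = (np.ones(8) / 8, np.ones(8) / 8)   # uniform per-sample weights
    _result = _disc.train((_real_batch, _generated_batch), _sample_weights)
    print('discriminator loss after one ADAM step:', _result['loss'])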