Fix issue-2.
JacksonWuxs committed Jan 13, 2023
1 parent 0a1a3b6 commit 2ff1cde
Showing 1 changed file with 45 additions and 49 deletions.
94 changes: 45 additions & 49 deletions forwardforward.py
@@ -5,17 +5,19 @@
 def sigmoid(x):
     return 1.0 / (1.0 + np.exp(-x))
 
+def norm(x, p=2, dim=-1, keepdims=True):
+    return (x ** p).sum(axis=dim, keepdims=keepdims)
+
 
 class FullyConnected:
 
-    def __init__(self, in_dim, out_dim, do_norm=True, do_goodness=False, bias=True, constant=0.0):
+    def __init__(self, in_dim, out_dim, do_norm=True, constant=1.0):
         k = np.sqrt(1.0 / in_dim)
         self._W = np.random.uniform(-k, k, (in_dim, out_dim))
-        self._b = np.random.uniform(-k, k, out_dim) if bias else np.zeros(out_dim)
+        self._b = np.random.uniform(-k, k, out_dim)
+        self._c = constant
         self._gradW, self._gradb = 0.0, 0.0
-        self._theta = constant
         self._do_normalize = do_norm
-        self._do_goodness = do_goodness
         self._samples = 0.0
         self.training = True
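A quick sanity check of the new norm helper (a standalone sketch, not part of the commit): with the default p=2 it returns the sum of squares along the last axis, not the p-norm itself, so _normalize below still takes a square root to get a Euclidean length, while goodness uses the sum of squares directly.

import numpy as np

def norm(x, p=2, dim=-1, keepdims=True):
    # mirrors the helper added above: sum of p-th powers, not the p-norm itself
    return (x ** p).sum(axis=dim, keepdims=keepdims)

x = np.array([[3.0, 4.0]])
print(norm(x))                        # [[25.]]  sum of squares, used directly as goodness
print(norm(x, keepdims=True) ** 0.5)  # [[5.]]   the Euclidean length used in _normalize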

@@ -38,24 +40,25 @@ def _normalize(self, X):
         hidden vector before using it as input to the
         next layer.``
         """
-        return X / (1e-9 + (X ** 2).sum(axis=-1, keepdims=True) ** 0.5)
+        return X / (1e-9 + norm(X, keepdims=True) ** 0.5)
 
-    def _goodness(self, H):
+    def goodness(self, H):
         """compute the goodness score.
         Math: \sum_{d=1}^D H_d^2
         Ref: ``Let us suppose that the goodness function for a layer
              is simply the sum of the squares of the activities of
              the rectified linear neurons in that layer.``
         """
-        return (H ** 2).sum(axis=-1)
+        return norm(H)
 
     def _backward(self, x, h):
-        self._samples += x.shape[0]
-        y = sigmoid(self._goodness(h) - self._theta)
-        grady = y * (1 - y)
-        gradh = 2 * grady.reshape(-1, 1) * h
-        self._gradW += x.T @ gradh  # (indim, outdim)
-        self._gradb += gradh.sum(axis=0)  # (outdim,)
+        if self.training:
+            self._samples += x.shape[0]
+            y = sigmoid(self.goodness(h) - self._c)
+            grady = y * (1 - y)
+            gradh = 2 * grady.reshape(-1, 1) * h
+            self._gradW += x.T @ gradh  # (indim, outdim)
+            self._gradb += gradh.sum(axis=0)  # (outdim,)
 
     def _transform(self, X):
         assert X.shape[-1] == self._W.shape[0]
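Why _backward has this form: with goodness G(h) = sum_d h_d^2 and p = sigmoid(G(h) - c), the derivative is dp/dh_d = 2 * p * (1 - p) * h_d, which is exactly what grady and gradh compute per sample (the sign flip for negative samples is presumably applied in the collapsed update method). A finite-difference check of that derivative (a standalone sketch, not part of the commit):

import numpy as np

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def prob(h, c=1.0):
    # p = sigmoid(goodness - threshold), with goodness = sum of squared activities
    return sigmoid((h ** 2).sum() - c)

h = np.array([0.3, -1.2, 0.7])
p = prob(h)
analytic = 2 * p * (1 - p) * h  # same form as gradh in _backward, for one sample

eps = 1e-6
numeric = np.array([(prob(h + eps * np.eye(3)[i]) - prob(h - eps * np.eye(3)[i])) / (2 * eps)
                    for i in range(3)])
print(np.allclose(analytic, numeric))  # True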
@@ -65,10 +68,7 @@ def forward(self, X):
         if self._do_normalize:
             X = self._normalize(X)
         h = self._transform(X)
-        if self.training:
-            self._backward(X, h)
-        if self._do_goodness:
-            return self._goodness(h)
+        self._backward(X, h)
         return h
 
     def update(self, positive, learn_rate):
@@ -80,26 +80,32 @@ def update(self, positive, learn_rate):
 
 
 class ForwardForwardClassifier:
 
+    name = "ForwardForwardNetwork"
+
     def __init__(self, in_dim, hide_dim, out_dim, n_layers=2):
+        assert n_layers >= 1
         self._dims = (in_dim, hide_dim, out_dim)
-        self.layers = []
-        for layer in range(n_layers):
-            do_norm = layer > 0  # only the first layer does not require normalization
-            do_good = layer == n_layers - 1  # only the last layer should compute the goodness of the output
-            input_dim = in_dim + out_dim if layer == 0 else hide_dim
-            self.layers.append(FullyConnected(input_dim, hide_dim, do_norm, do_good))
+        self.layers = [FullyConnected(in_dim + out_dim, hide_dim, False)]
+        for layer in range(1, n_layers):
+            self.layers.append(FullyConnected(hide_dim, hide_dim, True))
 
     def forward(self, X):
         for layer in self.layers:
             X = layer(X)
-        return X
+        return layer.goodness(X)
 
-    def update(self, positive, learn_rate):
+    def train_step(self, positive, negative, learn_rate, niters=5):
         for layer in self.layers:
-            layer.update(positive, learn_rate)
-
-    def fit(self, X, Y, learn_rate=0.001, batch_size=32, epochs=1000, log_freq=100):
+            for niter in range(niters):
+                layer(positive)
+                layer.update(positive=True, learn_rate=learn_rate)
+                layer(negative)
+                layer.update(positive=False, learn_rate=learn_rate)
+            positive = layer(positive)
+            negative = layer(negative)
+
+    def fit(self, X, Y, learn_rate=5e-4, batch_size=32, epochs=10000, log_freq=1000):
         import time
         from sklearn.metrics import accuracy_score
         Y = Y.astype(np.int32)
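The new train_step trains the network greedily: each FullyConnected layer is updated niters times on the positive and negative batches, and only afterwards are both batches pushed through it to become the next layer's inputs. The calling convention it relies on is that forward() accumulates gradients while the layer is in training mode and update() then applies them with the appropriate sign. A minimal sketch of one such per-layer step, assuming the full forwardforward.py (including the methods collapsed in this diff, such as __call__ and update) is importable:

import numpy as np
# assumes: from forwardforward import FullyConnected

layer = FullyConnected(in_dim=12, out_dim=8, do_norm=False)
pos = np.random.normal(size=(32, 12))  # hypothetical positive batch
neg = np.random.normal(size=(32, 12))  # hypothetical negative batch

layer(pos)                                     # forward pass accumulates gradients for the positive batch
layer.update(positive=True, learn_rate=5e-4)   # apply the accumulated gradients for the positive pass
layer(neg)                                     # forward pass accumulates gradients for the negative batch
layer.update(positive=False, learn_rate=5e-4)  # apply the accumulated gradients for the negative pass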
@@ -117,35 +123,33 @@ def fit(self, X, Y, learn_rate=0.001, batch_size=32, epochs=1000, log_freq=100):
                 for batch in self._generate_batches(subX, batch_size):
                     real_y = np.zeros((len(batch), len(ylist)))
                     real_y[:, y] = 1
-                    real_batch = np.hstack([batch, real_y])
-                    self.forward(real_batch)
-                    self.update(positive=True, learn_rate=learn_rate)
-
                     fake_y = np.zeros((len(batch), len(ylist)))
                     fake_y[range(len(batch)), random.choices(subY, k=len(batch))] = 1
-                    fake_batch = np.hstack([batch, fake_y])
-                    self.forward(fake_batch)
-                    self.update(positive=False, learn_rate=learn_rate)
+                    self.train_step(positive=np.hstack([batch, real_y]),
+                                    negative=np.hstack([batch, fake_y]),
+                                    learn_rate=learn_rate)
 
             self.training = False
             Yhat = self.predict(X, batch_size)
             if epoch % log_freq == 0:
                 acc = accuracy_score(Y, Yhat)
                 print("Epoch-%d | Spent=%.4f | Train Accuracy=%.4f" % (epoch, time.time() - begin, acc))
                 begin = time.time()
-    def predict(self, X, batch_size=32):
+
+    def predict_proba(self, X, batch_size=32):
         Yhat = []
         for batch in self._generate_batches(X, batch_size):
             batch_yhat = []
             for y in self._labels:
                 temp_y = np.zeros((len(batch), len(self._labels)))
                 temp_y[:, y] = 1
-                temp_x = np.hstack([batch, temp_y])
-                pred_y = self.forward(temp_x)
+                pred_y = self.forward(np.hstack([batch, temp_y]))
                 batch_yhat.append(pred_y.reshape(-1, 1))
-            Yhat.extend(np.argmax(np.hstack(batch_yhat), -1))
-        return np.array(Yhat)
+            Yhat.append(np.hstack(batch_yhat))
+        return np.vstack(Yhat)
+
+    def predict(self, X, batch_size=32):
+        return np.argmax(self.predict_proba(X, batch_size), -1)
 
     def _generate_batches(self, X, batch_size=32):
         batch = []
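For reference, the positive/negative construction that fit performs above: the true label is appended to each input row as a one-hot vector for positive data, and a randomly chosen wrong label for negative data. A standalone illustration of that encoding (batch, y, and ylist are hypothetical stand-ins for the loop variables; the commit draws the wrong labels from subY, which presumably excludes the true label):

import random
import numpy as np

ylist = [0, 1, 2]                      # candidate labels
batch = np.random.normal(size=(4, 5))  # a mini-batch whose rows all share the true label y
y = 1

real_y = np.zeros((len(batch), len(ylist)))
real_y[:, y] = 1                       # one-hot of the correct label

wrong = [label for label in ylist if label != y]
fake_y = np.zeros((len(batch), len(ylist)))
fake_y[range(len(batch)), random.choices(wrong, k=len(batch))] = 1  # one-hot of a wrong label

positive = np.hstack([batch, real_y])  # shape (4, 5 + 3)
negative = np.hstack([batch, fake_y])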
@@ -157,11 +161,3 @@ def _generate_batches(self, X, batch_size=32):
         if len(batch) > 0:
             yield np.vstack(batch)
 
-
-if __name__ == "__main__":
-    X = np.random.normal(size=(100, 5))
-    Y = X[:, 0] ** 5 + X[:, 1] * 4 + X[:, 2] ** -8 + X[:, 3] * -0.5 + X[:,4] * 0.0
-    Y = np.where(Y >= 0, 1, 0)
-    net = ForwardForwardClassifier(5, 100, 2, n_layers=2)
-    net.fit(X, Y)
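With the __main__ demo removed, an end-to-end sketch of driving the updated API might look like the following (assumes the full module is importable; the toy labels are simplified relative to the removed demo and the hyperparameters are illustrative only):

import numpy as np
# assumes: from forwardforward import ForwardForwardClassifier

X = np.random.normal(size=(100, 5))
Y = (X[:, 0] + 4 * X[:, 1] > 0).astype(np.int32)  # simplified toy labels

net = ForwardForwardClassifier(in_dim=5, hide_dim=100, out_dim=2, n_layers=2)
net.fit(X, Y, learn_rate=5e-4, batch_size=32, epochs=1000, log_freq=100)

scores = net.predict_proba(X)  # (100, 2): one goodness score per candidate label
labels = net.predict(X)        # argmax over those scores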
