Commit 0af6ee4e authored by Ard Kastrati's avatar Ard Kastrati
Browse files

Refactored the code, removed duplicated code, documented the code

parent 907806f7
# -*- coding: utf-8 -*-
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data
import logging
from CNN.split_cnn import *
class Net(nn.Module):
def __init__(self,cluster_split):
super(Net, self).__init__()
self.cluster_split=cluster_split
self.conv1 = nn.Conv1d(in_channels=129, out_channels=258, kernel_size=5, stride=1, padding=2)
self.pool = nn.MaxPool1d(5)
self.conv2 = nn.Conv1d(in_channels=258, out_channels=64, kernel_size=5, stride=1, padding=2)
self.fc1 = nn.Linear(8*9*20, 120)
self.fc2 = nn.Linear(120, 60)
self.fc3 = nn.Linear(60, 2)
self.split_conv1=SplitCNN(self.cluster_split,out_channels=2*self.cluster_split,kernel_size=5,stride=1,padding=2)
self.split_conv2=SplitCNN(self.cluster_split,out_channels=8*np.ones_like(self.cluster_index),kernel_size=5,stride=1,padding=2)
def forward(self, x):
x = self.pool(F.relu(self.split_conv1.forward(x)))
self.cluster_split=self.split_conv1.channels
x = self.pool(F.relu(self.split_conv2.forward(x)))
self.cluster_split=self.split_conv2.channels
x = x.view(-1, np.sum(self.cluster_split)*20)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return x
def run(trainX, trainY):
#load the data
dataset = torch.utils.data.TensorDataset(trainX, trainY)
trainloader = torch.utils.data.DataLoader(dataset, batch_size=2)
cluster_index,cluster_split=cluster()
# define the network
net = Net(cluster_split)
# define the optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)
# train
train(trainloader=trainloader, net=net, optimizer=optimizer, criterion=criterion,cluster_index=cluster_index)
# save our trained model
PATH = './cifar_net.pth'
torch.save(net.state_dict(), PATH)
def train(trainloader, net, optimizer, criterion,cluster_index, epoch=50):
for epoch in range(2): # loop over the dataset multiple times
running_loss = 0.0
for i, data in enumerate(trainloader, 0):
# get the inputs; data is a list of [inputs, labels]
inputs, labels = data
inputs=inputs[cluster_index,...]
# zero the parameter gradients
optimizer.zero_grad()
# forward + backward + optimize
outputs = net(inputs)
loss = criterion(outputs, labels.squeeze(1))
loss.backward()
optimizer.step()
# logging.info statistics
running_loss += loss.item()
if i % 200 == 0: # logging.info every 2000 mini-batches
logging.info('[%d, %5d] loss: %.3f' %
(epoch + 1, i + 1, running_loss / 200))
running_loss = 0.0
logging.info('Finished Training')
import tensorflow as tf
import tensorflow.keras as keras
from config import config
from utils.utils import *
import logging
from keras.callbacks import CSVLogger
from ConvNet import ConvNet
from tensorflow.keras.constraints import max_norm
def run(trainX, trainY):
"""
Starts the CNN and stores the histogram, the plots of loss and accuracy.
"""
logging.info("Starting CNN.")
classifier = Classifier_CNN(input_shape=config['cnn']['input_shape'])
hist = classifier.fit(trainX, trainY)
plot_loss(hist, config['model_dir'], config['model'], True)
plot_acc(hist, config['model_dir'], config['model'], True)
save_logs(hist, config['model_dir'], config['model'], pytorch=False)
# save_model_param(classifier.model, config['model_dir'], config['model'], pytorch=False)
class Classifier_CNN:
def __init__(self, input_shape, verbose=True, build=True, batch_size=64, nb_filters=32,
use_residual=True, depth=6, kernel_size=40, nb_epochs=1500):
self.nb_filters = nb_filters
self.use_residual = use_residual
self.depth = depth
self.kernel_size = kernel_size
self.callbacks = None
self.batch_size = batch_size
self.bottleneck_size = 32
self.nb_epochs = nb_epochs
self.verbose = verbose
if build:
if config['split']:
self.model = self.split_model(input_shape)
else:
self.model = self._build_model(input_shape)
if self.verbose:
self.model.summary()
# self.model.save_weights(self.output_directory + 'model_init.hdf5')
def split_model(self, input_shape):
input_layer = tf.keras.layers.Input(input_shape)
output = []
# run CNN over the cluster
for c in config['cluster'].keys():
a = [input_shape[0]]
a.append(len(config['cluster'][c]))
input_shape = tuple(a)
class Classifier_CNN(ConvNet):
"""
The Classifier_CNN is one of the simplest classifiers. It implements the class ConvNet, which is made of modules with a specific depth.
"""
output.append(self._build_model(input_shape,
X=tf.transpose(tf.nn.embedding_lookup(tf.transpose(input_layer),
config['cluster'][c]))))
def _module(self, input_tensor, current_depth):
"""
The module of CNN is made of a simple convolution with batch normalization and ReLu activation. Finally, MaxPooling is also used.
"""
# append the results and perform 1 dense layer with last_channel dimension and the output layer
x = tf.keras.layers.Concatenate()(output)
dense = tf.keras.layers.Dense(32, activation='relu')(x)
output_layer = tf.keras.layers.Dense(1, activation='sigmoid')(dense)
model = tf.keras.models.Model(inputs=input_layer, outputs=output_layer)
model.compile(loss='binary_crossentropy', optimizer=keras.optimizers.Adam(), metrics=['accuracy'])
return model
def _CNN_module(self, input_tensor, nb_filters=128, kernel_size=5, activation='linear'):
x = tf.keras.layers.Conv1D(filters=nb_filters, kernel_size=kernel_size, padding='same', activation=activation, use_bias=False)(input_tensor)
x = tf.keras.layers.Conv1D(filters=self.nb_filters, kernel_size=self.kernel_size, padding='same', use_bias=False)(input_tensor)
x = tf.keras.layers.BatchNormalization()(x)
x = tf.keras.layers.Activation(activation='relu')(x)
x = tf.keras.layers.MaxPool1D()(x)
return x
def _build_model(self, input_shape, X=[], depth=3):
if config['split']:
input_layer = X
else:
input_layer = tf.keras.layers.Input(input_shape)
x = input_layer
for d in range(depth):
x = self._CNN_module(x, nb_filters=32*(d+1))
gap_layer = tf.keras.layers.GlobalAveragePooling1D()(x)
if config['split']:
return gap_layer
output_layer = tf.keras.layers.Dense(1, activation='sigmoid')(gap_layer)
model = tf.keras.models.Model(inputs=input_layer, outputs=output_layer)
model.compile(loss='binary_crossentropy', optimizer=keras.optimizers.Adam(), metrics=['accuracy'])
return model
def fit(self, CNN_x, y):
csv_logger = CSVLogger(config['batches_log'], append=True, separator=';')
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=20)
ckpt_dir = config['model_dir'] + '/' + config['model'] + '_' + 'best_model.h5'
ckpt = tf.keras.callbacks.ModelCheckpoint(ckpt_dir, verbose=1, monitor='val_accuracy', save_best_only=True,
mode='auto')
hist = self.model.fit(CNN_x, y, verbose=1, validation_split=0.2, epochs=35,
callbacks=[csv_logger, ckpt, early_stop])
return hist
\ No newline at end of file
return x
\ No newline at end of file
from torch import nn
import numpy as np
cluster_split=[14,11,11,12,13,12,11,11,15]
c1=[12,5,4,11,19,18,16,16,15,9,14,17,21,22]
c2=[8,1,2,122,121,116,123,3,124,117,118]
c3=[114,115,109,108,102,98,103,110,111,104,93]
c4=[100,101,97,96,95,89,90,84,91,85,92,86]
c5=[82,74,70,75,83,76,71,67,72,77,78,62,61]
c6=[69,65,64,58,57,50,51,59,66,60,52,53]
c7=[44,39,40,45,46,47,41,35,29,36,42]
c8=[38,33,32,25,26,23,27,34,28,24,20]
c9=[13,6,112,105,106,7,30,37,31,129,80,87,79,55,54]
cluster={}
cluster['c1']=c1
cluster['c2']=c2
cluster['c3']=c3
cluster['c4']=c4
cluster['c5']=c5
cluster['c6']=c6
cluster['c7']=c7
cluster['c8']=c8
cluster['c9']=c9
cluster_index=[]
for i in range (1,10):
cluster_index+=cluster['c'+str(i)]
#reorganise the electrode according to the cluster
def cluster():
return cluster_index,cluster_split
class SplitCNN(nn.Module):
def __init__(self,cluster_split,out_channels,kernel_size,stride,padding):
super(SplitCNN, self).__init__()
self.channels = out_channels
self.padding= padding
self.kernel = kernel_size
self.split = cluster_split
self.stride = stride
self.cnn = nn.ModuleList()
for i in range(len(self.split)):
self.cnn.append(nn.conv1d(cluster_split[i],self.channels[i],
self.kernel,self.stride,self.padding))
def forward(self, x):
x = x.split(self.split,0)
x = [self.cnn[j](x[j]) for j in range(len(self.split))]
return torch.cat(x,0)
from abc import ABC, abstractmethod
import tensorflow as tf
import numpy as np
import tensorflow.keras as keras
from config import config
from utils.utils import *
import logging
from keras.callbacks import CSVLogger
def run(trainX, trainY):
logging.info("Starting XceptionTime.")
classifier = Classifier_Xception(input_shape=config['xception']['input_shape'])
hist = classifier.fit(trainX, trainY)
plot_loss(hist, config['model_dir'], config['model'], True)
plot_acc(hist, config['model_dir'], config['model'], True)
save_logs(hist, config['model_dir'], config['model'], pytorch=False)
#save_model_param(classifier.model, config['model_dir'], config['model'], pytorch=False)
class Classifier_Xception:
def __init__(self, input_shape, verbose=True, build=True, batch_size=64, nb_filters=32,
use_residual=True, depth=6, nb_epochs=1500):
self.nb_filters = nb_filters
class ConvNet(ABC):
def __init__(self, input_shape, kernel_size=32, nb_filters=32, verbose=True, batch_size=64, use_residual=False, depth=6, preprocessing = False):
self.use_residual = use_residual
self.depth = depth
self.callbacks = None
self.batch_size = batch_size
self.nb_epochs = nb_epochs
self.verbose = verbose
self.kernel_size = kernel_size
self.nb_filters = nb_filters
self.preprocessing = preprocessing
self.input_shape = input_shape
if build:
self.model = self._build_model(input_shape)
if self.verbose:
self.model.summary()
if config['split']:
self.model = self._split_model()
else:
self.model = self._build_model()
if self.verbose:
self.model.summary()
def _xception_module(self, input_tensor, nb_filters=128, activation='linear'):
x = tf.keras.layers.SeparableConv1D(filters=nb_filters, kernel_size=32, padding='same', activation=activation, use_bias=False, depth_multiplier=1)(input_tensor)
x = tf.keras.layers.BatchNormalization()(x)
x = tf.keras.layers.Activation(activation='relu')(x)
return x
def _split_model(self):
input_layer = keras.layers.Input(self.input_shape)
output = []
for c in config['cluster'].keys():
a = [self.input_shape[0]]
a.append(len(config['cluster'][c]))
input_shape = tuple(a)
output.append(self._build_model(X=tf.transpose(tf.nn.embedding_lookup(tf.transpose(input_layer), config['cluster'][c]))))
# append the results and perform 1 dense layer with last_channel dimension and the output layer
x = tf.keras.layers.Concatenate()(output)
dense = tf.keras.layers.Dense(32, activation='relu')(x)
output_layer = tf.keras.layers.Dense(1, activation='sigmoid')(dense)
model = tf.keras.models.Model(inputs=input_layer, outputs=output_layer)
model.compile(loss='binary_crossentropy', optimizer=keras.optimizers.Adam(), metrics=['accuracy'])
return model
# abstract method
def _preprocessing(self, input_tensor):
pass
# abstract method
def _module(self, input_tensor, current_depth):
pass
def _shortcut_layer(self, input_tensor, out_tensor):
shortcut_y = tf.keras.layers.Conv1D(filters=int(out_tensor.shape[-1]), kernel_size=1, padding='same', use_bias=False)(input_tensor)
......@@ -48,30 +58,38 @@ class Classifier_Xception:
x = keras.layers.Activation('relu')(x)
return x
def _build_model(self, input_shape, use_residual=True, depth=9):
input_layer = tf.keras.layers.Input(input_shape)
x = input_layer
input_res = input_layer
def _build_model(self, X=[]):
if config['split']:
input_layer = X
else:
input_layer = tf.keras.layers.Input(self.input_shape)
for d in range(depth):
x = self._xception_module(x)
if self.preprocessing:
preprocessed = self._preprocessing(input_layer)
x = preprocessed
input_res = preprocessed
else:
x = input_layer
input_res = input_layer
if use_residual and d % 3 == 2:
for d in range(self.depth):
x = self._module(x, d)
if self.use_residual and d % 3 == 2:
x = self._shortcut_layer(input_res, x)
input_res = x
gap_layer = tf.keras.layers.GlobalAveragePooling1D()(x)
if config['split']:
return gap_layer
output_layer = tf.keras.layers.Dense(1, activation='sigmoid')(gap_layer)
model = tf.keras.models.Model(inputs=input_layer, outputs=output_layer)
model.compile(loss='binary_crossentropy', optimizer=keras.optimizers.Adam(), metrics=['accuracy'])
return model
def fit(self, xception_x, y):
def fit(self, x, y):
csv_logger = CSVLogger(config['batches_log'], append=True, separator=';')
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_accuracy', patience=20)
ckpt_dir = config['model_dir'] + '/' + config['model'] + '_' + 'best_model.h5'
ckpt = tf.keras.callbacks.ModelCheckpoint(ckpt_dir, verbose=1, monitor='val_accuracy', save_best_only=True, mode='auto')
hist = self.model.fit(xception_x, y, verbose=1, validation_split=0.2, epochs=35,
callbacks=[csv_logger, ckpt, early_stop])
return hist
hist = self.model.fit(x, y, verbose=1, batch_size=self.batch_size, validation_split=0.2, epochs=1, callbacks=[csv_logger, ckpt, early_stop])
return hist
\ No newline at end of file
import keras
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Activation, Permute, Dropout, LSTM
from tensorflow.keras.layers import Conv2D, MaxPooling2D, AveragePooling2D
from tensorflow.keras.layers import SeparableConv2D, DepthwiseConv2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.constraints import max_norm
from sklearn import preprocessing
from config import config
from utils.utils import *
import os
from keras.callbacks import CSVLogger
import seaborn as sns
sns.set_style('darkgrid')
def run(trainX, trainY):
classifier = Classifier_DEEPEYE_LSTM(output_directory=config['root_dir'],
input_shape=config['deepeye-lstm']['input_shape'])
hist = classifier.fit(trainX, trainY)
plot_loss(config['model_dir'], hist, config['model'], True)
plot_acc(config['model_dir'], hist, config['model'], True)
# Newly added lines below
save_logs(hist, config['model_dir'], config['model'], pytorch=False)
#save_model_param(classifier.model, config['model_dir'], config['model'], pytorch=False)
class Classifier_DEEPEYE_LSTM:
"""
Inputs:
nb_classes : int, number of classes to classify
input_shape : shape of the input tensor, in our case: 129 * 500 * 1
use_bottleneck : use Bottleneck layer to select the most informative channels
use_residual : use a shortcut layer (RNN) to try to avoid vanishing gradient
kernel_size : 41
batch_size : 64
epochs : 1500
output_directory: directory where plot weights and results are stored
depth : 6, number of repetion of the inception module
Outputs:
y_pred : class (left/right for nb_class=2) of the given input_tensor
"""
def __init__(self, output_directory, input_shape, dropoutRate=0.25, verbose=True, build=True,
batch_size=64, nb_filters=64, use_residual=True, use_bottleneck=True, depth=6,
kernel_size=41, nb_epochs=300):
self.output_directory = output_directory
self.nb_filters = nb_filters
self.use_residual = use_residual
self.use_bottleneck = use_bottleneck
self.depth = depth
self.kernel_size = kernel_size - 1
self.callbacks = None
self.batch_size = batch_size
self.bottleneck_size = 32
self.nb_epochs = nb_epochs
self.dropoutRate = dropoutRate
self.momentum = 0.9
self.lr = 0.001
self.feature_nb = 75
if build:
# build mode
if config['split']:
self.model = self.split_model(input_shape)
else:
self.model = self.build_model(input_shape)
if verbose:
self.model.summary()
print(20*'*')
print("Parameters are: Dropout rate:", self.dropoutRate, " Momentum:", self.momentum,
"Learning rate:", self.lr, "# of features:", self.feature_nb, "# of filters:", self.nb_filters)
print(20*'*')
self.verbose = verbose
def split_model(self,input_shape):
input_layer = keras.layers.Input((input_shape[0], input_shape[1]))
output=[]
# run inception over the cluster
for c in config['cluster'].keys():
output.append(self.build_model(input_shape = None, X = tf.transpose(tf.nn.embedding_lookup(
tf.transpose(input_layer,(1,0,2)),config['cluster'][c]),(1,0,2))))
# append the results and perform 1 dense layer with last_channel dimension and the output layer
x = tf.keras.layers.Concatenate(axis=1)(output)
dense=tf.keras.layers.Dense(32, activation='relu')(x)
output_layer=tf.keras.layers.Dense(1,activation='sigmoid')(dense)
model = tf.keras.models.Model(inputs=input_layer, outputs=output_layer)
return model
def _LSTM_preprocessing(self, input_tensor, output_feature_nb):
lstm = Sequential()
lstm.add(LSTM(output_feature_nb, return_sequences=True))
lstm.add(Dropout(self.dropoutRate))
lstm.add(LSTM(output_feature_nb, return_sequences=True))
lstm.add(Dropout(self.dropoutRate))
lstm.add(keras.layers.BatchNormalization())
output_tensor = lstm(input_tensor)
return output_tensor
def _inception_module(self, input_tensor, stride=1, activation='linear'):
'''
Inception Network
Input:
input_tensor : input of size (129 * 500 * 1) to be forwarded
stride : 1
F1 : number of filters of the first convolution
kernLength : 25, second dimension of the kernel in the first convolution, the first dimension is 1
D : 2, depth multiplier
F1 : 8,
activation function : linear
Output:
output_tensor : input through the inception network
'''
if self.use_bottleneck and int(input_tensor.shape[-1]) > 1:
input_inception = keras.layers.Conv1D(filters=self.bottleneck_size, kernel_size=1,
padding='same', activation=activation, use_bias=False)(input_tensor)
else:
input_inception = input_tensor
# kernel_size_s = [3, 5, 8, 11, 17]
kernel_size_s = [self.kernel_size // (2 ** i) for i in range(3)]
conv_list = []
for i in range(len(kernel_size_s)):
conv_list.append(keras.layers.Conv1D(filters=self.nb_filters, kernel_size=kernel_size_s[i],
strides=stride, padding='same', activation=activation,
use_bias=False)(input_inception))
max_pool_1 = keras.layers.MaxPool1D(pool_size=3, strides=stride, padding='same')(input_tensor)
conv_6 = keras.layers.Conv1D(filters=self.nb_filters, kernel_size=1, padding='same', activation=activation,
use_bias=False)(max_pool_1)
conv_list.append(conv_6)
x = keras.layers.Concatenate(axis=2)(conv_list)
x = keras.layers.BatchNormalization()(x)
x = keras.layers.Activation(activation='relu')(x)
return x
def _shortcut_layer(self, input_tensor, out_tensor):
'''
implementation of a shortcut layer inspired by the Residual NN
'''
shortcut_y = keras.layers.Conv1D(filters=int(out_tensor.shape[-1]), kernel_size=1,
padding='same', use_bias=False)(input_tensor)
shortcut_y = keras.layers.BatchNormalization()(shortcut_y)
x = keras.layers.Add()([shortcut_y, out_tensor])
x = keras.layers.Activation('relu')(x)
return x
def build_model(self, input_shape, X = None):
if config['split']:
input_layer = X
else:
input_layer = keras.layers.Input((input_shape[0], input_shape[1]))
lstm_tensor = self._LSTM_preprocessing(input_layer, self.feature_nb)
x = lstm_tensor
input_res = lstm_tensor
for d in range(self.depth):
x = self._inception_module(x)
if self.use_residual and d % 3 == 2:
x = self._shortcut_layer(input_res, x)
input_res = x
gap_layer = tf.keras.layers.GlobalAveragePooling1D()(x)
# Add Dropout layer
gap_layer = tf.keras.layers.Dropout(self.dropoutRate)(gap_layer)
if config['split']:
return gap_layer
output_layer = tf.keras.layers.Dense(1, activation='sigmoid')(gap_layer)
model = tf.keras.models.Model(inputs=input_layer, outputs=output_layer)
return model
def fit(self, lstm_x, y):
early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=50)
csv_logger = CSVLogger(config['batches_log'], append=True, separator=';')
ckpt_dir = config['model_dir'] + '/' + config['model'] + '_' + 'best_model.h5'
ckpt = tf.keras.callbacks.ModelCheckpoint(ckpt_dir, verbose=1, monitor='val_accuracy', save_best_only=True, mode='auto')
if self.batch_size is None:
mini_batch_size = int(min(deepeye_x.shape[0] / 10, 16))