Commit 233a479b authored by zpgeng

Add RNNDeep.py

parent 3ce71434
# Use tf.keras throughout: mixing the standalone `keras` package with
# `tensorflow.keras` layers in one graph breaks model construction.
from tensorflow import keras
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dropout, LSTM
from tensorflow.keras.callbacks import CSVLogger
from config import config
from utils.utils import *
import seaborn as sns

sns.set_style('darkgrid')
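
# DeepEye-LSTM: two stacked LSTM layers as a feature extractor, followed by an
# InceptionTime-style stack of 1D inception modules with optional residual
# shortcuts, global average pooling and a sigmoid output for binary
# classification of EEG trials (129 channels x 500 time samples per trial).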

def run(trainX, trainY):
    # Train the LSTM classifier and persist plots, logs and weights.
    if config['split']:
        config['model'] = config['model'] + '_cluster'
    classifier = Classifier_DEEPEYE_LSTM(output_directory=config['root_dir'],
                                         input_shape=config['deepeye-lstm']['input_shape'])
    hist = classifier.fit(trainX, trainY)
    plot_loss(hist, config['model_dir'], config['model'], True)
    plot_acc(hist, config['model_dir'], config['model'], True)
    save_logs(hist, config['model_dir'], config['model'], pytorch=False)
    save_model_param(classifier.model, config['model_dir'], config['model'], pytorch=False)

class Classifier_DEEPEYE_LSTM:
    """
    Inputs:
      nb_classes       : int, number of classes to classify
      input_shape      : shape of the input tensor, in our case: 129 * 500
      use_bottleneck   : use a bottleneck layer to select the most informative channels
      use_residual     : use a shortcut (residual) layer to try to avoid vanishing gradients
      kernel_size      : 41
      batch_size       : 64
      epochs           : 300
      output_directory : directory where plots, weights and results are stored
      depth            : 6, number of repetitions of the inception module
    Outputs:
      y_pred           : class (left/right for nb_classes=2) of the given input tensor
    A minimal usage sketch is given at the bottom of this file.
    """
    def __init__(self, output_directory, input_shape, dropoutRate=0.4, verbose=True, build=True,
                 batch_size=64, nb_filters=32, use_residual=True, use_bottleneck=True, depth=6,
                 kernel_size=41, nb_epochs=300):
        self.output_directory = output_directory
        self.nb_filters = nb_filters
        self.use_residual = use_residual
        self.use_bottleneck = use_bottleneck
        self.depth = depth
        self.kernel_size = kernel_size - 1
        self.callbacks = None
        self.batch_size = batch_size
        self.bottleneck_size = 32
        self.nb_epochs = nb_epochs
        self.dropoutRate = dropoutRate
        self.momentum = 0.85
        self.lr = 0.1
        self.feature_nb = 32

        if build:
            # build model
            if config['split']:
                self.model = self.split_model(input_shape)
            else:
                self.model = self.build_model(input_shape)
            if verbose:
                self.model.summary()
                print(20 * '*')
                print("Parameters are: Dropout rate:", self.dropoutRate, " Momentum:", self.momentum,
                      "Learning rate:", self.lr, "# of features:", self.feature_nb)
                print(20 * '*')
            # self.model.save_weights(self.output_directory + 'model_init.hdf5')
    def _LSTM_preprocessing(self, input_tensor, output_feature_nb):
        '''
        Two stacked LSTM layers (each followed by dropout) used as a feature
        extractor; maps the input sequence to a sequence with
        output_feature_nb features per time step.
        '''
        lstm = Sequential()
        lstm.add(LSTM(output_feature_nb, return_sequences=True))
        lstm.add(Dropout(self.dropoutRate))
        lstm.add(LSTM(output_feature_nb, return_sequences=True))
        lstm.add(Dropout(self.dropoutRate))
        lstm.add(keras.layers.BatchNormalization())
        output_tensor = lstm(input_tensor)
        return output_tensor
    def _inception_module(self, input_tensor, stride=1, activation='linear'):
        '''
        Inception module
        Input:
          input_tensor : tensor of shape (batch, time, channels) to be forwarded
          stride       : 1
          activation   : 'linear', activation of the convolution branches
        The module applies an optional 1x1 bottleneck convolution, three parallel
        convolutions with kernel sizes self.kernel_size // 1, // 2 and // 4
        (40, 20 and 10 with the defaults), plus a max-pool branch followed by a
        1x1 convolution, and concatenates the four branches.
        Output:
          output_tensor : input passed through the inception module
        '''
        if self.use_bottleneck and int(input_tensor.shape[-1]) > 1:
            input_inception = keras.layers.Conv1D(filters=self.bottleneck_size, kernel_size=1,
                                                  padding='same', activation=activation,
                                                  use_bias=False)(input_tensor)
        else:
            input_inception = input_tensor

        kernel_size_s = [self.kernel_size // (2 ** i) for i in range(3)]
        conv_list = []
        for i in range(len(kernel_size_s)):
            conv_list.append(keras.layers.Conv1D(filters=self.nb_filters, kernel_size=kernel_size_s[i],
                                                 strides=stride, padding='same', activation=activation,
                                                 use_bias=False)(input_inception))

        max_pool_1 = keras.layers.MaxPool1D(pool_size=3, strides=stride, padding='same')(input_tensor)
        conv_6 = keras.layers.Conv1D(filters=self.nb_filters, kernel_size=1, padding='same',
                                     activation=activation, use_bias=False)(max_pool_1)
        conv_list.append(conv_6)

        x = keras.layers.Concatenate(axis=2)(conv_list)
        x = keras.layers.BatchNormalization()(x)
        x = keras.layers.Activation(activation='relu')(x)
        return x
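
    # Note: with the defaults (nb_filters=32), the three convolution branches
    # and the pooling branch each emit 32 channels, so one inception module
    # outputs 4 * 32 = 128 channels after concatenation.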
    def _shortcut_layer(self, input_tensor, out_tensor):
        '''
        Implementation of a shortcut layer inspired by residual networks.
        '''
        shortcut_y = keras.layers.Conv1D(filters=int(out_tensor.shape[-1]), kernel_size=1,
                                         padding='same', use_bias=False)(input_tensor)
        shortcut_y = keras.layers.BatchNormalization()(shortcut_y)
        x = keras.layers.Add()([shortcut_y, out_tensor])
        x = keras.layers.Activation('relu')(x)
        return x
    def _build_net(self, input_tensor):
        # Shared network body: LSTM preprocessing, then `depth` inception
        # modules with a residual shortcut every third module, global average
        # pooling, dropout and a sigmoid output.
        lstm_tensor = self._LSTM_preprocessing(input_tensor, self.feature_nb)
        x = lstm_tensor
        input_res = lstm_tensor
        for d in range(self.depth):
            x = self._inception_module(x)
            if self.use_residual and d % 3 == 2:
                x = self._shortcut_layer(input_res, x)
                input_res = x
        gap_layer = tf.keras.layers.GlobalAveragePooling1D()(x)
        # Add Dropout layer
        gap_layer = tf.keras.layers.Dropout(self.dropoutRate)(gap_layer)
        return tf.keras.layers.Dense(1, activation='sigmoid')(gap_layer)

    def build_model(self, input_shape):
        input_layer = keras.layers.Input((input_shape[0], input_shape[1]))
        output_layer = self._build_net(input_layer)
        model = tf.keras.models.Model(inputs=input_layer, outputs=output_layer)
        return model

    def split_model(self, input_shape):
        input_layer = keras.layers.Input((input_shape[0], input_shape[1]))
        output = []
        # run the network over each electrode cluster separately
        for c in config['cluster'].keys():
            # select the channels belonging to cluster c: (batch, n_channels, time);
            # the tensor stays 3D, as expected by the LSTM/Conv1D stack
            cluster_x = tf.transpose(tf.nn.embedding_lookup(
                tf.transpose(input_layer, (1, 0, 2)), config['cluster'][c]), (1, 0, 2))
            output.append(self._build_net(cluster_x))
        # concatenate the per-cluster outputs and add one dense layer before the output layer
        x = tf.keras.layers.Concatenate(axis=1)(output)
        dense = tf.keras.layers.Dense(32, activation='relu')(x)
        output_layer = tf.keras.layers.Dense(1, activation='sigmoid')(dense)
        model = tf.keras.models.Model(inputs=input_layer, outputs=output_layer)
        return model
    def fit(self, lstm_x, y):
        # Add early stopping and learning-rate reduction to mitigate overfitting
        reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=15,
                                                         min_lr=0.0001)
        early_stop = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=50)
        csv_logger = CSVLogger(config['batches_log'], append=True, separator=';')

        if self.batch_size is None:
            mini_batch_size = int(min(lstm_x.shape[0] / 10, 16))
        else:
            mini_batch_size = self.batch_size

        self.model.compile(loss='binary_crossentropy',
                           optimizer=keras.optimizers.Adam(learning_rate=self.lr),
                           metrics=['accuracy'])
        # self.model.compile(loss='binary_crossentropy', optimizer=keras.optimizers.SGD(learning_rate=self.lr,
        #                    momentum=self.momentum, nesterov=True), metrics=['accuracy'])
        # self.model.compile(loss='binary_crossentropy',
        #                    optimizer=keras.optimizers.RMSprop(learning_rate=self.lr,
        #                    momentum=self.momentum), metrics=['accuracy'])
        hist = self.model.fit(lstm_x, y, batch_size=mini_batch_size, verbose=1, validation_split=0.25,
                              epochs=self.nb_epochs, shuffle=True,
                              callbacks=[early_stop, csv_logger, reduce_lr])
        return hist
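

# Minimal smoke-test sketch (an illustration, not part of the training
# pipeline): it assumes config provides 'split', 'root_dir', 'model_dir',
# 'model', 'batches_log' and 'deepeye-lstm'/'input_shape' = (129, 500), and
# substitutes random data of that shape for real EEG trials.
if __name__ == '__main__':
    import numpy as np

    trainX = np.random.randn(64, 129, 500).astype(np.float32)  # hypothetical EEG batch
    trainY = np.random.randint(0, 2, size=(64, 1))              # binary labels (left/right)
    run(trainX, trainY)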