ma_cisco_malware/models/pauls_networks.py

94 lines
3.5 KiB
Python

from collections import namedtuple
import keras
from keras.engine import Input, Model as KerasModel
from keras.layers import Activation, Conv1D, Dense, Dropout, Embedding, GlobalMaxPooling1D, TimeDistributed
from keras.regularizers import l2
import dataset
Model = namedtuple("Model", ["in_domains", "in_flows", "out_client", "out_server"])
best_config = {
"type": "paul",
"batch_size": 64,
"window_size": 10,
"domain_length": 40,
"flow_features": 3,
#
'dropout': 0.5,
'domain_features': 32,
'drop_out': 0.5,
'embedding_size': 64,
'filter_main': 512,
'flow_features': 3,
'dense_main': 32,
'filter_embedding': 32,
'hidden_embedding': 32,
'kernel_embedding': 8,
'kernels_main': 8,
'input_length': 40
}
def get_embedding(embedding_size, input_length, filter_size, kernel_size, hidden_dims, drop_out=0.5) -> KerasModel:
x = y = Input(shape=(input_length,))
y = Embedding(input_dim=dataset.get_vocab_size(), output_dim=embedding_size)(y)
y = Conv1D(filter_size,
kernel_size,
activation='relu')(y)
y = GlobalMaxPooling1D()(y)
y = Dropout(drop_out)(y)
y = Dense(hidden_dims)(y)
y = Activation('relu')(y)
return KerasModel(x, y)
def get_model(cnnDropout, flow_features, domain_features, window_size, domain_length, cnn_dims, kernel_size,
dense_dim, cnn, model_output="both") -> Model:
ipt_domains = Input(shape=(window_size, domain_length), name="ipt_domains")
encoded = TimeDistributed(cnn, name="domain_cnn")(ipt_domains)
ipt_flows = Input(shape=(window_size, flow_features), name="ipt_flows")
merged = keras.layers.concatenate([encoded, ipt_flows], -1)
# CNN processing a small slides of flow windows
y = Conv1D(cnn_dims,
kernel_size,
activation='relu',
input_shape=(window_size, domain_features + flow_features))(merged)
# remove temporal dimension by global max pooling
y = GlobalMaxPooling1D()(y)
y = Dropout(cnnDropout)(y)
y = Dense(dense_dim, kernel_regularizer=l2(0.1), activation='relu')(y)
out_client = Dense(1, activation='sigmoid', name="client")(y)
out_server = Dense(1, activation='sigmoid', name="server")(y)
return Model(ipt_domains, ipt_flows, out_client, out_server)
def get_new_model(dropout, flow_features, domain_features, window_size, domain_length, cnn_dims, kernel_size,
dense_dim, cnn, model_output="both") -> Model:
ipt_domains = Input(shape=(window_size, domain_length), name="ipt_domains")
ipt_flows = Input(shape=(window_size, flow_features), name="ipt_flows")
encoded = TimeDistributed(cnn, name="domain_cnn")(ipt_domains)
merged = keras.layers.concatenate([encoded, ipt_flows], -1)
y = Dense(dense_dim,
activation="relu",
name="dense_server")(merged)
out_server = Dense(1, activation="sigmoid", name="server")(y)
merged = keras.layers.concatenate([merged, y], -1)
# CNN processing a small slides of flow windows
y = Conv1D(cnn_dims,
kernel_size,
activation='relu',
input_shape=(window_size, domain_features + flow_features))(merged)
# remove temporal dimension by global max pooling
y = GlobalMaxPooling1D()(y)
y = Dropout(dropout)(y)
y = Dense(dense_dim,
activation='relu',
name="dense_client")(y)
out_client = Dense(1, activation='sigmoid', name="client")(y)
return Model(ipt_domains, ipt_flows, out_client, out_server)