ma_cisco_malware/dataset.py

234 lines
8.4 KiB
Python

# -*- coding: utf-8 -*-
import logging
import string
from multiprocessing import Pool
import h5py
import numpy as np
import pandas as pd
from keras.utils import np_utils
from tqdm import tqdm
logger = logging.getLogger('logger')
chars = dict((char, idx + 1) for (idx, char) in
enumerate(string.ascii_lowercase + string.punctuation + string.digits))
def get_character_dict():
return chars
def encode_char(c):
if c in chars:
return chars[c]
else:
return 0
encode_char = np.vectorize(encode_char)
# TODO: refactor
def get_user_chunks(user_flow, window=10):
# TODO: what is maxLengthInSeconds for?!?
# maxMilliSeconds = maxLengthInSeconds * 1000
# domains = []
# flows = []
# if not overlapping:
# numBlocks = int(np.ceil(len(user_flow) / window))
# userIDs = np.arange(len(user_flow))
# for blockID in np.arange(numBlocks):
# curIDs = userIDs[(blockID * window):((blockID + 1) * window)]
# useData = user_flow.iloc[curIDs]
# curDomains = useData['domain']
# if maxLengthInSeconds != -1:
# curMinMilliSeconds = np.min(useData['timeStamp']) + maxMilliSeconds
# underTimeOutIDs = np.where(np.array(useData['timeStamp']) <= curMinMilliSeconds)
# if len(underTimeOutIDs) != len(curIDs):
# curIDs = curIDs[underTimeOutIDs]
# useData = user_flow.iloc[curIDs]
# curDomains = useData['domain']
# domains.append(list(curDomains))
# flows.append(useData)
# else:
# numBlocks = len(user_flow) + 1 - window
# userIDs = np.arange(len(user_flow))
# for blockID in np.arange(numBlocks):
# curIDs = userIDs[blockID:blockID + window]
# useData = user_flow.iloc[curIDs]
# curDomains = useData['domain']
# if maxLengthInSeconds != -1:
# curMinMilliSeconds = np.min(useData['timeStamp']) + maxMilliSeconds
# underTimeOutIDs = np.where(np.array(useData['timeStamp']) <= curMinMilliSeconds)
# if len(underTimeOutIDs) != len(curIDs):
# curIDs = curIDs[underTimeOutIDs]
# useData = user_flow.iloc[curIDs]
# curDomains = useData['domain']
# domains.append(list(curDomains))
# flows.append(useData)
# if domains and len(domains[-1]) != window:
# domains.pop(-1)
# flows.pop(-1)
# return domains, flows
result = []
chunk_size = (len(user_flow) // window)
for i in range(chunk_size):
result.append(user_flow.iloc[i * window:(i + 1) * window])
if result and len(result[-1]) != window:
result.pop()
return result
def get_domain_features(domain, vocab: dict, max_length=40):
encoding = np.zeros((max_length,))
for j in range(min(len(domain), max_length)):
char = domain[-j] # TODO: why -j -> order reversed for domain url?
encoding[j] = vocab.get(char, 0)
return encoding
def get_all_flow_features(features):
flows = np.stack(
map(lambda f: f[["duration", "bytes_up", "bytes_down"]], features)
)
return np.log1p(flows)
def create_dataset_from_flows(user_flow_df, char_dict, max_len, window_size=10):
logger.info("get chunks from user data frames")
with Pool() as pool:
results = []
for user_flow in tqdm(get_flow_per_user(user_flow_df), total=len(user_flow_df['user_hash'].unique().tolist())):
results.append(pool.apply_async(get_user_chunks, (user_flow, window_size)))
windows = [window for res in results for window in res.get()]
logger.info("create training dataset")
domain_tr, flow_tr, hits_tr, _, server_tr, trusted_hits_tr = create_dataset_from_lists(chunks=windows,
vocab=char_dict,
max_len=max_len)
# make client labels discrete with 4 different values
hits_tr = np.apply_along_axis(lambda x: discretize_label(x, 3), 0, np.atleast_2d(hits_tr))
# select only 1.0 and 0.0 from training data
pos_idx = np.where(np.logical_or(hits_tr == 1.0, trusted_hits_tr >= 1.0))[0]
neg_idx = np.where(hits_tr == 0.0)[0]
idx = np.concatenate((pos_idx, neg_idx))
# choose selected sample to train on
domain_tr = domain_tr[idx]
flow_tr = flow_tr[idx]
client_tr = np.zeros_like(idx, float)
client_tr[:pos_idx.shape[-1]] = 1.0
server_tr = server_tr[idx]
client_tr = np_utils.to_categorical(client_tr, 2)
server_tr = np_utils.to_categorical(server_tr, 2)
return domain_tr, flow_tr, client_tr, server_tr
def store_h5dataset(path, domain_tr, flow_tr, client_tr, server_tr):
f = h5py.File(path, "w")
domain_tr = domain_tr.astype(np.int8)
f.create_dataset("domain", data=domain_tr)
f.create_dataset("flow", data=flow_tr)
server_tr = server_tr.astype(np.bool)
client_tr = client_tr.astype(np.bool)
f.create_dataset("client", data=client_tr)
f.create_dataset("server", data=server_tr)
f.close()
def load_h5dataset(path):
data = h5py.File(path, "r")
return data["domain"], data["flow"], data["client"], data["server"]
def create_dataset_from_lists(chunks, vocab, max_len):
"""
combines domain and feature windows to sequential training data
:param chunks: list of flow feature windows
:param vocab:
:param max_len:
:return:
"""
def get_domain_features_reduced(d):
return get_domain_features(d[0], vocab, max_len)
logger.info(" compute domain features")
domain_features = []
for ds in tqdm(map(lambda f: f.domain, chunks)):
# TODO: fix this correctly
# assert min(np.atleast_3d(ds).shape) > 0, f"shape of 0 for {ds}"
if not ds: continue
domain_features.append(np.apply_along_axis(get_domain_features_reduced, 2, np.atleast_3d(ds)))
domain_features = np.concatenate(domain_features, 0)
logger.info(" compute flow features")
flow_features = get_all_flow_features(chunks)
logger.info(" select hits")
hits = np.max(np.stack(map(lambda f: f.virusTotalHits, chunks)), axis=1)
logger.info(" select names")
names = np.unique(np.stack(map(lambda f: f.user_hash, chunks)))
logger.info(" select servers")
servers = np.max(np.stack(map(lambda f: f.serverLabel, chunks)), axis=1)
logger.info(" select trusted hits")
trusted_hits = np.max(np.stack(map(lambda f: f.trustedHits, chunks)), axis=1)
return (domain_features, flow_features,
hits, names, servers, trusted_hits)
def discretize_label(values, threshold):
max_val = np.max(values)
if max_val >= threshold:
return 1.0
elif max_val == -1:
return -1.0
elif 0 < max_val < threshold:
return -2.0
else:
return 0.0
def get_user_flow_data(csv_file):
types = {
"duration": int,
"bytes_down": int,
"bytes_up": int,
"domain": object,
"timeStamp": float,
"server_ip": object,
"user_hash": float,
"virusTotalHits": int,
"serverLabel": int,
"trustedHits": int
}
df = pd.read_csv(csv_file)
df = df[list(types.keys())]
df.set_index(keys=['user_hash'], drop=False, inplace=True)
return df
def get_flow_per_user(df):
users = df['user_hash'].unique().tolist()
for user in users:
yield df.loc[df.user_hash == user].dropna(axis=0, how="any")
def load_or_generate_h5data(h5data, train_data, domain_length, window_size):
char_dict = get_character_dict()
logger.info(f"check for h5data {h5data}")
try:
open(h5data, "r")
raise FileNotFoundError
except FileNotFoundError:
logger.info("h5 data not found - load csv file")
user_flow_df = get_user_flow_data(train_data)
logger.info("create training dataset")
domain_tr, flow_tr, client_tr, server_tr = create_dataset_from_flows(user_flow_df, char_dict,
max_len=domain_length,
window_size=window_size)
logger.info("store training dataset as h5 file")
store_h5dataset(h5data, domain_tr, flow_tr, client_tr, server_tr)
logger.info("load h5 dataset")
return load_h5dataset(h5data)