# -*- coding: utf-8 -*- import logging import string from multiprocessing import Pool import h5py import numpy as np import pandas as pd from keras.utils import np_utils from tqdm import tqdm logger = logging.getLogger('logger') chars = dict((char, idx + 1) for (idx, char) in enumerate(string.ascii_lowercase + string.punctuation + string.digits)) def get_character_dict(): return chars def encode_char(c): if c in chars: return chars[c] else: return 0 encode_char = np.vectorize(encode_char) # TODO: ask for correct refactoring def get_user_chunks(user_flow, window=10): # TODO: what is maxLengthInSeconds for?!? # maxMilliSeconds = maxLengthInSeconds * 1000 # domains = [] # flows = [] # if not overlapping: # numBlocks = int(np.ceil(len(user_flow) / window)) # userIDs = np.arange(len(user_flow)) # for blockID in np.arange(numBlocks): # curIDs = userIDs[(blockID * window):((blockID + 1) * window)] # useData = user_flow.iloc[curIDs] # curDomains = useData['domain'] # if maxLengthInSeconds != -1: # curMinMilliSeconds = np.min(useData['timeStamp']) + maxMilliSeconds # underTimeOutIDs = np.where(np.array(useData['timeStamp']) <= curMinMilliSeconds) # if len(underTimeOutIDs) != len(curIDs): # curIDs = curIDs[underTimeOutIDs] # useData = user_flow.iloc[curIDs] # curDomains = useData['domain'] # domains.append(list(curDomains)) # flows.append(useData) # else: # numBlocks = len(user_flow) + 1 - window # userIDs = np.arange(len(user_flow)) # for blockID in np.arange(numBlocks): # curIDs = userIDs[blockID:blockID + window] # useData = user_flow.iloc[curIDs] # curDomains = useData['domain'] # if maxLengthInSeconds != -1: # curMinMilliSeconds = np.min(useData['timeStamp']) + maxMilliSeconds # underTimeOutIDs = np.where(np.array(useData['timeStamp']) <= curMinMilliSeconds) # if len(underTimeOutIDs) != len(curIDs): # curIDs = curIDs[underTimeOutIDs] # useData = user_flow.iloc[curIDs] # curDomains = useData['domain'] # domains.append(list(curDomains)) # flows.append(useData) # if domains and len(domains[-1]) != window: # domains.pop(-1) # flows.pop(-1) # return domains, flows result = [] chunk_size = (len(user_flow) // window) for i in range(chunk_size): result.append(user_flow.iloc[i * window:(i + 1) * window]) if result and len(result[-1]) != window: result.pop() return result def get_domain_features(domain, vocab: dict, max_length=40): encoding = np.zeros((max_length,)) for j in range(min(len(domain), max_length)): char = domain[-j] # TODO: why -j -> order reversed for domain url? encoding[j] = vocab.get(char, 0) return encoding def get_all_flow_features(features): flows = np.stack( map(lambda f: f[["duration", "bytes_up", "bytes_down"]], features) ) return np.log1p(flows) def create_dataset_from_flows(user_flow_df, char_dict, max_len, window_size=10): logger.info("get chunks from user data frames") with Pool() as pool: results = [] for user_flow in tqdm(get_flow_per_user(user_flow_df), total=len(user_flow_df['user_hash'].unique().tolist())): results.append(pool.apply_async(get_user_chunks, (user_flow, window_size))) windows = [window for res in results for window in res.get()] logger.info("create training dataset") domain_tr, flow_tr, hits_tr, _, server_tr, trusted_hits_tr = create_dataset_from_lists(chunks=windows, vocab=char_dict, max_len=max_len) # make client labels discrete with 4 different values hits_tr = np.apply_along_axis(lambda x: discretize_label(x, 3), 0, np.atleast_2d(hits_tr)) # select only 1.0 and 0.0 from training data pos_idx = np.where(np.logical_or(hits_tr == 1.0, trusted_hits_tr >= 1.0))[0] neg_idx = np.where(hits_tr == 0.0)[0] idx = np.concatenate((pos_idx, neg_idx)) # choose selected sample to train on domain_tr = domain_tr[idx] flow_tr = flow_tr[idx] client_tr = np.zeros_like(idx, float) client_tr[:pos_idx.shape[-1]] = 1.0 server_tr = server_tr[idx] client_tr = np_utils.to_categorical(client_tr, 2) return domain_tr, flow_tr, client_tr, server_tr def store_h5dataset(path, domain_tr, flow_tr, client_tr, server_tr): f = h5py.File(path, "w") domain_tr = domain_tr.astype(np.int8) f.create_dataset("domain", data=domain_tr) f.create_dataset("flow", data=flow_tr) server_tr = server_tr.astype(np.bool) client_tr = client_tr.astype(np.bool) f.create_dataset("client", data=client_tr) f.create_dataset("server", data=server_tr) f.close() def load_h5dataset(path): data = h5py.File(path, "r") return data["domain"], data["flow"], data["client"], data["server"] def create_dataset_from_lists(chunks, vocab, max_len): """ combines domain and feature windows to sequential training data :param chunks: list of flow feature windows :param vocab: :param max_len: :return: """ def get_domain_features_reduced(d): return get_domain_features(d[0], vocab, max_len) logger.info(" compute domain features") domain_features = [] for ds in tqdm(map(lambda f: f.domain, chunks)): domain_features.append(np.apply_along_axis(get_domain_features_reduced, 2, np.atleast_3d(ds))) domain_features = np.concatenate(domain_features, 0) logger.info(" compute flow features") flow_features = get_all_flow_features(chunks) logger.info(" select hits") hits = np.max(np.stack(map(lambda f: f.virusTotalHits, chunks)), axis=1) logger.info(" select names") names = np.unique(np.stack(map(lambda f: f.user_hash, chunks))) logger.info(" select servers") servers = np.stack(map(lambda f: f.serverLabel, chunks)) logger.info(" select trusted hits") trusted_hits = np.max(np.stack(map(lambda f: f.trustedHits, chunks)), axis=1) return (domain_features, flow_features, hits, names, servers, trusted_hits) def discretize_label(values, threshold): max_val = np.max(values) if max_val >= threshold: return 1.0 elif max_val == -1: return -1.0 elif 0 < max_val < threshold: return -2.0 else: return 0.0 def get_user_flow_data(csv_file): types = { "duration": int, "bytes_down": int, "bytes_up": int, "domain": object, "timeStamp": float, "server_ip": object, "user_hash": float, "virusTotalHits": int, "serverLabel": int, "trustedHits": int } df = pd.read_csv(csv_file) df = df[list(types.keys())] df.set_index(keys=['user_hash'], drop=False, inplace=True) return df def get_flow_per_user(df): users = df['user_hash'].unique().tolist() for user in users: yield df.loc[df.user_hash == user].dropna(axis=0, how="any") def load_or_generate_h5data(h5data, train_data, domain_length, window_size): char_dict = get_character_dict() logger.info(f"check for h5data {h5data}") try: open(h5data, "r") except FileNotFoundError: logger.info("h5 data not found - load csv file") user_flow_df = get_user_flow_data(train_data) logger.info("create training dataset") domain_tr, flow_tr, client_tr, server_tr = create_dataset_from_flows(user_flow_df, char_dict, max_len=domain_length, window_size=window_size) logger.info("store training dataset as h5 file") store_h5dataset(h5data, domain_tr, flow_tr, client_tr, server_tr) logger.info("load h5 dataset") return load_h5dataset(h5data) # TODO: implement csv loading if already generated def load_or_generate_domains(train_data, domain_length): char_dict = get_character_dict() user_flow_df = get_user_flow_data(train_data) domain_encs = user_flow_df.domain.apply(lambda d: get_domain_features(d, char_dict, domain_length)) domain_encs = np.stack(domain_encs) user_flow_df = user_flow_df[["domain", "serverLabel", "trustedHits", "virusTotalHits"]].dropna(axis=0, how="any") user_flow_df.reset_index(inplace=True) user_flow_df["clientLabel"] = np.where( np.logical_or(user_flow_df.trustedHits > 0, user_flow_df.virusTotalHits >= 3), 1.0, 0.0) user_flow_df = user_flow_df[["domain", "serverLabel", "clientLabel"]] user_flow_df.groupby(user_flow_df.domain).mean() return domain_encs, user_flow_df[["serverLabel", "clientLabel"]].as_matrix()