# -*- coding: utf-8 -*-
"""Helpers that turn per-user network flow records into windowed datasets.

The module reads a CSV of flows, groups the rows per user, cuts each user's
rows into fixed-size windows, encodes domains character-wise, and caches the
resulting arrays as HDF5 files.
"""
import logging
import string
from multiprocessing import Pool

import h5py
import joblib
import numpy as np
import pandas as pd
from tqdm import tqdm

logger = logging.getLogger('logger')

# Character vocabulary: lowercase letters, punctuation and digits are mapped
# to 1..N. Index 0 is what every lookup falls back to for unknown characters
# (and it is the fill value of the zero-initialised encodings).
chars = {
    char: idx + 1
    for idx, char in enumerate(string.ascii_lowercase + string.punctuation + string.digits)
}


def get_character_dict():
    """Return the module-level character -> index mapping (not a copy)."""
    return chars


def get_vocab_size():
    """Return the number of distinct indices, including the fallback index 0."""
    return len(chars) + 1


def _encode_single_char(c):
    """Return the vocabulary index of ``c``, or 0 if ``c`` is not in the vocabulary."""
    return chars.get(c, 0)


# Public, element-wise version of the lookup above.
encode_char = np.vectorize(_encode_single_char)


# TODO: ask for correct refactoring
def get_user_chunks(user_flow, window=10):
    """Split one user's flows into consecutive, non-overlapping windows.

    :param user_flow: data frame with the flows of a single user
    :param window: number of rows per window
    :return: list of data frames with exactly ``window`` rows each; trailing
        rows that do not fill a whole window are dropped
    """
    # NOTE: a previous (commented-out) implementation additionally supported
    # overlapping windows and a ``maxLengthInSeconds`` cut-off based on the
    # ``timeStamp`` column. It was never active; see version control history
    # if that behaviour has to be restored.
    num_windows = len(user_flow) // window
    return [user_flow.iloc[i * window:(i + 1) * window] for i in range(num_windows)]


def get_domain_features(domain, vocab: dict, max_length=40):
    """Encode a domain string as a fixed-length vector of vocabulary indices.

    :param domain: domain name
    :param vocab: character -> index mapping; unknown characters map to 0
    :param max_length: length of the returned vector
    :return: float array of shape ``(max_length,)``; unused positions stay 0
    """
    encoding = np.zeros((max_length,))
    for j in range(min(len(domain), max_length)):
        # NOTE(review): ``domain[-0]`` is ``domain[0]``, so position 0 holds the
        # FIRST character and positions 1.. hold the domain read backwards
        # from its end. This looks like an off-by-one of an intended full
        # reversal (``domain[-(j + 1)]``), but it is kept as is because
        # cached datasets and trained models depend on this exact encoding.
        # TODO: why -j -> order reversed for domain url?
        char = domain[-j]
        encoding[j] = vocab.get(char, 0)
    return encoding


def get_all_flow_features(features):
    """Stack the numeric flow columns of all windows and apply ``log1p``.

    :param features: iterable of window data frames
    :return: array of shape ``(n_windows, window, 3)`` built from the columns
        ``duration``, ``bytes_up`` and ``bytes_down``
    """
    # np.stack needs a real sequence; recent NumPy rejects iterators like map().
    flows = np.stack([f[["duration", "bytes_up", "bytes_down"]].values for f in features])
    return np.log1p(flows)


def create_dataset_from_flows(user_flow_df, char_dict, max_len, window_size=10):
    """Create the windowed dataset from a flow data frame and filter it by hits.

    :return: tuple ``(domain, flow, name, client, server)``
    """
    domain, flow, name, hits, trusted_hits, server = create_raw_dataset_from_flows(
        user_flow_df, char_dict, max_len, window_size)
    return filter_window_dataset_by_hits(domain, flow, name, hits, trusted_hits, server)


def filter_window_dataset_by_hits(domain, flow, name, hits, trusted_hits, server):
    """Keep only clearly positive and clearly negative windows.

    A window is positive if its discrete hit label is 1.0 or it has at least
    one trusted hit. A window is negative if its hit label is 0.0 and it is
    not positive. All other windows are dropped.

    :return: tuple ``(domain, flow, name, client, server)`` where the positive
        windows come first and ``client`` is 1.0 for them, 0.0 otherwise
    """
    hits = np.asarray(hits)
    trusted_hits = np.asarray(trusted_hits)
    # select only 1.0 and 0.0 from training data
    is_positive = np.logical_or(hits == 1.0, trusted_hits >= 1.0)
    pos_idx = np.where(is_positive)[0]
    # Exclude positives here: a window with hits == 0 but trusted hits would
    # otherwise be selected twice, once per label.
    neg_idx = np.where(np.logical_and(hits == 0.0, np.logical_not(is_positive)))[0]
    idx = np.concatenate((pos_idx, neg_idx))
    # choose selected sample to train on
    client = np.zeros_like(idx, float)
    client[:pos_idx.shape[-1]] = 1.0
    return domain[idx], flow[idx], name[idx], client, server[idx]


def _collect_user_windows(user_flow_df, window_size):
    """Cut every user's flows into windows using a process pool."""
    # nunique() ignores NaN, matching the groups that get_flow_per_user yields.
    total = user_flow_df['user_hash'].nunique()
    with Pool() as pool:
        pending = [
            pool.apply_async(get_user_chunks, (user_flow, window_size))
            for user_flow in tqdm(get_flow_per_user(user_flow_df), total=total)
        ]
        # Collect inside the ``with`` block: leaving it terminates the workers.
        return [window for res in pending for window in res.get()]


def create_raw_dataset_from_flows(user_flow_df, char_dict, max_len, window_size=10):
    """Create the unfiltered windowed dataset from a flow data frame.

    :return: tuple ``(domain, flow, name, hits, trusted_hits, server)`` where
        ``hits`` holds the discrete labels produced by ``make_label_discrete``
        with threshold 3
    """
    logger.info("get chunks from user data frames")
    windows = _collect_user_windows(user_flow_df, window_size)
    logger.info("create training dataset")
    domain, flow, hits, name, server, trusted_hits = create_dataset_from_windows(
        chunks=windows, vocab=char_dict, max_len=max_len)
    # make client labels discrete with 4 different values
    hits = np.apply_along_axis(lambda x: make_label_discrete(x, 3), 0, np.atleast_2d(hits))
    return domain, flow, name, hits, trusted_hits, server


def store_h5dataset(path, data: dict):
    """Write every entry of ``data`` as a dataset into ``<path>.h5`` (overwriting it)."""
    # The context manager closes the file even if a dataset cannot be written.
    with h5py.File(path + ".h5", "w") as f:
        for key, val in data.items():
            f.create_dataset(key, data=val)


def check_h5dataset(path):
    """Check that ``<path>.h5`` can be opened for reading.

    :return: ``True`` if the file could be opened
    :raises FileNotFoundError: if the file does not exist
    """
    # Open and immediately close again so no file handle is leaked.
    with open(path + ".h5", "rb"):
        return True


def load_h5dataset(path):
    """Open ``<path>.h5`` and return a dict of its top-level h5py objects.

    The returned objects are read from disk on access, so the underlying file
    is deliberately left open; it is never closed by this module.
    """
    f = h5py.File(path + ".h5", "r")
    return {key: f[key] for key in f.keys()}


def create_dataset_from_windows(chunks, vocab, max_len):
    """
    combines domain and feature windows to sequential training data

    :param chunks: list of flow feature windows (data frames of equal length)
    :param vocab: character -> index mapping used to encode the domains
    :param max_len: length of each encoded domain
    :return: tuple ``(domain_features, flow_features, hits, names, servers,
        trusted_hits)``; ``hits`` and ``trusted_hits`` are the per-window
        maxima, ``names`` is the user hash of each window
    """
    logger.info(" compute domain features")
    domain_features = np.array([
        [get_domain_features(domain, vocab, max_len) for domain in chunk.domain]
        for chunk in tqdm(chunks)
    ])
    logger.info(" compute flow features")
    flow_features = get_all_flow_features(chunks)
    logger.info(" select hits")
    hits = np.max(np.stack([chunk.virusTotalHits for chunk in chunks]), axis=1)
    logger.info(" select names")
    names = np.stack([chunk.user_hash for chunk in chunks])
    # Every window must belong to exactly one user. Broadcasting the first
    # column makes the check independent of the window size.
    assert (names[:, :1] == names).all(), "window contains flows of several users"
    names = names[:, 0]
    logger.info(" select servers")
    servers = np.stack([chunk.serverLabel for chunk in chunks])
    logger.info(" select trusted hits")
    trusted_hits = np.max(np.stack([chunk.trustedHits for chunk in chunks]), axis=1)
    return (domain_features, flow_features,
            hits, names, servers, trusted_hits)


def make_label_discrete(values, threshold):
    """Map the maximum of ``values`` to one of four discrete labels.

    :return: ``1.0`` if the maximum reaches ``threshold``, ``-1.0`` if it is
        exactly -1, ``-2.0`` if it lies strictly between 0 and ``threshold``,
        and ``0.0`` otherwise
    """
    max_val = np.max(values)
    if max_val >= threshold:
        return 1.0
    if max_val == -1:
        return -1.0
    if 0 < max_val < threshold:
        return -2.0
    return 0.0


def get_user_flow_data(csv_file):
    """Read the flow CSV and return only the columns used by this module.

    :raises KeyError: if one of the expected columns is missing
    """
    # The types are documentation only; they are NOT passed to read_csv.
    types = {
        "duration": int,
        "bytes_down": int,
        "bytes_up": int,
        "domain": object,
        "timeStamp": float,
        "server_ip": object,
        "user_hash": float,
        "virusTotalHits": int,
        "serverLabel": int,
        "trustedHits": int
    }
    df = pd.read_csv(csv_file, index_col=False)
    df = df[list(types.keys())]
    # df.set_index(keys=['user_hash'], drop=False, inplace=True)
    return df


def get_flow_per_user(df):
    """Yield one data frame per user hash, with incomplete rows removed.

    Users are yielded in order of first appearance. Rows whose ``user_hash``
    is missing belong to no user and are skipped.
    """
    # A single groupby pass replaces one full-frame boolean scan per user.
    for _, user_flow in df.groupby("user_hash", sort=False):
        yield user_flow.dropna(axis=0, how="any")


def load_or_generate_h5data(h5data, train_data, domain_length, window_size):
    """Load the filtered dataset from ``<h5data>.h5``, generating it if missing.

    :return: tuple ``(domain, flow, name, client, server)`` of h5py datasets
    """
    logger.info("check for h5data %s", h5data)
    try:
        check_h5dataset(h5data)
    except FileNotFoundError:
        logger.info("load raw training dataset")
        domain, flow, name, hits, trusted_hits, server = load_or_generate_raw_h5data(
            h5data, train_data, domain_length, window_size)
        logger.info("filter training dataset")
        # ``[()]`` reads a whole h5py dataset into memory; the former
        # ``.value`` attribute no longer exists in h5py 3.
        domain, flow, name, client, server = filter_window_dataset_by_hits(
            domain[()], flow[()], name[()], hits[()], trusted_hits[()], server[()])
        logger.info("store training dataset as h5 file")
        data = {
            "domain": domain.astype(np.int8),
            "flow": flow,
            "name": name,
            "client": client.astype(bool),
            "server": server.astype(bool)
        }
        store_h5dataset(h5data, data)
    logger.info("load h5 dataset")
    data = load_h5dataset(h5data)
    return data["domain"], data["flow"], data["name"], data["client"], data["server"]


def load_or_generate_raw_h5data(h5data, train_data, domain_length, window_size):
    """Load the unfiltered dataset from ``<h5data>_raw.h5``, generating it if missing.

    :return: tuple ``(domain, flow, name, hits_vt, hits_trusted, server)`` of
        h5py datasets
    """
    h5data = h5data + "_raw"
    char_dict = get_character_dict()
    logger.info("check for h5data %s", h5data)
    try:
        check_h5dataset(h5data)
    except FileNotFoundError:
        logger.info("h5 data not found - load csv file")
        user_flow_df = get_user_flow_data(train_data)
        logger.info("create raw training dataset")
        domain, flow, name, hits, trusted_hits, server = create_raw_dataset_from_flows(
            user_flow_df, char_dict, domain_length, window_size)
        logger.info("store raw training dataset as h5 file")
        int8_limits = np.iinfo(np.int8)
        data = {
            "domain": domain.astype(np.int8),
            "flow": flow,
            "name": name,
            "hits_vt": hits.astype(np.int8),
            # Store the trusted hits themselves (this used to store ``hits``
            # a second time). Clip first so large counts cannot wrap around
            # in the int8 cast.
            "hits_trusted": np.clip(trusted_hits, int8_limits.min, int8_limits.max).astype(np.int8),
            "server": server.astype(bool)
        }
        store_h5dataset(h5data, data)
    logger.info("load h5 dataset")
    data = load_h5dataset(h5data)
    return data["domain"], data["flow"], data["name"], data["hits_vt"], data["hits_trusted"], data["server"]


def generate_names(train_data, window_size):
    """Return the user hash of every window generated from the CSV ``train_data``."""
    user_flow_df = get_user_flow_data(train_data)
    windows = _collect_user_windows(user_flow_df, window_size)
    names = np.stack([window.user_hash for window in windows])
    return names[:, 0]


def load_or_generate_domains(train_data, domain_length):
    """Load (or build and cache) per-domain labels and encode the domains.

    The per-domain table is cached as ``<train_data>_domains.gz``.

    :return: tuple of the encoded domains, shape ``(n_domains, domain_length)``,
        and a boolean array with the columns ``serverLabel`` and ``clientLabel``
    """
    fn = f"{train_data}_domains.gz"
    char_dict = get_character_dict()
    try:
        user_flow_df = pd.read_csv(fn)
    except FileNotFoundError:
        user_flow_df = get_user_flow_data(train_data)
        # user_flow_df.reset_index(inplace=True)
        user_flow_df = user_flow_df[["domain", "serverLabel", "trustedHits", "virusTotalHits"]].dropna(
            axis=0, how="any")
        user_flow_df = user_flow_df.groupby(user_flow_df.domain).mean()
        user_flow_df.reset_index(inplace=True)
        # A domain counts as malicious for the client label if it has any
        # trusted hit or an average of at least 3 VirusTotal hits.
        user_flow_df["clientLabel"] = np.logical_or(
            user_flow_df.trustedHits > 0, user_flow_df.virusTotalHits >= 3)
        user_flow_df[["serverLabel", "clientLabel"]] = user_flow_df[["serverLabel", "clientLabel"]].astype(bool)
        user_flow_df = user_flow_df[["domain", "serverLabel", "clientLabel"]]
        user_flow_df.to_csv(fn, compression="gzip")
    domain_encs = user_flow_df.domain.apply(lambda d: get_domain_features(d, char_dict, domain_length))
    domain_encs = np.stack(domain_encs.tolist())
    # ``.values`` replaces DataFrame.as_matrix(), which pandas 1.0 removed.
    return domain_encs, user_flow_df[["serverLabel", "clientLabel"]].values.astype(bool)


def save_predictions(path, results):
    """Dump ``results`` to ``<path>/results.joblib`` (compression level 3)."""
    joblib.dump(results, path + "/results.joblib", compress=3)


def load_predictions(path):
    """Load the object stored in ``<path>/results.joblib``.

    NOTE(review): joblib unpickles the file, so only load result files from
    trusted sources.
    """
    return joblib.load(path + "/results.joblib")