# -*- coding: utf-8 -*- import logging import string from multiprocessing import Pool import h5py import joblib import numpy as np import pandas as pd from tqdm import tqdm logger = logging.getLogger('cisco_logger') char2idx = dict((char, idx + 1) for (idx, char) in enumerate(string.ascii_lowercase + string.punctuation + string.digits + " ")) idx2char = {v: k for k, v in char2idx.items()} def get_character_dict(): return char2idx def get_vocab_size(): return len(char2idx) + 1 def encode_char(c): return char2idx.get(c, 0) def decode_char(i): return idx2char.get(i, "") encode_char = np.vectorize(encode_char) decode_char = np.vectorize(decode_char) def encode_domain(domain: string): return encode_char(list(domain)) def decode_domain(domain): return "".join(decode_char(domain)) def get_user_chunks(user_flow, window=10): result = [] chunk_size = (len(user_flow) // window) for i in range(chunk_size): result.append(user_flow.iloc[i * window:(i + 1) * window]) if result and len(result[-1]) != window: result.pop() return result def get_domain_features(domain: string, max_length=40): encoding = np.zeros((max_length,)) for j in range(min(len(domain), max_length)): c = domain[len(domain) - 1 - j] encoding[max_length - 1 - j] = encode_char(c) return encoding def get_all_flow_features(features): flows = np.stack( map(lambda f: f[["duration", "bytes_up", "bytes_down"]], features) ) return np.log1p(flows) def filter_window_dataset_by_hits(domain, flow, name, hits, trusted_hits, server): # select only 1.0 and 0.0 from training data pos_idx = np.where(np.logical_or(hits == 1.0, trusted_hits >= 1.0))[0] neg_idx = np.where(hits == 0.0)[0] idx = np.concatenate((pos_idx, neg_idx)) # choose selected sample to train on domain = domain[idx] flow = flow[idx] client = np.zeros_like(idx, float) client[:pos_idx.shape[-1]] = 1.0 server = server[idx] name = name[idx] return domain, flow, name, client, server def create_raw_dataset_from_flows(user_flow_df, max_len, window_size=10): logger.info("get chunks from user data frames") with Pool() as pool: results = [] for user_flow in tqdm(get_flow_per_user(user_flow_df), total=len(user_flow_df['user_hash'].unique().tolist())): results.append(pool.apply_async(get_user_chunks, (user_flow, window_size))) windows = [window for res in results for window in res.get()] logger.info("create training dataset") domain, flow, hits, name, server, trusted_hits = create_dataset_from_windows(chunks=windows, max_len=max_len) # make client labels discrete with 4 different values hits = np.apply_along_axis(lambda x: make_label_discrete(x, 3), 0, np.atleast_2d(hits)) return domain, flow, name, hits, trusted_hits, server def store_h5dataset(path, data: dict): f = h5py.File(path + ".h5", "w") for key, val in data.items(): f.create_dataset(key, data=val) f.close() def check_h5dataset(path): return open(path + ".h5", "r") def load_h5dataset(path): f = h5py.File(path + ".h5", "r") data = {} for k in f.keys(): data[k] = f[k] return data def create_dataset_from_windows(chunks, max_len): """ combines domain and feature windows to sequential training data :param chunks: list of flow feature windows :param vocab: :param max_len: :return: """ def get_domain_features_reduced(d): return get_domain_features(d[0], max_len) logger.info(" compute domain features") domain_features = [] for ds in tqdm(map(lambda f: f.domain, chunks)): domain_features.append(np.apply_along_axis(get_domain_features_reduced, 2, np.atleast_3d(ds))) domain_features = np.concatenate(domain_features, 0) logger.info(" compute flow features") flow_features = get_all_flow_features(chunks) logger.info(" select hits") hits = np.max(np.stack(map(lambda f: f.virusTotalHits, chunks)), axis=1) logger.info(" select names") names = np.stack(map(lambda f: f.user_hash, chunks)) assert (names[:, :1].repeat(10, axis=1) == names).all() names = names[:, 0] logger.info(" select servers") servers = np.stack(map(lambda f: f.serverLabel, chunks)) logger.info(" select trusted hits") trusted_hits = np.max(np.stack(map(lambda f: f.trustedHits, chunks)), axis=1) return (domain_features, flow_features, hits, names, servers, trusted_hits) def make_label_discrete(values, threshold): max_val = np.max(values) if max_val >= threshold: return 1.0 elif max_val == -1: return -1.0 elif 0 < max_val < threshold: return -2.0 else: return 0.0 def get_user_flow_data(csv_file): types = { "duration": int, "bytes_down": int, "bytes_up": int, "domain": object, "timeStamp": float, "http_method": object, "server_ip": object, "user_hash": float, "virusTotalHits": int, "serverLabel": int, "trustedHits": int } df = pd.read_csv(csv_file, index_col=False) df = df[list(types.keys())] # df.set_index(keys=['user_hash'], drop=False, inplace=True) return df def get_flow_per_user(df): users = df['user_hash'].unique().tolist() for user in users: yield df.loc[df.user_hash == user].dropna(axis=0, how="any") def load_or_generate_h5data(h5data, train_data, domain_length, window_size): logger.info(f"check for h5data {h5data}") try: check_h5dataset(h5data) except FileNotFoundError: logger.info("load raw training dataset") domain, flow, name, hits, trusted_hits, server = load_or_generate_raw_h5data(h5data, train_data, domain_length, window_size) logger.info("filter training dataset") domain, flow, name, client, server = filter_window_dataset_by_hits(domain.value, flow.value, name.value, hits.value, trusted_hits.value, server.value) logger.info("store training dataset as h5 file") data = { "domain": domain.astype(np.int8), "flow": flow, "name": name, "client": client.astype(np.bool), "server": server.astype(np.bool) } store_h5dataset(h5data, data) logger.info("load h5 dataset") data = load_h5dataset(h5data) return data["domain"], data["flow"], data["name"], data["client"], data["server"] def load_or_generate_raw_h5data(h5data, train_data, domain_length, window_size): h5data = h5data + "_raw" logger.info(f"check for h5data {h5data}") try: check_h5dataset(h5data) except FileNotFoundError: logger.info("h5 data not found - load csv file") user_flow_df = get_user_flow_data(train_data) logger.info("create raw training dataset") domain, flow, name, hits, trusted_hits, server = create_raw_dataset_from_flows(user_flow_df, domain_length, window_size) logger.info("store raw training dataset as h5 file") data = { "domain": domain.astype(np.int8), "flow": flow, "name": name, "hits_vt": hits.astype(np.int8), "hits_trusted": hits.astype(np.int8), "server": server.astype(np.bool) } store_h5dataset(h5data, data) logger.info("load h5 dataset") data = load_h5dataset(h5data) return data["domain"], data["flow"], data["name"], data["hits_vt"], data["hits_trusted"], data["server"] def generate_names(train_data, window_size): user_flow_df = get_user_flow_data(train_data) with Pool() as pool: results = [] for user_flow in tqdm(get_flow_per_user(user_flow_df), total=len(user_flow_df['user_hash'].unique().tolist())): results.append(pool.apply_async(get_user_chunks, (user_flow, window_size))) windows = [window for res in results for window in res.get()] names = np.stack(map(lambda f: f.user_hash, windows)) names = names[:, 0] return names def load_or_generate_domains(train_data, domain_length): fn = f"{train_data}_domains.gz" try: user_flow_df = pd.read_csv(fn) except FileNotFoundError: user_flow_df = get_user_flow_data(train_data) # user_flow_df.reset_index(inplace=True) user_flow_df = user_flow_df[["domain", "serverLabel", "trustedHits", "virusTotalHits"]].dropna(axis=0, how="any") user_flow_df = user_flow_df.groupby(user_flow_df.domain).mean() user_flow_df.reset_index(inplace=True) user_flow_df["clientLabel"] = np.where( np.logical_or(user_flow_df.trustedHits > 0, user_flow_df.virusTotalHits >= 3), True, False) user_flow_df[["serverLabel", "clientLabel"]] = user_flow_df[["serverLabel", "clientLabel"]].astype(bool) user_flow_df = user_flow_df[["domain", "serverLabel", "clientLabel"]] user_flow_df.to_csv(fn, compression="gzip") domain_encs = user_flow_df.domain.apply(lambda d: get_domain_features(d, domain_length)) domain_encs = np.stack(domain_encs) return domain_encs, user_flow_df.domain, user_flow_df[["clientLabel", "serverLabel"]].as_matrix().astype(bool) def save_predictions(path, results): joblib.dump(results, path + "/results.joblib", compress=3) def load_predictions(path): return joblib.load(path + "/results.joblib")