From 844494eca9312a7d7e7aa578ab26f422c07dc790 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ren=C3=A9=20Knaebel?=
Date: Sun, 16 Jul 2017 09:42:52 +0200
Subject: [PATCH] add multiprocessing for pre-processing

---
 dataset.py | 156 ++++++++++++++++++++++++++++-------------------------
 main.py    |  20 ++++++-
 2 files changed, 100 insertions(+), 76 deletions(-)

diff --git a/dataset.py b/dataset.py
index f00a360..732da4b 100644
--- a/dataset.py
+++ b/dataset.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 import logging
 import string
+from multiprocessing import Pool
 
 import h5py
 import numpy as np
@@ -29,48 +30,50 @@ encode_char = np.vectorize(encode_char)
 
 
 # TODO: refactor
-def get_user_chunks(dataFrame, windowSize=10, overlapping=False,
-                    maxLengthInSeconds=300):
-    maxMilliSeconds = maxLengthInSeconds * 1000
-    outDomainLists = []
-    outDFFrames = []
-    if not overlapping:
-        numBlocks = int(np.ceil(float(len(dataFrame)) / float(windowSize)))
-        userIDs = np.arange(len(dataFrame))
-        for blockID in np.arange(numBlocks):
-            curIDs = userIDs[(blockID * windowSize):((blockID + 1) * windowSize)]
-            # logger.info(curIDs)
-            useData = dataFrame.iloc[curIDs]
-            curDomains = useData['domain']
-            if maxLengthInSeconds != -1:
-                curMinMilliSeconds = np.min(useData['timeStamp']) + maxMilliSeconds
-                underTimeOutIDs = np.where(np.array(useData['timeStamp']) <= curMinMilliSeconds)
-                if len(underTimeOutIDs) != len(curIDs):
-                    curIDs = curIDs[underTimeOutIDs]
-                    useData = dataFrame.iloc[curIDs]
-                    curDomains = useData['domain']
-            outDomainLists.append(list(curDomains))
-            outDFFrames.append(useData)
-    else:
-        numBlocks = len(dataFrame) + 1 - windowSize
-        userIDs = np.arange(len(dataFrame))
-        for blockID in np.arange(numBlocks):
-            curIDs = userIDs[blockID:blockID + windowSize]
-            useData = dataFrame.iloc[curIDs]
-            curDomains = useData['domain']
-            if maxLengthInSeconds != -1:
-                curMinMilliSeconds = np.min(useData['timeStamp']) + maxMilliSeconds
-                underTimeOutIDs = np.where(np.array(useData['timeStamp']) <= curMinMilliSeconds)
-                if len(underTimeOutIDs) != len(curIDs):
-                    curIDs = curIDs[underTimeOutIDs]
-                    useData = dataFrame.iloc[curIDs]
-                    curDomains = useData['domain']
-            outDomainLists.append(list(curDomains))
-            outDFFrames.append(useData)
-        if len(outDomainLists[-1]) != windowSize:
-            outDomainLists.pop(-1)
-            outDFFrames.pop(-1)
-    return outDomainLists, outDFFrames
+def get_user_chunks(user_flow, window=10):
+    # TODO: what is maxLengthInSeconds for?
+    # NOTE: the removed implementation above also supported overlapping
+    # windows and a maxLengthInSeconds timeout, but both were disabled at the
+    # only call site (overlapping=False, maxLengthInSeconds=-1), so this
+    # rewrite drops them
+    num_chunks = len(user_flow) // window
+    last_inrange = num_chunks * window
+    return np.split(user_flow.head(last_inrange), num_chunks) if num_chunks else []
 
 
 def get_domain_features(domain, vocab: dict, max_length=40):
@@ -82,31 +85,23 @@ def get_domain_features(domain, vocab: dict, max_length=40):
 
 
 def get_all_flow_features(features):
-    flows = np.stack(list(
-        map(lambda f: f[["duration", "bytes_up", "bytes_down"]], features))
-    )
+    # keep the explicit list() -- np.stack expects a sequence, not a lazy map
+    flows = np.stack(list(
+        map(lambda f: f[["duration", "bytes_up", "bytes_down"]], features)
+    ))
     return np.log1p(flows)
 
 
 def create_dataset_from_flows(user_flow_df, char_dict, max_len, window_size=10):
-    domains = []
-    features = []
     logger.info("get chunks from user data frames")
-    for i, user_flow in tqdm(list(enumerate(get_flow_per_user(user_flow_df)))):
-        (domain_windows, feature_windows) = get_user_chunks(user_flow,
-                                                            windowSize=window_size,
-                                                            overlapping=False,
-                                                            maxLengthInSeconds=-1)
-        domains += domain_windows
-        features += feature_windows
-
+    with Pool() as pool:
+        results = []
+        for user_flow in tqdm(get_flow_per_user(user_flow_df),
+                              total=user_flow_df['user_hash'].nunique()):
+            results.append(pool.apply_async(get_user_chunks, (user_flow, window_size)))
+        windows = [window for res in results for window in res.get()]
     logger.info("create training dataset")
-    domain_tr, flow_tr, hits_tr, _, server_tr, trusted_hits_tr = create_dataset_from_lists(domains=domains,
-                                                                                           flows=features,
+    domain_tr, flow_tr, hits_tr, _, server_tr, trusted_hits_tr = create_dataset_from_lists(chunks=windows,
                                                                                            vocab=char_dict,
-                                                                                           max_len=max_len,
-                                                                                           window_size=window_size)
-
+                                                                                           max_len=max_len)
     # make client labels discrete with 4 different values
     hits_tr = np.apply_along_axis(lambda x: discretize_label(x, 3), 0, np.atleast_2d(hits_tr))
     # select only 1.0 and 0.0 from training data
@@ -143,34 +138,46 @@ def load_h5dataset(path):
     return data["domain"], data["flow"], data["client"], data["server"]
 
 
-def create_dataset_from_lists(domains, flows, vocab, max_len,
-                              window_size=10):
+def create_dataset_from_lists(chunks, vocab, max_len):
     """
     combines domain and feature windows to sequential training data
-    :param domains: list of domain windows
-    :param flows: list of flow feature windows
-    :param vocab:
-    :param max_len:
-    :param window_size: size of the flow window
+    :param chunks: list of per-user DataFrame windows holding domains and flow features
+    :param vocab: character-to-index mapping used to encode domain names
+    :param max_len: maximum length of an encoded domain name
     :return:
     """
-    domain_features = np.array([[get_domain_features(d, vocab, max_len) for d in x] for x in domains])
-    flow_features = get_all_flow_features(flows)
-    hits = np.max(np.stack(map(lambda f: f.virusTotalHits, flows)), axis=1)
-    names = np.unique(np.stack(map(lambda f: f.user_hash, flows)), axis=1)
-    servers = np.max(np.stack(map(lambda f: f.serverLabel, flows)), axis=1)
-    trusted_hits = np.max(np.stack(map(lambda f: f.trustedHits, flows)), axis=1)
+
+    def get_domain_features_reduced(d):
+        # np.apply_along_axis passes a length-1 slice; unwrap it before encoding
+        return get_domain_features(d[0], vocab, max_len)
+
+    logger.info(" compute domain features")
+    domain_features = []
+    for ds in tqdm(map(lambda f: f.domain, chunks), total=len(chunks)):
+        assert min(np.atleast_3d(ds).shape) > 0, f"empty domain window: {ds}"
+        domain_features.append(np.apply_along_axis(get_domain_features_reduced, 2, np.atleast_3d(ds)))
+    domain_features = np.concatenate(domain_features, 0)
+    logger.info(" compute flow features")
+    flow_features = get_all_flow_features(chunks)
+    logger.info(" select hits")
+    hits = np.max(np.stack(list(map(lambda f: f.virusTotalHits, chunks))), axis=1)
+    logger.info(" select names")
+    names = np.unique(np.stack(list(map(lambda f: f.user_hash, chunks))), axis=1)
+    logger.info(" select servers")
+    servers = np.max(np.stack(list(map(lambda f: f.serverLabel, chunks))), axis=1)
+    logger.info(" select trusted hits")
+    trusted_hits = np.max(np.stack(list(map(lambda f: f.trustedHits, chunks))), axis=1)
     return (domain_features, flow_features, hits, names, servers, trusted_hits)
 
 
 def discretize_label(values, threshold):
-    maxVal = np.max(values)
-    if maxVal >= threshold:
+    max_val = np.max(values)
+    if max_val >= threshold:
         return 1.0
-    elif maxVal == -1:
+    elif max_val == -1:
         return -1.0
-    elif 0 < maxVal < threshold:
+    elif 0 < max_val < threshold:
         return -2.0
     else:
         return 0.0
@@ -198,7 +205,7 @@ def get_user_flow_data(csv_file):
 def get_flow_per_user(df):
     users = df['user_hash'].unique().tolist()
     for user in users:
-        yield df.loc[df.user_hash == user]
+        yield df.loc[df.user_hash == user].dropna(axis=0, how="any")
 
 
 def load_or_generate_h5data(h5data, train_data, domain_length, window_size):
@@ -206,6 +213,7 @@ def load_or_generate_h5data(h5data, train_data, domain_length, window_size):
     logger.info(f"check for h5data {h5data}")
     try:
         open(h5data, "r")
+        raise FileNotFoundError  # FIXME: forces a rebuild of the h5 cache; remove once the new window format is stable
     except FileNotFoundError:
         logger.info("h5 data not found - load csv file")
         user_flow_df = get_user_flow_data(train_data)
diff --git a/main.py b/main.py
index adbfa23..eb938b4 100644
--- a/main.py
+++ b/main.py
@@ -173,7 +173,6 @@ def main_train(param=None):
     else:
         logger.info("class weights: set default")
         custom_class_weights = None
-
     logger.info("start training")
     model.fit([domain_tr, flow_tr],
               [client_tr, server_tr],
@@ -195,7 +194,8 @@ def main_test():
     #          [client_val, server_val],
     #          batch_size=args.batch_size)
     y_pred = clf.predict([domain_val, flow_val],
-                         batch_size=args.batch_size)
+                         batch_size=args.batch_size,
+                         verbose=1)
     np.save(args.future_prediction, y_pred)
 
 
@@ -247,6 +247,7 @@ def main_visualization():
     plt.scatter(domain_reduced[:, 0], domain_reduced[:, 1], c=servers, cmap=plt.cm.bwr, s=2)
     plt.show()
 
+
 def main_score():
     # mask = dataset.load_mask_eval(args.data, args.test_image)
     # pred = np.load(args.pred)
@@ -254,6 +255,19 @@ def main_score():
     pass
 
 
+def main_data():
+    char_dict = dataset.get_character_dict()
+    user_flow_df = dataset.get_user_flow_data(args.train_data)
+    logger.info("create training dataset")
+    domain_tr, flow_tr, client_tr, server_tr = dataset.create_dataset_from_flows(user_flow_df, char_dict,
+                                                                                 max_len=args.domain_length,
+                                                                                 window_size=args.window)
+    print(f"domain shape {domain_tr.shape}")
+    print(f"flow shape {flow_tr.shape}")
+    print(f"client shape {client_tr.shape}")
+    print(f"server shape {server_tr.shape}")
+
+
 def main():
     if "train" in args.modes:
         main_train()
@@ -267,6 +281,8 @@ def main():
         main_score()
     if "paul" in args.modes:
         main_paul_best()
+    if "data" in args.modes:
+        main_data()
 
 
 if __name__ == "__main__":
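
Reviewer note: below is a small, self-contained sketch of the new chunking
pipeline, handy for trying the idea outside the repo. It is not part of the
patch itself; the toy DataFrame, the window size of 3, and the file name
demo_chunking.py are illustrative assumptions, while the column names and the
Pool/np.split pattern mirror the code above. apply_async fits here because
each per-user chunking job is independent of the others.

# demo_chunking.py -- standalone sketch of the multiprocessing pre-processing
from multiprocessing import Pool

import numpy as np
import pandas as pd


def get_user_chunks(user_flow, window=10):
    # number of full windows; a trailing partial window is dropped
    num_chunks = len(user_flow) // window
    return np.split(user_flow.head(num_chunks * window), num_chunks) if num_chunks else []


def get_flow_per_user(df):
    # one DataFrame per user, with incomplete rows removed
    for user in df["user_hash"].unique():
        yield df.loc[df.user_hash == user].dropna(axis=0, how="any")


if __name__ == "__main__":
    df = pd.DataFrame({
        "user_hash": ["u1"] * 7 + ["u2"] * 4,
        "domain": ["example.com"] * 11,
        "duration": np.arange(11.0),
        "bytes_up": np.arange(11),
        "bytes_down": np.arange(11),
    })
    with Pool() as pool:
        # submit one chunking job per user, then flatten the window lists
        results = [pool.apply_async(get_user_chunks, (user_flow, 3))
                   for user_flow in get_flow_per_user(df)]
        windows = [w for res in results for w in res.get()]
    # u1 (7 rows) -> 2 windows of 3; u2 (4 rows) -> 1 window; remainders dropped
    print(len(windows), [len(w) for w in windows])  # 3 [3, 3, 3]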