add multi-threading for pre-processing

René Knaebel 2017-07-16 09:42:52 +02:00
parent 336be37032
commit 844494eca9
2 changed files with 100 additions and 76 deletions


@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 import logging
 import string
+from multiprocessing import Pool

 import h5py
 import numpy as np
@@ -29,48 +30,50 @@ encode_char = np.vectorize(encode_char)


 # TODO: refactor
-def get_user_chunks(dataFrame, windowSize=10, overlapping=False,
-                    maxLengthInSeconds=300):
-    maxMilliSeconds = maxLengthInSeconds * 1000
-    outDomainLists = []
-    outDFFrames = []
-    if not overlapping:
-        numBlocks = int(np.ceil(float(len(dataFrame)) / float(windowSize)))
-        userIDs = np.arange(len(dataFrame))
-        for blockID in np.arange(numBlocks):
-            curIDs = userIDs[(blockID * windowSize):((blockID + 1) * windowSize)]
-            # logger.info(curIDs)
-            useData = dataFrame.iloc[curIDs]
-            curDomains = useData['domain']
-            if maxLengthInSeconds != -1:
-                curMinMilliSeconds = np.min(useData['timeStamp']) + maxMilliSeconds
-                underTimeOutIDs = np.where(np.array(useData['timeStamp']) <= curMinMilliSeconds)
-                if len(underTimeOutIDs) != len(curIDs):
-                    curIDs = curIDs[underTimeOutIDs]
-                    useData = dataFrame.iloc[curIDs]
-                    curDomains = useData['domain']
-            outDomainLists.append(list(curDomains))
-            outDFFrames.append(useData)
-    else:
-        numBlocks = len(dataFrame) + 1 - windowSize
-        userIDs = np.arange(len(dataFrame))
-        for blockID in np.arange(numBlocks):
-            curIDs = userIDs[blockID:blockID + windowSize]
-            useData = dataFrame.iloc[curIDs]
-            curDomains = useData['domain']
-            if maxLengthInSeconds != -1:
-                curMinMilliSeconds = np.min(useData['timeStamp']) + maxMilliSeconds
-                underTimeOutIDs = np.where(np.array(useData['timeStamp']) <= curMinMilliSeconds)
-                if len(underTimeOutIDs) != len(curIDs):
-                    curIDs = curIDs[underTimeOutIDs]
-                    useData = dataFrame.iloc[curIDs]
-                    curDomains = useData['domain']
-            outDomainLists.append(list(curDomains))
-            outDFFrames.append(useData)
-    if len(outDomainLists[-1]) != windowSize:
-        outDomainLists.pop(-1)
-        outDFFrames.pop(-1)
-    return outDomainLists, outDFFrames
+def get_user_chunks(user_flow, window=10):
+    # TODO: what is maxLengthInSeconds for?!?
+    # maxMilliSeconds = maxLengthInSeconds * 1000
+    # domains = []
+    # flows = []
+    # if not overlapping:
+    #     numBlocks = int(np.ceil(len(user_flow) / window))
+    #     userIDs = np.arange(len(user_flow))
+    #     for blockID in np.arange(numBlocks):
+    #         curIDs = userIDs[(blockID * window):((blockID + 1) * window)]
+    #         useData = user_flow.iloc[curIDs]
+    #         curDomains = useData['domain']
+    #         if maxLengthInSeconds != -1:
+    #             curMinMilliSeconds = np.min(useData['timeStamp']) + maxMilliSeconds
+    #             underTimeOutIDs = np.where(np.array(useData['timeStamp']) <= curMinMilliSeconds)
+    #             if len(underTimeOutIDs) != len(curIDs):
+    #                 curIDs = curIDs[underTimeOutIDs]
+    #                 useData = user_flow.iloc[curIDs]
+    #                 curDomains = useData['domain']
+    #         domains.append(list(curDomains))
+    #         flows.append(useData)
+    # else:
+    #     numBlocks = len(user_flow) + 1 - window
+    #     userIDs = np.arange(len(user_flow))
+    #     for blockID in np.arange(numBlocks):
+    #         curIDs = userIDs[blockID:blockID + window]
+    #         useData = user_flow.iloc[curIDs]
+    #         curDomains = useData['domain']
+    #         if maxLengthInSeconds != -1:
+    #             curMinMilliSeconds = np.min(useData['timeStamp']) + maxMilliSeconds
+    #             underTimeOutIDs = np.where(np.array(useData['timeStamp']) <= curMinMilliSeconds)
+    #             if len(underTimeOutIDs) != len(curIDs):
+    #                 curIDs = curIDs[underTimeOutIDs]
+    #                 useData = user_flow.iloc[curIDs]
+    #                 curDomains = useData['domain']
+    #         domains.append(list(curDomains))
+    #         flows.append(useData)
+    # if domains and len(domains[-1]) != window:
+    #     domains.pop(-1)
+    #     flows.pop(-1)
+    # return domains, flows
+    chunk_size = (len(user_flow) // window)
+    last_inrange = chunk_size * window
+    return np.split(user_flow.head(last_inrange), chunk_size) if chunk_size else []


 def get_domain_features(domain, vocab: dict, max_length=40):
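
The rewritten get_user_chunks drops the old overlapping/timeout options and simply cuts each user's flows into consecutive non-overlapping windows of length window, discarding any trailing partial window; instead of separate domain and feature lists it now returns a list of DataFrame chunks. A minimal sketch of that behaviour (the toy DataFrame below is illustrative only, not taken from the repository):

import numpy as np
import pandas as pd

def get_user_chunks(user_flow, window=10):
    # same body as introduced in this commit
    chunk_size = (len(user_flow) // window)
    last_inrange = chunk_size * window
    return np.split(user_flow.head(last_inrange), chunk_size) if chunk_size else []

# hypothetical per-user flows: 23 rows -> two full windows, 3 leftover rows dropped
user_flow = pd.DataFrame({"domain": [f"d{i}.example" for i in range(23)],
                          "duration": np.arange(23)})
chunks = get_user_chunks(user_flow, window=10)
print(len(chunks), len(chunks[0]))   # 2 10

Each element of the returned list is itself a DataFrame, so downstream code can still select per-window columns such as domain, duration, bytes_up or bytes_down.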
@@ -82,31 +85,23 @@ def get_domain_features(domain, vocab: dict, max_length=40):


 def get_all_flow_features(features):
-    flows = np.stack(list(
-        map(lambda f: f[["duration", "bytes_up", "bytes_down"]], features))
+    flows = np.stack(
+        map(lambda f: f[["duration", "bytes_up", "bytes_down"]], features)
     )
     return np.log1p(flows)


 def create_dataset_from_flows(user_flow_df, char_dict, max_len, window_size=10):
-    domains = []
-    features = []
     logger.info("get chunks from user data frames")
-    for i, user_flow in tqdm(list(enumerate(get_flow_per_user(user_flow_df)))):
-        (domain_windows, feature_windows) = get_user_chunks(user_flow,
-                                                            windowSize=window_size,
-                                                            overlapping=False,
-                                                            maxLengthInSeconds=-1)
-        domains += domain_windows
-        features += feature_windows
+    with Pool() as pool:
+        results = []
+        for user_flow in tqdm(get_flow_per_user(user_flow_df), total=len(user_flow_df['user_hash'].unique().tolist())):
+            results.append(pool.apply_async(get_user_chunks, (user_flow, window_size)))
+        windows = [window for res in results for window in res.get()]
     logger.info("create training dataset")
-    domain_tr, flow_tr, hits_tr, _, server_tr, trusted_hits_tr = create_dataset_from_lists(domains=domains,
-                                                                                           flows=features,
+    domain_tr, flow_tr, hits_tr, _, server_tr, trusted_hits_tr = create_dataset_from_lists(chunks=windows,
                                                                                            vocab=char_dict,
-                                                                                           max_len=max_len,
-                                                                                           window_size=window_size)
+                                                                                           max_len=max_len)
     # make client labels discrete with 4 different values
     hits_tr = np.apply_along_axis(lambda x: discretize_label(x, 3), 0, np.atleast_2d(hits_tr))
     # select only 1.0 and 0.0 from training data
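
Despite the commit title, multiprocessing.Pool spawns worker processes rather than threads, which is what lets the per-user chunking run in parallel past the GIL; apply_async queues one task per user, and res.get() later blocks until that task's windows are ready, so the flattened window list keeps submission order. A small self-contained sketch of the same fan-out/fan-in pattern (process_item merely stands in for get_user_chunks):

from multiprocessing import Pool

def process_item(item):
    # pretend each task yields a couple of windows, like get_user_chunks does
    return [item * 10, item * 10 + 1]

if __name__ == "__main__":
    with Pool() as pool:                       # one worker per CPU core by default
        results = [pool.apply_async(process_item, (i,)) for i in range(4)]
        # get() blocks until the corresponding task has finished
        windows = [w for res in results for w in res.get()]
    print(windows)                             # [0, 1, 10, 11, 20, 21, 30, 31]

One caveat: apply_async pickles each per-user DataFrame to ship it to a worker process, so for very large users the serialization overhead can eat into the parallel speed-up.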
@@ -143,34 +138,46 @@ def load_h5dataset(path):
     return data["domain"], data["flow"], data["client"], data["server"]


-def create_dataset_from_lists(domains, flows, vocab, max_len, window_size=10):
+def create_dataset_from_lists(chunks, vocab, max_len):
     """
     combines domain and feature windows to sequential training data
-    :param domains: list of domain windows
-    :param flows: list of flow feature windows
+    :param chunks: list of flow feature windows
     :param vocab:
     :param max_len:
-    :param window_size: size of the flow window
     :return:
     """
-    domain_features = np.array([[get_domain_features(d, vocab, max_len) for d in x] for x in domains])
-    flow_features = get_all_flow_features(flows)
-    hits = np.max(np.stack(map(lambda f: f.virusTotalHits, flows)), axis=1)
-    names = np.unique(np.stack(map(lambda f: f.user_hash, flows)), axis=1)
-    servers = np.max(np.stack(map(lambda f: f.serverLabel, flows)), axis=1)
-    trusted_hits = np.max(np.stack(map(lambda f: f.trustedHits, flows)), axis=1)
+
+    def get_domain_features_reduced(d):
+        return get_domain_features(d[0], vocab, max_len)
+
+    logger.info(" compute domain features")
+    domain_features = []
+    for ds in tqdm(map(lambda f: f.domain, chunks)):
+        assert min(np.atleast_3d(ds).shape) > 0, f"shape of 0 for {ds}"
+        domain_features.append(np.apply_along_axis(get_domain_features_reduced, 2, np.atleast_3d(ds)))
+    domain_features = np.concatenate(domain_features, 0)
+    logger.info(" compute flow features")
+    flow_features = get_all_flow_features(chunks)
+    logger.info(" select hits")
+    hits = np.max(np.stack(map(lambda f: f.virusTotalHits, chunks)), axis=1)
+    logger.info(" select names")
+    names = np.unique(np.stack(map(lambda f: f.user_hash, chunks)), axis=1)
+    logger.info(" select servers")
+    servers = np.max(np.stack(map(lambda f: f.serverLabel, chunks)), axis=1)
+    logger.info(" select trusted hits")
+    trusted_hits = np.max(np.stack(map(lambda f: f.trustedHits, chunks)), axis=1)
     return (domain_features, flow_features,
             hits, names, servers, trusted_hits)


 def discretize_label(values, threshold):
-    maxVal = np.max(values)
-    if maxVal >= threshold:
+    max_val = np.max(values)
+    if max_val >= threshold:
         return 1.0
-    elif maxVal == -1:
+    elif max_val == -1:
         return -1.0
-    elif 0 < maxVal < threshold:
+    elif 0 < max_val < threshold:
         return -2.0
     else:
         return 0.0
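
discretize_label collapses a window of hit counts into one of four discrete client labels; in create_dataset_from_flows above it is applied to hits_tr with a threshold of 3. Illustrative calls, assuming the function as defined above is in scope (the input arrays are made up, and the -1 case presumably encodes unlabeled data):

import numpy as np

# discretize_label as defined in the hunk above
print(discretize_label(np.array([0, 5, 1]), 3))   #  1.0  max hit count >= threshold
print(discretize_label(np.array([0, 2, 1]), 3))   # -2.0  hits present but below threshold
print(discretize_label(np.array([0, 0, 0]), 3))   #  0.0  no hits at all
print(discretize_label(np.array([-1, -1]), 3))    # -1.0  presumably unlabeled data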
@@ -198,7 +205,7 @@ def get_user_flow_data(csv_file):

 def get_flow_per_user(df):
     users = df['user_hash'].unique().tolist()
     for user in users:
-        yield df.loc[df.user_hash == user]
+        yield df.loc[df.user_hash == user].dropna(axis=0, how="any")


 def load_or_generate_h5data(h5data, train_data, domain_length, window_size):
@@ -206,6 +213,7 @@ def load_or_generate_h5data(h5data, train_data, domain_length, window_size):
     logger.info(f"check for h5data {h5data}")
     try:
         open(h5data, "r")
+        raise FileNotFoundError
     except FileNotFoundError:
         logger.info("h5 data not found - load csv file")
         user_flow_df = get_user_flow_data(train_data)
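
Note the effect of the added raise: open(h5data, "r") only probes whether the cache exists, and the unconditional raise FileNotFoundError right after it means the except branch is always taken, so the h5 cache is bypassed and the dataset is rebuilt from CSV on every run, presumably as a temporary switch while the pre-processing changes. A tiny sketch of that control flow (paths are placeholders):

def load_or_generate(h5data, train_data):
    try:
        open(h5data, "r")
        raise FileNotFoundError        # forces the rebuild even when the cache file exists
    except FileNotFoundError:
        print(f"h5 data not found - load csv file ({train_data})")

load_or_generate("dataset.h5", "flows.csv")   # always falls through to the csv path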

main.py

@@ -173,7 +173,6 @@ def main_train(param=None):
     else:
         logger.info("class weights: set default")
         custom_class_weights = None
-
     logger.info("start training")
     model.fit([domain_tr, flow_tr],
               [client_tr, server_tr],
@@ -195,7 +194,8 @@ def main_test():
     #                [client_val, server_val],
     #                batch_size=args.batch_size)
     y_pred = clf.predict([domain_val, flow_val],
-                         batch_size=args.batch_size)
+                         batch_size=args.batch_size,
+                         verbose=1)
     np.save(args.future_prediction, y_pred)
@@ -247,6 +247,7 @@ def main_visualization():
     plt.scatter(domain_reduced[:, 0], domain_reduced[:, 1], c=servers, cmap=plt.cm.bwr, s=2)
     plt.show()
+


 def main_score():
     # mask = dataset.load_mask_eval(args.data, args.test_image)
     # pred = np.load(args.pred)
@@ -254,6 +255,19 @@
     pass


+def main_data():
+    char_dict = dataset.get_character_dict()
+    user_flow_df = dataset.get_user_flow_data(args.train_data)
+    logger.info("create training dataset")
+    domain_tr, flow_tr, client_tr, server_tr = dataset.create_dataset_from_flows(user_flow_df, char_dict,
+                                                                                 max_len=args.domain_length,
+                                                                                 window_size=args.window)
+    print(f"domain shape {domain_tr.shape}")
+    print(f"flow shape {flow_tr.shape}")
+    print(f"client shape {client_tr.shape}")
+    print(f"server shape {server_tr.shape}")
+
+
 def main():
     if "train" in args.modes:
         main_train()
@@ -267,6 +281,8 @@ def main():
         main_score()
     if "paul" in args.modes:
         main_paul_best()
+    if "data" in args.modes:
+        main_data()


 if __name__ == "__main__":
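
The new data mode only runs the pre-processing and prints the resulting tensor shapes, which makes it a convenient smoke test for this commit. Assuming the argparse flags mirror the attribute names used above (args.modes, args.train_data, args.domain_length, args.window are defined elsewhere and not shown in this diff), an invocation could look roughly like:

# hypothetical invocation; flag names and CSV path are assumed from the args.* attributes, not confirmed by this diff
python main.py --modes data --train_data data/user_flows.csv --domain_length 40 --window 10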