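# Source code for metrics

"""
Compute deadline-error and CPU-time metrics for the application runs logged in a folder.

The results are printed and written to ``ERROR.txt`` and ``CPU_TIME.txt`` inside that folder.
"""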

import glob
import json
import math
from datetime import timedelta
from pathlib import Path

import numpy as np

from log import load_app_data, load_worker_data
from util.utils import timing

# Key of the stage whose "start" entry is taken as the beginning of the application.
# compute_metrics() rebinds it to 1 when config["HDFS"] is truthy, otherwise to 0.
PLOT_SID_STAGE = 0


def load_config(folder):
    """
    Load the configuration of a run.

    ``folder + "config.json"`` is parsed when the file exists and holds a
    non-empty JSON value; in every other case the default ``CONFIG_DICT`` of
    the project's ``config`` module is returned.

    :param folder: the folder of the run; it is concatenated as-is with the
        file name, so it must already end with a path separator
    :return: the parsed configuration, or ``CONFIG_DICT`` as a fallback
    """
    config_file = Path(folder + "config.json")
    print(config_file)
    if config_file.exists():
        # Context manager: the handle is closed even when the JSON is malformed.
        with config_file.open() as config_fp:
            config = json.load(config_fp)
        if config:
            return config
    # Imported lazily because the defaults are needed on the fallback path only.
    from config import CONFIG_DICT
    return CONFIG_DICT
def _closest_sample_index(time_cpu, instant):
    """
    Locate a sampling instant inside the CPU time axis of a worker.

    :param time_cpu: the sequence of instants at which the worker CPU was sampled
    :param instant: the instant to look up
    :return: the index of ``instant`` in ``time_cpu`` or, when it is absent,
        the index of the entry closest to it
    """
    try:
        return time_cpu.index(instant)
    except ValueError:
        return min(range(len(time_cpu)), key=lambda pos: abs(time_cpu[pos] - instant))


def compute_cpu_time(app_id, app_info, workers_dict, config, folder):
    """
    Compute CPU time, speed and throughput of an application and save them.

    The values are printed and written to ``folder + "CPU_TIME.txt"``.

    :param app_id: the identifier of the application
    :param app_info: the applications data; ``app_info[app_id]`` maps each stage
        id to its "start", "end" and "tasktimestamps" entries
    :param workers_dict: the data of every worker, keyed by worker log; each
        entry is read through ``[app_id][sid]["cpu"]``, ``[app_id][sid]["time"]``,
        ``["time_cpu"]`` and ``["cpu_real"]``
    :param config: the configuration; ``["Control"]`` supplies "TSample",
        "MaxExecutor" and "CoreVM"
    :param folder: the output folder (must end with a path separator)
    :return: Nothing
    """

    def sample_period_s():
        # "TSample" is divided by 1000 wherever it is used; looked up lazily so
        # a missing key surfaces at the same point as before.
        return config["Control"]["TSample"] / 1000

    cpu_time = 0
    cpu_time_max = 0
    cpus = 0.0
    for worker_dict in workers_dict.values():
        try:
            for sid in worker_dict[app_id]:
                stage_samples = worker_dict[app_id][sid]
                stage_cpu = sum(stage_samples["cpu"])
                cpus += stage_cpu
                cpu_time += sample_period_s() * stage_cpu
                time_cpu = worker_dict["time_cpu"]
                for cpu, instant in zip(stage_samples["cpu"], stage_samples["time"]):
                    index = _closest_sample_index(time_cpu, instant)
                    # Count the larger of the stage sample and the "cpu_real"
                    # reading taken int(sample period) positions later.
                    cpu_time_max += sample_period_s() * max(
                        cpu, worker_dict["cpu_real"][index + int(sample_period_s())])
        except KeyError:
            print(app_id + " not found")

    stages = app_info[app_id]
    duration_s = stages[max(stages)]["end"].timestamp() - \
        stages[PLOT_SID_STAGE]["start"].timestamp()

    control = config["Control"]
    if cpus == 0.0:
        # No CPU sample was collected: fall back to the configured capacity.
        # NOTE(review): 525.8934 and 613.5423 look like reference durations in
        # seconds for the 20% / 40% cases -- confirm.
        speed = control["MaxExecutor"] * control["CoreVM"]
        speed_20 = math.ceil(control["CoreVM"] * duration_s / 525.8934) * \
            control["MaxExecutor"]
        speed_40 = math.ceil(control["CoreVM"] * duration_s / 613.5423) * \
            control["MaxExecutor"]
        print(duration_s)
        print("SPEED NATIVE 0%", speed)
        print("SPEED NATIVE 20% ", speed_20)
        print("SPEED NATIVE 40% ", speed_40)
    else:
        speed = (float(cpus) * (control["TSample"] / 1000)) / duration_s

    num_task = float(sum(len(stage["tasktimestamps"]) for stage in stages.values()))
    throughput = num_task / duration_s

    if cpu_time == 0:
        cpu_time = duration_s * control["MaxExecutor"] * control["CoreVM"]
        cpu_time_max = cpu_time
    cpu_time_max = math.floor(cpu_time_max)

    print("CPU_TIME: " + str(cpu_time))
    print("CPU TIME MAX: " + str(cpu_time_max))
    print("SID " + str(app_info[app_id].keys()))
    print("CHECK NON CONTROLLED STAGE FOR CPU_TIME")
    with open(folder + "CPU_TIME.txt", "w") as cpu_time_f:
        cpu_time_f.writelines([
            "CPU_TIME " + str(cpu_time) + "\n",
            "CPU_TIME_MAX " + str(cpu_time_max) + "\n",
            "SPEED " + str(speed) + "\n",
            "THROUGHPUT " + str(throughput) + "\n",
        ])
def save_deadline_errors(folder, deadline_error, stage_errors):
    """
    Write the deadline errors of an application to ``folder + "ERROR.txt"``.

    The first line holds the absolute application error; when at least one
    stage error is given, mean, standard deviation, median, maximum and
    minimum of the stage errors follow, one per line.

    :param folder: the output folder (must end with a path separator)
    :param deadline_error: the application's deadline error
    :param stage_errors: the list of the stages errors
    :return: Nothing
    """
    statistics = (
        ("MEAN_ERROR ", np.mean),
        ("DEVSTD_ERROR: ", np.std),
        ("MEDIAN_ERROR: ", np.median),
        ("MAX_ERROR: ", max),
        ("MIN_ERROR: ", min),
    )
    with open(folder + "ERROR.txt", "w") as report:
        report.write("DEADLINE_ERROR " + str(abs(deadline_error)) + "\n")
        if len(stage_errors) == 0:
            return
        for label, statistic in statistics:
            report.write(label + str(statistic(stage_errors)) + "\n")
def _first_nonzero_task_ts(app_dict, sorted_sid):
    """Return the first non-zero task timestamp in stage order, or 0.0 if there is none."""
    for sid in sorted_sid:
        try:
            task_timestamps = app_dict[sid]["tasktimestamps"]
        except KeyError:
            # Stages without task timestamps are skipped.
            continue
        for task_timestamp in task_timestamps:
            task_ts = task_timestamp.timestamp()
            if task_ts != 0:
                return task_ts
    return 0.0


def _percent_error(expected_ts, actual_ts, total_duration):
    """Return ``|expected_ts - actual_ts|`` as a percentage of ``total_duration``."""
    return round(round((abs(expected_ts - actual_ts) / total_duration), 4) * 100, 3)


def compute_errors(app_id, app_dict, folder, config):
    """
    Compute the deadline error of every stage and of the whole application.

    Each error is the distance between a deadline and the matching end time,
    expressed as a percentage of the time between the start of stage
    ``PLOT_SID_STAGE`` and the alpha-scaled application deadline. The errors
    are printed and then stored through ``save_deadline_errors``. Nothing is
    done when ``app_dict`` is empty.

    :param app_id: the identifier of the application (not used by the computation)
    :param app_dict: the stages of the application keyed by stage id; each
        stage is read through its "start", "end" and "deadline" entries
    :param folder: the output folder handed to ``save_deadline_errors``
    :param config: the configuration; "Deadline" (milliseconds) and
        ``["Control"]["Alpha"]`` are read
    :return: Nothing
    """
    if len(app_dict) == 0:
        return

    sorted_sid = sorted(app_dict)
    app_start = app_dict[PLOT_SID_STAGE]["start"]
    first_ts = app_start.timestamp()
    if first_ts == 0:
        # Legacy behaviour kept for compatibility: with a zero start timestamp
        # the first non-zero task timestamp becomes the time origin.
        first_ts = _first_nonzero_task_ts(app_dict, sorted_sid)

    # The application deadline does not depend on the stage: compute it once.
    app_deadline = (app_start + timedelta(milliseconds=config["Deadline"])).replace(
        microsecond=0)
    app_alpha_deadline = app_deadline - timedelta(
        milliseconds=((1 - float(config["Control"]["Alpha"])) * float(config["Deadline"])))
    alpha_deadline_abs = app_alpha_deadline.timestamp()
    app_alpha_deadline_ts = alpha_deadline_abs - first_ts
    total_duration = alpha_deadline_abs - app_start.timestamp()

    # COMPUTE ERRORS
    errors = []
    for sid in sorted_sid:
        stage = app_dict[sid]
        try:
            start_ts = stage["start"].timestamp() - first_ts
            end = stage["end"].timestamp()
            int_dead = stage["deadline"].timestamp()
        except KeyError:
            # Best effort: a stage lacking one of these entries is left out of
            # the statistics instead of aborting the whole computation.
            continue
        end_ts = end - first_ts
        dead_ts = int_dead - first_ts
        if sid == sorted_sid[-1] and start_ts < app_alpha_deadline_ts:
            # The last stage is measured against the application deadline.
            dead_ts = app_alpha_deadline_ts
        deadline_error = _percent_error(dead_ts, end_ts, total_duration)
        print(sid, abs(int_dead - end), total_duration, end, deadline_error)
        errors.append(deadline_error)

    end = app_dict[sorted_sid[-1]]["end"].timestamp()
    print(abs(alpha_deadline_abs - end), total_duration, end)
    app_deadline_error = _percent_error(alpha_deadline_abs, end, total_duration)

    stage_errors = np.array(errors)
    print("DEADLINE_ERROR " + str(app_deadline_error))
    if len(stage_errors) > 0:
        print("MEAN ERROR: " + str(np.mean(stage_errors)))
        print("DEVSTD ERROR: " + str(np.std(stage_errors)))
        print("MEDIAN ERROR: " + str(np.median(stage_errors)))
        print("MAX ERROR: " + str(max(stage_errors)))
        print("MIN ERROR: " + str(min(stage_errors)))
    save_deadline_errors(folder, app_deadline_error, stage_errors)


@timing
def compute_metrics(folder):
    """
    Compute deadline errors and CPU time for the runs logged in a folder.

    Application logs (``*.err`` and ``*.dat``) are loaded in sorted order and
    their deadline errors computed; CPU time is then computed from the worker
    logs (``*worker*.out``) paired, in sorted order, with the ``sar*.log``
    files. An error message is printed instead when the two sets of logs
    differ in size.

    NOTE(review): ``app_info`` is rebound for every application log, so the CPU
    time is computed only for the applications of the last log -- confirm this
    is intended.

    :param folder: the folder holding logs and configuration; a trailing "/"
        is appended when missing
    :return: Nothing
    """
    global PLOT_SID_STAGE

    print(folder)
    if folder[-1] != "/":
        folder += "/"
    config = load_config(folder)
    print(config)

    PLOT_SID_STAGE = 1 if config["HDFS"] else 0

    app_logs = glob.glob(folder + "*.err") + glob.glob(folder + "*.dat")
    app_info = {}
    for app_log in sorted(app_logs):
        app_info = load_app_data(app_log)
        for app_id in app_info:
            compute_errors(app_id, app_info[app_id], folder, config)

    worker_logs = glob.glob(folder + "*worker*.out")
    cpu_logs = glob.glob(folder + "sar*.log")
    if len(worker_logs) != len(cpu_logs):
        print("ERROR: SAR != WORKER LOGS")
        return

    workers_dict = {
        worker_log: load_worker_data(worker_log, cpu_log, config)
        for worker_log, cpu_log in zip(sorted(worker_logs), sorted(cpu_logs))
    }
    for app_id in app_info:
        compute_cpu_time(app_id, app_info, workers_dict, config, folder)