"""
Module that handles the cluster log:
* Download from master and slaves
* Extract app data
* Extract worker data
"""
import multiprocessing
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime as dt
from datetime import timedelta
from util.cmdshell import sshclient_from_instance
from util.utils import timing, string_to_datetime
def download_master(i, output_folder, log_folder, config):
    """Download the logs from the master instance
    :param i: the master instance
    :param output_folder: the output folder where to save the logs
    :param log_folder: the log folder on the master instance
    :param config: the configuration dictionary
    :return: the output folder and app_id, the application id
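
    Example (illustrative only; the instance object, the folders and the config
    below are assumptions)::

        out_folder, app_id = download_master(master_instance, "results/",
                                             "/home/ubuntu/logs", config)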
"""
ssh_client = sshclient_from_instance(i, config["Aws"]["KeyPair"], user_name='ubuntu')
app_id = ""
for file in ssh_client.listdir("" + config["Spark"]["SparkHome"] + "spark-events/"):
print("BENCHMARK: " + file)
print("LOG FOLDER: " + log_folder)
print("OUTPUT FOLDER: " + output_folder)
app_id = file
if log_folder != output_folder:
output_folder = output_folder + app_id
try:
os.makedirs(output_folder)
except FileExistsError:
print("Output folder already exists")
input_file = config["Spark"]["SparkHome"] + "spark-events/" + file
output_bz = input_file + ".bz"
print("Bzipping event log...")
ssh_client.run("pbzip2 -9 -p" + str(
config["Control"]["CoreVM"]) + " -c " + input_file + " > " + output_bz)
ssh_client.get_file(output_bz, output_folder + "/" + file + ".bz")
for file in ssh_client.listdir(log_folder):
print(file)
if file != "bench-report.dat":
output_file = (output_folder + "/" + file).replace(":", "-")
ssh_client.get_file(log_folder + "/" + file, output_file)
return output_folder, app_id
def download_slave(i, output_folder, app_id, config):
    """Download the logs from a slave instance:

    * the worker log, which includes the controller output
    * the cpu monitoring log

    :param i: the slave instance
    :param output_folder: the output folder where to save the logs
    :param app_id: the application id
    :param config: the configuration dictionary
    :return: output_folder: the output folder
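
    Example (illustrative only; the instance object, the folder and the app id
    below are assumptions)::

        download_slave(slave_instance, "results/app-20160920113353-0000",
                       "app-20160920113353-0000", config)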
"""
ssh_client = sshclient_from_instance(i, config["Aws"]["KeyPair"], user_name='ubuntu')
print("Downloading log from slave: " + i.public_dns_name)
try:
worker_ip_fixed = i.private_ip_address.replace(".", "-")
worker_log = "{0}logs/spark-ubuntu-org.apache.spark.deploy.worker.Worker-1-ip-{1}.out".format(
config["Spark"]["SparkHome"], worker_ip_fixed)
print(worker_log)
ssh_client.run(
"screen -ls | grep Detached | cut -d. -f1 | awk '{print $1}' | xargs -r kill")
output_worker_log = "{0}/spark-ubuntu-org.apache.spark.deploy.worker.Worker-1-ip-{1}.out".format(
output_folder, i.private_ip_address)
ssh_client.get_file(worker_log, output_worker_log)
ssh_client.get_file("sar-" + i.private_ip_address + ".log",
output_folder + "/" + "sar-" + i.private_ip_address + ".log")
except FileNotFoundError:
print("worker log not found")
try:
for file in ssh_client.listdir(config["Spark"]["SparkHome"] + "work/" + app_id + "/"):
print("Executor ID: " + file)
ssh_client.get_file(
config["Spark"]["SparkHome"] + "work/" + app_id + "/" + file + "/stderr",
output_folder + "/" + i.public_dns_name + "-" + file + ".stderr")
except FileNotFoundError:
print("stderr not found")
return output_folder
@timing
def download(log_folder, instances, master_dns, output_folder, config):
    """Download the logs from the master and the worker nodes
    :param log_folder: the log folder of the application
    :param instances: the instances of the cluster
    :param master_dns: the public dns of the master instance
    :param output_folder: the output folder where to save the logs
    :param config: the configuration dictionary
    :return: the output folder
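
    Example (illustrative only; ``instances`` is assumed to be the list of EC2
    instance objects of the cluster)::

        out_folder = download("/home/ubuntu/logs", instances,
                              master_instance.public_dns_name, "results/", config)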
"""
# MASTER
print("Downloading log from Master: " + master_dns)
master_instance = [i for i in instances if i.public_dns_name == master_dns][0]
output_folder, app_id = download_master(master_instance, output_folder, log_folder, config)
# SLAVE
with ThreadPoolExecutor(multiprocessing.cpu_count()) as executor:
for i in instances:
if i.public_dns_name != master_dns:
                worker = executor.submit(download_slave, i, output_folder, app_id, config)
output_folder = worker.result()
return output_folder
def load_app_data(app_log_path):
    """
    Parse the application data (stage ids, start, deadline and end times,
    task timestamps) from the application log
    :param app_log_path: the log of the application with log level INFO
    :return: app_info dictionary
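
    The returned dictionary is keyed by application id and stage id; the values
    below are illustrative only::

        app_info["app-20160920113353-0000"][0] = {
            "tasks": 1000,                     # tasks submitted for the stage
            "start": datetime(...),            # stage start (ControllerJob INIT line)
            "deadline": datetime(...),         # start + the stage deadline in ms
            "end": datetime(...),              # stage end (DAGScheduler finished line)
            "tasktimestamps": [datetime(...)]  # finish time of each task
        }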
"""
print("Loading app data from log")
dict_to_plot = {}
app_info = {}
app_id = ""
with open(app_log_path) as app_log_fp:
for line in app_log_fp:
line = line.split(" ")
if len(line) > 3:
if line[3] == "TaskSetManager:" and line[4] == "Finished":
try:
app_info[app_id][int(float(line[9]))]["tasktimestamps"].append(
string_to_datetime(line[1]))
except (KeyError, ValueError):
app_info[app_id][int(float(line[9]))]["tasktimestamps"] = []
app_info[app_id][int(float(line[9]))]["tasktimestamps"].append(
string_to_datetime(line[1]))
elif line[3] == "StandaloneSchedulerBackend:" and line[4] == "Connected":
app_info[line[-1].rstrip()] = {}
app_id = line[-1].rstrip()
dict_to_plot[app_id] = {}
dict_to_plot[app_id]["dealineTimeStages"] = []
dict_to_plot[app_id]["startTimeStages"] = []
dict_to_plot[app_id]["finishTimeStages"] = []
elif line[3] == "DAGScheduler:":
if line[4] == "Submitting" and line[6] == "missing":
stage_id = int(line[10])
app_info[app_id][stage_id] = {}
app_info[app_id][stage_id]["tasks"] = int(line[5])
elif line[-4] == "finished":
if app_id != "":
stage_id = int(line[5])
app_info[app_id][stage_id]["end"] = string_to_datetime(line[1])
if len(dict_to_plot[app_id]["startTimeStages"]) > len(
dict_to_plot[app_id]["finishTimeStages"]):
dict_to_plot[app_id]["finishTimeStages"].append(
app_info[app_id][stage_id]["end"])
print("END {1}: {0}".format(app_info[app_id][stage_id]["end"],
stage_id))
elif line[3] == "ControllerJob:":
if line[5] == "INIT":
size_finish = len(dict_to_plot[app_id]["finishTimeStages"]) + 1
if len(dict_to_plot[app_id]["dealineTimeStages"]) < size_finish:
stage_id = int(line[12].replace(",", ""))
app_info[app_id][stage_id]["start"] = string_to_datetime(line[1])
print(
"START {1}: {0}".format(app_info[app_id][stage_id]["start"],
stage_id))
dict_to_plot[app_id]["startTimeStages"].append(
app_info[app_id][stage_id]["start"])
deadline_ms = float(line[16].replace(",", ""))
print(deadline_ms)
app_info[app_id][stage_id]["deadline"] = \
dict_to_plot[app_id]["startTimeStages"][-1] \
+ timedelta(milliseconds=deadline_ms)
dict_to_plot[app_id]["dealineTimeStages"].append(
app_info[app_id][stage_id]["deadline"])
elif line[5] == "NEEDED" and line[4] == "SEND":
next_app_id = line[-1].replace("\n", "")
if app_id != next_app_id:
app_id = next_app_id
dict_to_plot[app_id] = {}
dict_to_plot[app_id]["dealineTimeStages"] = []
dict_to_plot[app_id]["startTimeStages"] = []
dict_to_plot[app_id]["finishTimeStages"] = []
return app_info
def load_worker_data(worker_log, cpu_log, config):
"""
Load the controller data from the worker_log and combine with the cpu_real data from cpu_log
:param worker_log: the path of the log of the worker
:param cpu_log: the path of the cpu monitoring tool log of the worker
:param config: the configuration dictionary
:return: worker_dict the dictionary of the worker's data
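
    The returned dictionary holds the node-wide cpu samples plus one entry per
    application id and stage id; the layout below is illustrative only::

        worker_dict["time_cpu"]  # timestamps of the sar samples
        worker_dict["cpu_real"]  # cores actually used, derived from the sar log
        worker_dict["app-20160920113353-0000"][0] = {
            "cpu": [...],      # cores to allocate, from the "CoreToAllocate" lines
            "time": [...],     # timestamps of the controller samples
            "sp": [...],       # progress values from the "SP" lines
            "sp_real": [...]   # values from the "Real" lines
        }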
"""
print(worker_log)
print(cpu_log)
worker_dict = {}
with open(worker_log) as wlog:
app_id = ""
worker_dict["cpu_real"] = []
worker_dict["time_cpu"] = []
sid = -1
for line in wlog:
line = line.split(" ")
if len(line) > 3:
if line[4] == "Created" and app_id != "":
if sid != int(line[8]):
sid = int(line[8])
worker_dict[app_id][sid] = {}
worker_dict[app_id][sid]["cpu"] = []
worker_dict[app_id][sid]["time"] = []
worker_dict[app_id][sid]["sp_real"] = []
worker_dict[app_id][sid]["sp"] = []
worker_dict[app_id][sid]["cpu"].append(float(line[-1].replace("\n", "")))
worker_dict[app_id][sid]["sp_real"].append(0.0)
worker_dict[app_id][sid]["time"].append(string_to_datetime(line[1]))
worker_dict[app_id][sid]["sp"].append(0.0)
if line[4] == "Scaled":
# print(l)
if app_id == "" or app_id != line[10]:
next_app_id = line[10]
try:
worker_dict[next_app_id] = {}
app_id = next_app_id
except KeyError:
print(next_app_id + " NOT FOUND BEFORE IN BENCHMARK LOGS")
if app_id != "":
if line[4] == "CoreToAllocate:":
# print(l)
worker_dict[app_id][sid]["cpu"].append(float(line[-1].replace("\n", "")))
if line[4] == "Real:":
worker_dict[app_id][sid]["sp_real"].append(
float(line[-1].replace("\n", "")))
if line[4] == "SP":
worker_dict[app_id][sid]["time"].append(string_to_datetime(line[1]))
# print(l[-1].replace("\n", ""))
progress = float(line[-1].replace("\n", ""))
# print(sp)
if progress < 0.0:
worker_dict[app_id][sid]["sp"].append(abs(progress) / 100)
else:
worker_dict[app_id][sid]["sp"].append(progress)
with open(cpu_log) as cpu_log_fp:
for line in cpu_log_fp:
line = line.split(" ")
if not ("Linux" in line[0].split(" ") or "\n" in line[0].split(" ")) \
and line[1] != " CPU" and line[0] != "Average:":
worker_dict["time_cpu"].append(
dt.strptime(line[0], '%I:%M:%S %p').replace(year=2016))
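                # Convert the sampled cpu percentage into an approximate number of
                # busy cores: e.g. 40% on a 4-core VM gives 0.40 * 4 = 1.6 cores,
                # doubled when hyper-threading exposes twice as many logical cores.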
if config["Aws"]["HyperThreading"]:
cpu_real = float(
'{0:.2f}'.format((float(line[2]) * config["Control"]["CoreVM"] * 2) / 100))
else:
cpu_real = float(
'{0:.2f}'.format((float(line[2]) * config["Control"]["CoreVM"]) / 100))
worker_dict["cpu_real"].append(cpu_real)
print(list(worker_dict.keys()))
return worker_dict
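

# Minimal offline sketch of how the parsers above are typically chained once the
# cluster logs have been downloaded with download(); the paths and the config
# values below are illustrative assumptions, not part of this module.
if __name__ == "__main__":
    example_config = {
        "Aws": {"HyperThreading": False},
        "Control": {"CoreVM": 4},
    }
    # Hypothetical local copies of the files fetched by download_master/download_slave.
    example_app_log = "./results/app-20160920113353-0000/app.dat"
    example_worker_log = ("./results/app-20160920113353-0000/"
                          "spark-ubuntu-org.apache.spark.deploy.worker.Worker-1-ip-10.0.0.1.out")
    example_cpu_log = "./results/app-20160920113353-0000/sar-10.0.0.1.log"

    app_info = load_app_data(example_app_log)
    worker_dict = load_worker_data(example_worker_log, example_cpu_log, example_config)
    for example_app_id in app_info:
        print(example_app_id, sorted(app_info[example_app_id].keys()))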