In [None]:
import numpy as np
import matplotlib.pyplot as plt
import mined_metric.builder.proto.metric_definitions_pb2 as metric_definitions_proto
import mined_metric.builder.metrics_impl.prediction_error_metrics.prediction_error_metrics_pb2 as prediction_error_metrics_proto
from base.proto import ReadProto, WriteProto
import matplotlib.patches as mpatches

import scipy
from scipy.stats import pearsonr

# Identifier of the experiment whose metrics are analysed. The value here is a
# placeholder string; presumably it is substituted with a real experiment ID
# before the notebook is run -- TODO confirm how it gets filled in.
EXPERIMENT_ID = "EXPERIMENT_ID_PLACEHOLDER"
# Directory containing the experiment output: a fixed base path with the
# experiment ID appended.
EXPERIMENT_FILEPATH = "/mnt/sun-pcs01/mrama/prediction/error/metrics/pipeline/" + EXPERIMENT_ID
# File name (inside EXPERIMENT_FILEPATH) of the serialized metrics proto that
# get_aggregated_metrics reads.
FILENAME='aggregated_metrics.AggregatedMetrics.pb'
# Prediction time deltas for which plots are generated. Each value is matched
# with == against the `desired_time_delta` field of the loaded metrics, and is
# shown in figure titles followed by "s".
PREDICTION_TIMESTEPS = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0]

def _error_uncertainty_ratio(error, uncertainty):
    """Return ``abs(error) / uncertainty`` without ever dividing by zero.

    A zero uncertainty (for example a field that was never populated --
    presumably it then reads back as 0; TODO confirm against the proto
    definition) would otherwise raise ``ZeroDivisionError`` and abort loading
    of the whole file.

    Args:
        error: Signed error value.
        uncertainty: Uncertainty associated with that error.

    Returns:
        ``abs(error) / uncertainty`` when ``uncertainty`` is non-zero.
        When ``uncertainty`` is zero: ``0.0`` if the error is also zero,
        otherwise ``float("inf")``.
    """
    magnitude = abs(error)
    if uncertainty == 0:
        return 0.0 if magnitude == 0 else float("inf")
    return magnitude / uncertainty


def get_aggregated_metrics(path, filename):
    """Load the aggregated metrics proto and return its prediction-error extension.

    Reads ``path + "/" + filename`` into an ``AggregatedMetrics`` message via
    ``ReadProto`` and then annotates every entry of ``sampled_track_errors``
    in place with three derived fields:

    * ``s_error_uncertainty_ratio``  = ``abs(s_error) / s_uncertainty``
    * ``t_error_uncertainty_ratio``  = ``abs(t_error) / t_uncertainty``
    * ``ellipse_error_uncertainty_ratio`` = the larger of the two ratios

    Args:
        path: Directory that contains the serialized proto.
        filename: Name of the proto file inside ``path``.

    Returns:
        The ``PredictionErrorAggregatedMetrics`` extension of the loaded
        message, with the ratio fields filled in.
    """
    aggregated_metrics = metric_definitions_proto.AggregatedMetrics()
    ReadProto(path + "/" + filename, aggregated_metrics)

    # Look the extension up once; the loop below mutates its entries in place
    # and the same object is what gets returned.
    prediction_error_metrics = aggregated_metrics.Extensions[
        prediction_error_metrics_proto.PredictionErrorAggregatedMetrics.ext]

    for error in prediction_error_metrics.sampled_track_errors:
        error.s_error_uncertainty_ratio = _error_uncertainty_ratio(
            error.s_error, error.s_uncertainty)
        error.t_error_uncertainty_ratio = _error_uncertainty_ratio(
            error.t_error, error.t_uncertainty)
        error.ellipse_error_uncertainty_ratio = max(
            error.s_error_uncertainty_ratio, error.t_error_uncertainty_ratio)

    return prediction_error_metrics

def get_ratios(aggregated_metrics, time_delta):
    """Collect the per-bin error ratios recorded for one prediction timestep.

    Scans ``aggregated_metrics.prediction_timestep_error_ratios`` for the
    first entry whose ``desired_time_delta`` equals ``time_delta`` (exact
    ``==`` comparison) and gathers the fields of its ``error_ratios`` entries.

    Args:
        aggregated_metrics: Object exposing ``prediction_timestep_error_ratios``.
        time_delta: Desired prediction time delta to look up.

    Returns:
        A 4-tuple of lists, one element per entry of ``error_ratios``:
        ``(uncertainty_multiplier values, error_s_uncertainty_ratio values,
        error_t_uncertainty_ratio values, error_ellipse_uncertainty_ratio
        values)``. ``None`` when no entry matches ``time_delta``.
    """
    for timestep_entry in aggregated_metrics.prediction_timestep_error_ratios:
        if timestep_entry.desired_time_delta != time_delta:
            continue

        multipliers, s_ratios, t_ratios, ellipse_ratios = [], [], [], []
        for bin_entry in timestep_entry.error_ratios:
            multipliers.append(bin_entry.uncertainty_multiplier)
            s_ratios.append(bin_entry.error_s_uncertainty_ratio)
            t_ratios.append(bin_entry.error_t_uncertainty_ratio)
            ellipse_ratios.append(bin_entry.error_ellipse_uncertainty_ratio)
        return multipliers, s_ratios, t_ratios, ellipse_ratios

    # No timestep entry matched.
    return None

def get_errors(aggregated_metrics, time_delta):
    """Collect absolute errors and uncertainties sampled at one prediction timestep.

    Only entries of ``aggregated_metrics.sampled_track_errors`` whose
    ``desired_time_delta`` equals ``time_delta`` (exact ``==`` comparison)
    are used; their relative order is preserved.

    Args:
        aggregated_metrics: Object exposing ``sampled_track_errors``.
        time_delta: Desired prediction time delta to select.

    Returns:
        A 4-tuple of equally long lists
        ``(s_errors, t_errors, s_uncertainties, t_uncertainties)`` where the
        error lists hold ``abs(s_error)`` / ``abs(t_error)`` and the
        uncertainty lists hold the raw ``s_uncertainty`` / ``t_uncertainty``
        values. All four lists are empty when nothing matches.
    """
    matching = [
        error for error in aggregated_metrics.sampled_track_errors
        if error.desired_time_delta == time_delta
    ]
    # The actual (measured) time delta of each sample used to be accumulated
    # here as well, but it was never returned or used, so it is no longer read.
    s_errors = [abs(error.s_error) for error in matching]
    t_errors = [abs(error.t_error) for error in matching]
    s_uncertainties = [error.s_uncertainty for error in matching]
    t_uncertainties = [error.t_uncertainty for error in matching]
    return s_errors, t_errors, s_uncertainties, t_uncertainties

# Function to generate cumulative plot for error ratio
def generate_plots_for_ratios(aggregated_metrics, time_delta):
    """Plot the cumulative error-ratio curves for one prediction timestep.

    Fetches the ratios with ``get_ratios`` and draws three line plots (s, t
    and ellipse) into slots 1-3 of a 2x2 subplot grid on a new 15x5 inch
    figure. Every curve is prefixed with the point (0, 0), and every panel
    uses the fixed axis range x in [0, 5], y in [0, 1]. The figure is shown
    with ``plt.show()``.

    Args:
        aggregated_metrics: Metrics object passed through to ``get_ratios``.
        time_delta: Prediction timestep to plot; also shown in the figure title.
    """
    (uncertainty_bins,
     s_ratios,
     t_ratios,
     ellipse_ratios) = get_ratios(aggregated_metrics, time_delta)

    axis_limits = [0, 5, 0, 1]  # [xmin, xmax, ymin, ymax]
    spacing = {'wspace': 0.3, 'hspace': 0.5}

    fig = plt.figure()
    fig.set_size_inches(15, 5)
    fig.suptitle('{}s prediction step'.format(time_delta), fontsize=16)

    # (ratios, x label, y label) for each panel, in subplot order.
    panels = (
        (s_ratios, 's_uncertainty_coefficient', 'cumulative_s_error_ratio'),
        (t_ratios, 't_uncertainty_coefficient', 'cumulative_t_error_ratio'),
        (ellipse_ratios, 'ellipse_uncertainty_coefficient', 'cumulative_ellipse_error_ratio'),
    )
    for position, (ratios, x_label, y_label) in enumerate(panels, start=1):
        plt.subplot(2, 2, position)
        # Prepend the origin so each cumulative curve starts at (0, 0).
        plt.plot([0] + uncertainty_bins, [0] + ratios)
        plt.xlabel(x_label)
        plt.ylabel(y_label)
        plt.axis(axis_limits)
        plt.subplots_adjust(**spacing)

    plt.show()

# Function to generate the scatter plot for uncertainty and error correlation
def _pearsonr_label(errors, uncertainties):
    """Build the legend text holding the Pearson correlation of two samples.

    ``scipy.stats.pearsonr`` raises ``ValueError`` when given fewer than two
    points, which would abort plotting for a timestep with no (or a single)
    sampled error. That case is reported as ``'pearsonr: n/a'`` instead.
    """
    if len(errors) < 2:
        return 'pearsonr: n/a'
    return 'pearsonr: ' + str(pearsonr(errors, uncertainties)[0])


def generate_plots_for_errors(aggregated_metrics, time_delta):
    """Scatter-plot absolute error against uncertainty for one prediction timestep.

    Fetches the samples with ``get_errors`` and draws two side-by-side
    scatter plots on a new 15x5 inch figure: s error vs. s uncertainty and
    t error vs. t uncertainty. Both panels use the fixed axis range [0, 5] on
    each axis, and each carries a legend entry with the Pearson correlation
    coefficient of the plotted data (``n/a`` when there are fewer than two
    samples). The figure is shown with ``plt.show()``.

    Args:
        aggregated_metrics: Metrics object passed through to ``get_errors``.
        time_delta: Prediction timestep to plot; also shown in the figure title.
    """
    s_errors, t_errors, s_uncertainties, t_uncertainties = get_errors(
        aggregated_metrics, time_delta)

    marker_size = 1
    hspace, wspace = 0.5, 0.3
    axis_limits = [0, 5, 0, 5]  # [xmin, xmax, ymin, ymax]

    fig = plt.figure()
    fig.set_size_inches(15, 5)
    fig.suptitle('{}s prediction step'.format(time_delta), fontsize=16)

    # (errors, uncertainties, x label, y label) for each panel, in subplot order.
    panels = (
        (s_errors, s_uncertainties, 's_error', 's_uncertainty'),
        (t_errors, t_uncertainties, 't_error', 't_uncertainty'),
    )
    for position, (errors, uncertainties, x_label, y_label) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        # A scalar marker size applies to every point.
        plt.scatter(errors, uncertainties, s=marker_size)
        plt.xlabel(x_label)
        plt.ylabel(y_label)
        plt.axis(axis_limits)
        plt.subplots_adjust(wspace=wspace, hspace=hspace)
        # A white patch serves as an invisible legend handle, so only the
        # correlation text is visible in the legend box.
        legend_patch = mpatches.Patch(
            color='white', label=_pearsonr_label(errors, uncertainties))
        plt.legend(handles=[legend_patch])

    plt.show()

In [None]:
# Load the experiment's metrics file (EXPERIMENT_FILEPATH/FILENAME). The value
# kept here is what get_aggregated_metrics returns: the prediction-error
# extension of the loaded proto, with per-sample error/uncertainty ratios
# already filled in.
aggregated_metrics = get_aggregated_metrics(EXPERIMENT_FILEPATH, FILENAME)

In [None]:
# Generate plots for all prediction timesteps: for each configured timestep,
# first the cumulative error-ratio curves, then the error-vs-uncertainty
# scatter plots. Each call opens and shows its own figure.
for time_delta in PREDICTION_TIMESTEPS:
    generate_plots_for_ratios(aggregated_metrics, time_delta)
    generate_plots_for_errors(aggregated_metrics, time_delta)