
API Reference

Auto-generated API documentation for the AMD MI300X benchmarking suite.

amd_bench.core.analysis.BenchmarkAnalyzer

BenchmarkAnalyzer(config: AnalysisConfig)

Comprehensive analyzer for vLLM benchmark results

Initialize analyzer with configuration

Methods:

| Name | Description |
| --- | --- |
| `discover_experiment_files` | Discover and match all files for each experiment. |
| `load_monitoring_data` | Load monitoring data for an experiment from CSV files. |
| `process_results` | Process all benchmark result files and generate comprehensive analysis. |

Source code in src/amd_bench/core/analysis.py
def __init__(self, config: AnalysisConfig):
    """Initialize analyzer with configuration"""
    self.config: AnalysisConfig = config
    self.results: List[BenchmarkResult] = []
    self.experiment_files: List[ExperimentFiles] = []

    # Build dynamic directory paths
    self.results_dir = self._get_results_directory()
    self.logs_dir = self._get_logs_directory()
    self.monitoring_dir = self._get_monitoring_directory()

    # Convert config to FilenameFormat objects for processing
    self.filename_formats = self._build_filename_formats()

    # Ensure output directories and log directory exist
    self._setup_output_directories()
    self._validate_and_log_structure()

    # Cache for tensor parallel size (parsed once per benchmark run)
    self._tensor_parallel_cache: Optional[int] = None
    self._benchmark_run_log_path: Optional[Path] = None

    logger.info("Analyzer initialized with custom filename formats")
    logger.info(f"Loaded {len(self.filename_formats)} filename formats")
    logger.info(f"Input directory: {self.config.input_dir}")
    logger.info(f"Results directory: {self.results_dir}")
    logger.info(f"Logs directory: {self.logs_dir}")
    logger.info(f"Monitoring directory: {self.monitoring_dir}")
    logger.info(f"Output directory: {self.config.output_dir}")

discover_experiment_files

discover_experiment_files() -> List[ExperimentFiles]

Discover and match all files for each experiment.

This method systematically searches for benchmark result files and their associated monitoring data (logs, CPU metrics, GPU power/temperature data), creating a comprehensive mapping of all experiment files. It handles flexible directory structures and provides filtering options based on completeness requirements.

The discovery process follows these steps:

1. Try to discover the master benchmark run log
2. Locate all JSON result files matching the configured pattern
3. For each result file, search for corresponding monitoring files
4. Build ExperimentFiles objects containing all related file paths
5. Apply filtering based on completeness requirements if specified

Returns:

List[ExperimentFiles]: A list of ExperimentFiles objects, each containing:

- result_file: Path to the JSON benchmark results file
- log_file: Optional path to execution log file
- cpu_metrics_file: Optional path to CPU monitoring CSV
- gpu_power_file: Optional path to GPU power monitoring CSV
- gpu_temp_file: Optional path to GPU temperature monitoring CSV

Raises:

- FileNotFoundError: If no result files are found matching the pattern.
- PermissionError: If files exist but are not readable.

Note: The method respects the require_complete_monitoring configuration flag. When enabled, only experiments with all monitoring files are returned. File matching is based on basename pattern matching across subdirectories.

Example:

analyzer = BenchmarkAnalyzer(config)
experiments = analyzer.discover_experiment_files()
print(f"Found {len(experiments)} complete experiment file sets")

for exp in experiments:
    if exp.has_complete_monitoring:
        print(f"Complete monitoring data for {exp.result_file.name}")

Source code in src/amd_bench/core/analysis.py
def discover_experiment_files(self) -> List[ExperimentFiles]:
    """Discover and match all files for each experiment.

    This method systematically searches for benchmark result files and their associated
    monitoring data (logs, CPU metrics, GPU power/temperature data), creating a
    comprehensive mapping of all experiment files. It handles flexible directory
    structures and provides filtering options based on completeness requirements.

    The discovery process follows these steps:
    1. Try to discover the master benchmark log run
    2. Locate all JSON result files matching the configured pattern
    3. For each result file, search for corresponding monitoring files
    4. Build ExperimentFiles objects containing all related file paths
    5. Apply filtering based on completeness requirements if specified

    Returns:
        List[ExperimentFiles]: A list of ExperimentFiles objects, each containing:
            - result_file: Path to the JSON benchmark results file
            - log_file: Optional path to execution log file
            - cpu_metrics_file: Optional path to CPU monitoring CSV
            - gpu_power_file: Optional path to GPU power monitoring CSV
            - gpu_temp_file: Optional path to GPU temperature monitoring CSV

    Raises:
        FileNotFoundError: If no result files are found matching the pattern.
        PermissionError: If files exist but are not readable.

    Note:
        The method respects the `require_complete_monitoring` configuration flag.
        When enabled, only experiments with all monitoring files are returned.
        File matching is based on basename pattern matching across subdirectories.

    Example:
        ```
        analyzer = BenchmarkAnalyzer(config)
        experiments = analyzer.discover_experiment_files()
        print(f"Found {len(experiments)} complete experiment file sets")

        for exp in experiments:
            if exp.has_complete_monitoring:
                print(f"Complete monitoring data for {exp.result_file.name}")
        ```
    """
    logger.info("Discovering experiment files...")

    # First, find the master benchmark run log
    benchmark_run_log = self._find_benchmark_run_log()

    # Get all JSON result files
    result_files = list(self.results_dir.glob(self.config.results_pattern))
    experiments = []

    for result_file in result_files:
        try:
            experiment = self._build_experiment_files(result_file, benchmark_run_log)
            experiments.append(experiment)

            # Log what we found for this experiment
            monitoring_status = "complete" if experiment.has_complete_monitoring else "partial"
            logger.debug(f"Experiment {result_file.name}: {monitoring_status} monitoring data")

        except Exception as e:
            logger.error(f"Error processing experiment {result_file.name}: {e}")

    # Filter experiments if complete monitoring is required
    if self.config.require_complete_monitoring:
        complete_experiments = [exp for exp in experiments if exp.has_complete_monitoring]
        logger.info(
            f"Filtered to {len(complete_experiments)}/{len(experiments)} experiments with complete monitoring"
        )
        experiments = complete_experiments

    logger.info(f"Discovered {len(experiments)} experiment file sets")
    return experiments
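
In addition to the docstring example, the attributes listed in the Returns section can be used to report which monitoring files are missing when `require_complete_monitoring` is disabled. A sketch, with placeholder directories and attribute names taken from the Returns section above:

```
from pathlib import Path

from amd_bench.core.analysis import BenchmarkAnalyzer
from amd_bench.schemas.benchmark import AnalysisConfig

analyzer = BenchmarkAnalyzer(
    AnalysisConfig(input_dir=Path("benchmark_data"), output_dir=Path("analysis_output"))
)

experiments = analyzer.discover_experiment_files()

for exp in experiments:
    # Each monitoring attribute is an Optional[Path]; None means no file was matched.
    missing = [
        name
        for name, path in (
            ("log", exp.log_file),
            ("cpu", exp.cpu_metrics_file),
            ("gpu_power", exp.gpu_power_file),
            ("gpu_temp", exp.gpu_temp_file),
        )
        if path is None
    ]
    if missing:
        print(f"{exp.result_file.name}: missing {', '.join(missing)}")
```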

load_monitoring_data (staticmethod)

load_monitoring_data(
    experiment: ExperimentFiles,
) -> Dict[str, DataFrame]

Load monitoring data for an experiment from CSV files.

This method loads and preprocesses hardware monitoring data associated with a specific experiment, including CPU utilization, GPU power consumption, and thermal metrics. It handles multiple file formats and performs data validation and timestamp normalization.

The method processes three types of monitoring data:

- **CPU Metrics**: System utilization, load averages, idle percentages
- **GPU Power**: Per-device power consumption over time
- **GPU Temperature**: Edge and junction temperatures for thermal analysis

Args:

- experiment (ExperimentFiles): Container with paths to monitoring files. Must contain at least one of: cpu_metrics_file, gpu_power_file, gpu_temp_file. Missing files are silently skipped.

Returns:

Dict[str, pd.DataFrame]: Dictionary mapping data types to DataFrames:

- 'cpu': CPU monitoring data with timestamp column
- 'gpu_power': GPU power consumption data
- 'gpu_temp': GPU temperature monitoring data

Each DataFrame includes a standardized 'timestamp' column converted
to pandas datetime format for time-series analysis.

Raises:

- FileNotFoundError: If specified monitoring files don't exist.
- pd.errors.EmptyDataError: If CSV files are empty or malformed.
- ValueError: If timestamp columns cannot be parsed.

Example:

experiment = ExperimentFiles(
    result_file=Path("result.json"),
    cpu_metrics_file=Path("cpu_metrics.csv"),
    gpu_power_file=Path("gpu_power.csv")
)

monitoring_data = BenchmarkAnalyzer.load_monitoring_data(experiment)

if 'cpu' in monitoring_data:
    cpu_df = monitoring_data['cpu']
    print(f"CPU monitoring duration: {cpu_df['timestamp'].max() - cpu_df['timestamp'].min()}")

if 'gpu_power' in monitoring_data:
    power_df = monitoring_data['gpu_power']
    total_power = power_df.groupby('timestamp')['power_watts'].sum()
    print(f"Average total power: {total_power.mean():.1f}W")

Note:

- Timestamps are expected in Unix epoch format (seconds since 1970)
- GPU data may contain multiple devices with separate readings
- Missing or corrupted files are logged as errors but don't raise exceptions
- Empty DataFrames are returned for missing monitoring categories

Source code in src/amd_bench/core/analysis.py
@staticmethod
def load_monitoring_data(experiment: ExperimentFiles) -> Dict[str, pd.DataFrame]:
    """Load monitoring data for an experiment from CSV files.

    This method loads and preprocesses hardware monitoring data associated with
    a specific experiment, including CPU utilization, GPU power consumption,
    and thermal metrics. It handles multiple file formats and performs data
    validation and timestamp normalization.

    The method processes three types of monitoring data:
    - **CPU Metrics**: System utilization, load averages, idle percentages
    - **GPU Power**: Per-device power consumption over time
    - **GPU Temperature**: Edge and junction temperatures for thermal analysis

    Args:
        experiment (ExperimentFiles): Container with paths to monitoring files.
            Must contain at least one of: cpu_metrics_file, gpu_power_file,
            gpu_temp_file. Missing files are silently skipped.

    Returns:
        Dict[str, pd.DataFrame]: Dictionary mapping data types to DataFrames:
            - 'cpu': CPU monitoring data with timestamp column
            - 'gpu_power': GPU power consumption data
            - 'gpu_temp': GPU temperature monitoring data

        Each DataFrame includes a standardized 'timestamp' column converted
        to pandas datetime format for time-series analysis.

    Raises:
        FileNotFoundError: If specified monitoring files don't exist.
        pd.errors.EmptyDataError: If CSV files are empty or malformed.
        ValueError: If timestamp columns cannot be parsed.

    Example:
        ```
        experiment = ExperimentFiles(
            result_file=Path("result.json"),
            cpu_metrics_file=Path("cpu_metrics.csv"),
            gpu_power_file=Path("gpu_power.csv")
        )

        monitoring_data = BenchmarkAnalyzer.load_monitoring_data(experiment)

        if 'cpu' in monitoring_data:
            cpu_df = monitoring_data['cpu']
            print(f"CPU monitoring duration: {cpu_df['timestamp'].max() - cpu_df['timestamp'].min()}")

        if 'gpu_power' in monitoring_data:
            power_df = monitoring_data['gpu_power']
            total_power = power_df.groupby('timestamp')['power_watts'].sum()
            print(f"Average total power: {total_power.mean():.1f}W")
        ```

    Note:
        - Timestamps are expected in Unix epoch format (seconds since 1970)
        - GPU data may contain multiple devices with separate readings
        - Missing or corrupted files are logged as errors but don't raise exceptions
        - Empty DataFrames are returned for missing monitoring categories
    """
    monitoring_data = {}

    try:
        # Load CPU metrics
        if experiment.cpu_metrics_file and experiment.cpu_metrics_file.exists():
            cpu_df = pd.read_csv(experiment.cpu_metrics_file)
            cpu_df["timestamp"] = pd.to_datetime(cpu_df["timestamp"], unit="s")
            monitoring_data["cpu"] = cpu_df

        # Load GPU power metrics
        if experiment.gpu_power_file and experiment.gpu_power_file.exists():
            power_df = pd.read_csv(experiment.gpu_power_file)
            power_df["timestamp"] = pd.to_datetime(power_df["timestamp"], unit="s")
            monitoring_data["gpu_power"] = power_df

        # Load GPU temperature metrics
        if experiment.gpu_temp_file and experiment.gpu_temp_file.exists():
            temp_df = pd.read_csv(experiment.gpu_temp_file)
            temp_df["timestamp"] = pd.to_datetime(temp_df["timestamp"], unit="s")
            monitoring_data["gpu_temp"] = temp_df

    except Exception as e:
        logger.error(f"Error loading monitoring data for {experiment.result_file.name}: {e}")

    return monitoring_data
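
Building on the power example in the docstring, the normalized `timestamp` column also supports a rough energy estimate. This is a sketch only: it assumes the gpu_power CSV exposes the same `power_watts` column used in the example above, and reuses the placeholder analyzer from the earlier sketches.

```
from pathlib import Path

from amd_bench.core.analysis import BenchmarkAnalyzer
from amd_bench.schemas.benchmark import AnalysisConfig

analyzer = BenchmarkAnalyzer(
    AnalysisConfig(input_dir=Path("benchmark_data"), output_dir=Path("analysis_output"))
)
experiment = analyzer.discover_experiment_files()[0]

monitoring_data = BenchmarkAnalyzer.load_monitoring_data(experiment)

if "gpu_power" in monitoring_data:
    power_df = monitoring_data["gpu_power"]

    # Sum per-device readings at each sampling instant, then estimate energy
    # from mean total power and the monitored duration.
    total_power = power_df.groupby("timestamp")["power_watts"].sum().sort_index()
    duration_s = (total_power.index[-1] - total_power.index[0]).total_seconds()
    energy_wh = total_power.mean() * duration_s / 3600.0

    print(f"Approximate GPU energy: {energy_wh:.2f} Wh over {duration_s:.0f} s")
```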

process_results

process_results() -> None

Process all benchmark result files and generate comprehensive analysis.

This is the main orchestration method that coordinates the complete analysis workflow from raw benchmark files to final reports and visualizations. It handles file discovery, data loading, statistical analysis, visualization generation, and report creation in a fault-tolerant manner.

The processing pipeline includes:

1. **File Discovery**: Locate all experiment files and validate structure
2. **Data Loading**: Parse JSON results and extract benchmark metrics
3. **Statistical Analysis**: Generate performance summaries and comparisons
4. **Monitoring Processing**: Analyze hardware metrics if available
5. **Visualization**: Create performance plots and dashboards
6. **Report Generation**: Produce markdown and JSON analysis reports

The method implements comprehensive error handling and logging to ensure partial results are preserved even if individual steps fail.

Raises:

- ValueError: If no valid experiment files are found.
- RuntimeError: If critical analysis steps fail unexpectedly.
- PermissionError: If output directories cannot be created or accessed.

Side Effects:

- Creates output directory structure (tables/, plots/, reports/)
- Writes CSV files with statistical summaries
- Generates PNG visualization files
- Creates comprehensive analysis reports
- Logs detailed progress and error information

Example:

config = AnalysisConfig(
    input_dir=Path("benchmark_data"),
    output_dir=Path("analysis_output"),
    generate_plots=True,
    include_monitoring_data=True
)

analyzer = BenchmarkAnalyzer(config)
analyzer.process_results()

# Results available in:
# - analysis_output/tables/*.csv
# - analysis_output/plots/*.png
# - analysis_output/reports/*.{md,json}

Note: Processing time scales with dataset size and enabled features. Large datasets with monitoring data may require several minutes. Progress is logged at INFO level for monitoring long-running analyses.

Source code in src/amd_bench/core/analysis.py
def process_results(self) -> None:
    """Process all benchmark result files and generate comprehensive analysis.

    This is the main orchestration method that coordinates the complete analysis
    workflow from raw benchmark files to final reports and visualizations. It
    handles file discovery, data loading, statistical analysis, visualization
    generation, and report creation in a fault-tolerant manner.

    The processing pipeline includes:
    1. **File Discovery**: Locate all experiment files and validate structure
    2. **Data Loading**: Parse JSON results and extract benchmark metrics
    3. **Statistical Analysis**: Generate performance summaries and comparisons
    4. **Monitoring Processing**: Analyze hardware metrics if available
    5. **Visualization**: Create performance plots and dashboards
    6. **Report Generation**: Produce markdown and JSON analysis reports

    The method implements comprehensive error handling and logging to ensure
    partial results are preserved even if individual steps fail.

    Raises:
        ValueError: If no valid experiment files are found.
        RuntimeError: If critical analysis steps fail unexpectedly.
        PermissionError: If output directories cannot be created or accessed.

    Side Effects:
        - Creates output directory structure (tables/, plots/, reports/)
        - Writes CSV files with statistical summaries
        - Generates PNG visualization files
        - Creates comprehensive analysis reports
        - Logs detailed progress and error information

    Example:
        ```
        config = AnalysisConfig(
            input_dir=Path("benchmark_data"),
            output_dir=Path("analysis_output"),
            generate_plots=True,
            include_monitoring_data=True
        )

        analyzer = BenchmarkAnalyzer(config)
        analyzer.process_results()

        # Results available in:
        # - analysis_output/tables/*.csv
        # - analysis_output/plots/*.png
        # - analysis_output/reports/*.{md,json}
        ```

    Note:
        Processing time scales with dataset size and enabled features.
        Large datasets with monitoring data may require several minutes.
        Progress is logged at INFO level for monitoring long-running analyses.
    """
    logger.info("Starting benchmark results processing...")

    try:
        # Discover all experiment files
        self.experiment_files = self.discover_experiment_files()

        if not self.experiment_files:
            logger.warning("No valid experiment files found")
            return

        # Load benchmark results
        self.results = self._load_benchmark_results()

        if not self.results:
            logger.error("No valid results could be loaded")
            return

        # Generate statistical analysis
        stats_analyzer = StatisticalAnalyzer(self.results)
        stats_analyzer.export_summaries(self.config.output_dir)

        # Process monitoring data
        monitoring_dataframes = self._process_monitoring_data()

        # Generate visualizations
        if self.config.generate_plots:
            plot_generator = BenchmarkVisualizer(
                self.results,
                config=AnalysisConfig(
                    input_dir=self.config.input_dir, output_dir=Path("plots")
                ),
            )
            standard_plots = plot_generator.create_all_plots(self.config.output_dir / "plots")

            # Create monitoring plots with in-memory data
            if monitoring_dataframes:
                monitoring_plots = self._create_monitoring_visualizations(monitoring_dataframes)
                standard_plots.update(monitoring_plots)

        # Generate reports
        report_generator = ReportGenerator(self.config, self.results, stats_analyzer)
        report_generator.create_reports(
            self.config.output_dir / "reports", monitoring_dataframes
        )

        logger.info("Benchmark analysis completed successfully")
        logger.info(f"Results available in: {self.config.output_dir}")

    except Exception as e:
        logger.error(f"Error during analysis processing: {e}")
        raise
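
After `process_results()` completes, statistical summaries are written as CSV files under the `tables/` output directory (per the Side Effects list above). The exact filenames come from `StatisticalAnalyzer.export_summaries` and are not documented here, so the sketch below simply globs whatever was produced; the output path is a placeholder.

```
from pathlib import Path

import pandas as pd

# Output directory from the AnalysisConfig used for process_results() (placeholder here).
tables_dir = Path("analysis_output") / "tables"

# Filenames depend on the StatisticalAnalyzer export step; load whatever exists.
for csv_path in sorted(tables_dir.glob("*.csv")):
    df = pd.read_csv(csv_path)
    print(f"{csv_path.name}: {len(df)} rows, columns: {list(df.columns)}")
```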


amd_bench.schemas.benchmark.AnalysisConfig

Bases: BaseModel

Configuration for analysis operations

Methods:

| Name | Description |
| --- | --- |
| `validate_directory_structure` | Validate directory structure using configuration parameters. |
| `validate_filename_formats` | Validate filename format configurations. |
| `validate_output_dir` | Ensure output directory is resolved. |

validate_directory_structure

validate_directory_structure() -> AnalysisConfig

Validate directory structure using configuration parameters.

Source code in src/amd_bench/schemas/benchmark.py
@model_validator(mode="after")
def validate_directory_structure(self) -> "AnalysisConfig":
    """Validate directory structure using configuration parameters."""
    resolved = self.input_dir.resolve()

    if not resolved.exists():
        raise ValueError(f"Input directory does not exist: {resolved}")
    if not resolved.is_dir():
        raise ValueError(f"Input path is not a directory: {resolved}")

    results_dir = resolved / self.results_subdir

    if not results_dir.exists():
        raise ValueError(
            f"Results subdirectory '{self.results_subdir}' not found in {resolved}"
        )

    # Check for files using the configured pattern
    result_files = list(results_dir.glob(self.results_pattern))
    if not result_files:
        raise ValueError(f"No files matching '{self.results_pattern}' found in {results_dir}")

    return self
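
Because this validator runs when the model is constructed, a misconfigured input directory fails fast. A minimal sketch of catching that failure; the directory name is hypothetical, and the error is assumed to surface through Pydantic's usual ValidationError wrapping:

```
from pathlib import Path

from pydantic import ValidationError

from amd_bench.schemas.benchmark import AnalysisConfig

try:
    config = AnalysisConfig(
        input_dir=Path("missing_dir"),  # hypothetical: does not exist
        output_dir=Path("analysis_output"),
    )
except ValidationError as exc:
    # The model_validator above is reported as a standard Pydantic validation error.
    print(f"Configuration rejected: {exc}")
```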

validate_filename_formats (classmethod)

validate_filename_formats(
    v: List[Dict[str, Any]],
) -> List[Dict[str, Any]]

Validate filename format configurations

Source code in src/amd_bench/schemas/benchmark.py
@field_validator("filename_formats")
@classmethod
def validate_filename_formats(cls, v: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Validate filename format configurations"""
    for i, fmt in enumerate(v):
        if "pattern" not in fmt or "groups" not in fmt:
            raise ValueError(f"Format {i}: must have 'pattern' and 'groups'")

        # Test if pattern compiles
        try:
            re.compile(fmt["pattern"])
        except re.error as e:
            raise ValueError(f"Format {i}: invalid regex pattern: {e}") from e

    return v
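
The validator only requires that each format dictionary carries `pattern` and `groups` keys and that the pattern compiles; how the named groups map onto benchmark parameters is handled elsewhere (see `_build_filename_formats`). The entry below is illustrative only, and the shape of `groups` (a list of regex group names) is an assumption rather than documented behavior:

```
from pathlib import Path

from amd_bench.schemas.benchmark import AnalysisConfig

# Hypothetical format entry -- real patterns ship with the project configuration.
filename_formats = [
    {
        "pattern": r"(?P<model>[\w.-]+)_bs(?P<batch_size>\d+)_(?P<timestamp>\d{8}_\d{6})\.json",
        "groups": ["model", "batch_size", "timestamp"],  # assumed shape
    }
]

config = AnalysisConfig(
    input_dir=Path("benchmark_data"),     # placeholder directories, as in earlier sketches
    output_dir=Path("analysis_output"),
    filename_formats=filename_formats,
)
```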

validate_output_dir (classmethod)

validate_output_dir(v: Path) -> Path

Ensure output directory is resolved.

Source code in src/amd_bench/schemas/benchmark.py
@field_validator("output_dir")
@classmethod
def validate_output_dir(cls, v: Path) -> Path:
    """Ensure output directory is resolved."""
    resolved = v.resolve()

    # Create parent directories if they don't exist
    resolved.parent.mkdir(parents=True, exist_ok=True)

    return resolved
