RTL Design Sherpa CocoTB Framework · Verification Infrastructure for RTL Testing
GitHub · Documentation Index · MIT License

gaxi_master.py

GAXI Master with an integrated structured pipeline. It keeps all existing functionality and timing, and adds better debugging and error recovery by splitting each transaction into explicit pipeline phases.

Overview

The GAXIMaster class provides:

  • Structured pipeline with enhanced debugging and error recovery
  • Exact timing compatibility with existing code
  • Signal resolution and data driving from GAXIComponentBase
  • Memory model integration using base MemoryModel directly
  • Comprehensive statistics including pipeline performance
  • Optional pipeline debugging for troubleshooting

Inherits common functionality from GAXIComponentBase and extends CocoTB BusDriver.

Class

GAXIMaster

class GAXIMaster(GAXIComponentBase, BusDriver):
    def __init__(self, dut, title, prefix, clock, field_config,
                 multi_sig=False, randomizer=None, memory_model=None,
                 log=None, super_debug=False, pipeline_debug=False,
                 signal_map=None, **kwargs)

Parameters:

  • dut: Device under test
  • title: Component title/name
  • prefix: Bus prefix
  • clock: Clock signal
  • field_config: Field configuration
  • timeout_cycles: Timeout for handshake operations (default: 1000)
  • mode: GAXI mode ('skid', 'fifo_mux', 'fifo_flop')
  • bus_name: Bus/channel name
  • pkt_prefix: Packet field prefix
  • multi_sig: Whether using multi-signal mode
  • randomizer: Optional randomizer for timing
  • memory_model: Optional memory model for transactions
  • log: Logger instance
  • super_debug: Enable detailed debugging
  • pipeline_debug: Enable pipeline phase debugging
  • signal_map: Optional manual signal mapping override
  • **kwargs: Additional arguments

Core Methods

Transaction Sending

async send(packet)

Send a packet and wait for completion.

Parameters: - packet: Packet to send

Returns: True when complete

# Create and send packet
packet = master.create_packet(addr=0x1000, data=0xDEADBEEF)
await master.send(packet)

async busy_send(transaction)

Send a transaction and wait for completion with busy checking.

Parameters: - transaction: Transaction to send

# Send with busy checking
transaction = master.create_packet(data=0x12345678)
await master.busy_send(transaction)

create_packet(**field_values)

Create a packet with specified field values.

Parameters: - **field_values: Initial field values

Returns: GAXIPacket instance

# Create packet with fields
packet = master.create_packet(
    addr=0x2000,
    data=0xCAFEBABE,
    cmd=0x1  # READ command
)

Bus Management

async reset_bus()

Reset bus with enhanced pipeline state management.

await master.reset_bus()

Memory Operations

async write_to_memory(packet)

Write packet to memory using unified memory integration.

Parameters: - packet: Packet to write

Returns: Success boolean

packet = master.create_packet(addr=0x1000, data=0xDEADBEEF)
success = await master.write_to_memory(packet)
if not success:
    log.error("Memory write failed")

async read_from_memory(packet)

Read data from memory using unified memory integration.

Parameters: - packet: Packet with address to read

Returns: Tuple of (success, data)

packet = master.create_packet(addr=0x1000)
success, data = await master.read_from_memory(packet)
if success:
    log.info(f"Read data: 0x{data:X}")

Pipeline Control and Debugging

enable_pipeline_debug(enable=True)

Enable or disable pipeline debugging at runtime.

Parameters: - enable: Enable debugging flag

# Enable detailed pipeline logging
master.enable_pipeline_debug(True)

# Disable for performance
master.enable_pipeline_debug(False)

get_pipeline_stats()

Get pipeline-specific statistics.

Returns: Dictionary with pipeline statistics

pipeline_stats = master.get_pipeline_stats()
print(f"Current state: {pipeline_stats['current_state']}")
print(f"Queue depth: {pipeline_stats['queue_depth']}")
print(f"Phase counts: P1={pipeline_stats['phase1_count']}")

get_pipeline_debug_info()

Get detailed pipeline debug information.

Returns: Dictionary with debug information

debug_info = master.get_pipeline_debug_info()
print(f"Phase timings: {debug_info['phase_timings']}")
print(f"Statistics: {debug_info['phase_statistics']}")

Statistics

get_stats()

Get comprehensive statistics including pipeline stats.

Returns: Dictionary containing all statistics

stats = master.get_stats()
print(f"Master stats: {stats['master_stats']}")
print(f"Base stats: {stats}")
print(f"Pipeline stats: {stats['pipeline_stats']}")

Pipeline Architecture

The GAXIMaster uses a structured 3-phase pipeline:

Phase 1: Apply Delays

  • Apply valid_delay from randomizer
  • Deassert valid and clear data during delay
  • Maintains exact original timing

Phase 2: Drive and Handshake

  • Drive signals for transaction
  • Assert valid signal
  • Wait for ready handshake (with timeout)
  • Enhanced error recovery on timeout

Phase 3: Complete Transfer

  • Capture completion time
  • Deassert valid
  • Clear data bus
  • Move to sent queue
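
The three phases can be approximated with a cycle-counting model that needs no simulator. This is only a sketch: `run_transaction` and `ready_trace` are hypothetical names, and the exact point at which GAXIMaster starts counting toward `timeout_cycles` is an assumption here.

```python
def run_transaction(valid_delay, ready_trace, timeout_cycles=1000):
    """Cycle-level toy model of the three pipeline phases.

    ready_trace: iterable of booleans, the value of ready sampled on each
    cycle after valid is asserted. Returns (completed, cycles_used).
    """
    cycles = 0

    # Phase 1: valid stays low for the randomized delay.
    cycles += valid_delay

    # Phase 2: valid is asserted; wait for ready, bounded by timeout_cycles.
    waited = 0
    for ready in ready_trace:
        cycles += 1
        if ready:
            # Phase 3: the transfer completes on the cycle where
            # valid and ready are both high.
            return True, cycles
        waited += 1
        if waited >= timeout_cycles:
            break

    # Timed out (or the trace ended) without a handshake.
    return False, cycles


# Two delay cycles, then ready arrives on the third handshake cycle.
result = run_transaction(2, [False, False, True])  # (True, 5)
```

A model like this can help when choosing `timeout_cycles`: the timeout needs to exceed the longest stretch of cycles for which the slave is expected to hold ready low.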

Pipeline State Tracking

# Pipeline states
"idle" → "queued" → "pipeline_active" → "transaction_start" →
"phase1" → "phase2" → "phase3" → "transaction_complete" → "idle"

# Error states
"error_recovery", "reset"

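The documented state sequence can be expressed as a small transition table. The sketch below is not GAXIMaster's internal code: `PipelineTracker` and `NORMAL_FLOW` are hypothetical names, and the rule that any state may enter an error state (and that error states return only to "idle") is an assumption.

```python
# Hypothetical model of the documented state flow; not the GAXIMaster internals.
NORMAL_FLOW = [
    "idle", "queued", "pipeline_active", "transaction_start",
    "phase1", "phase2", "phase3", "transaction_complete", "idle",
]
ERROR_STATES = {"error_recovery", "reset"}


class PipelineTracker:
    """Record state changes and reject ones outside the documented flow."""

    def __init__(self):
        self.state = "idle"
        self.history = ["idle"]
        # Each state may advance to its successor in NORMAL_FLOW.
        self._next = {}
        for cur, nxt in zip(NORMAL_FLOW, NORMAL_FLOW[1:]):
            self._next.setdefault(cur, set()).add(nxt)

    def advance(self, new_state):
        if new_state in ERROR_STATES:
            allowed = True  # assumption: errors can occur in any state
        elif self.state in ERROR_STATES:
            allowed = new_state == "idle"  # assumption: recovery ends in idle
        else:
            allowed = new_state in self._next.get(self.state, set())
        if not allowed:
            raise ValueError(f"illegal transition {self.state!r} -> {new_state!r}")
        self.state = new_state
        self.history.append(new_state)


# Walk one complete transaction through the normal flow.
tracker = PipelineTracker()
for state in NORMAL_FLOW[1:]:
    tracker.advance(state)
```
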
Usage Patterns

Basic Usage

import cocotb
from cocotb.triggers import RisingEdge
from CocoTBFramework.components.gaxi import GAXIMaster
from CocoTBFramework.shared.field_config import FieldConfig, FieldDefinition

@cocotb.test()
async def test_gaxi_master(dut):
    clock = dut.clk

    # Create field configuration
    field_config = FieldConfig()
    field_config.add_field(FieldDefinition("addr", 32, format="hex"))
    field_config.add_field(FieldDefinition("data", 32, format="hex"))

    # Create master
    master = GAXIMaster(
        dut=dut,
        title="TestMaster",
        prefix="",
        clock=clock,
        field_config=field_config,
        pipeline_debug=True  # Enable pipeline debugging
    )

    # Reset bus
    await master.reset_bus()

    # Send transactions
    for i in range(10):
        packet = master.create_packet(
            addr=0x1000 + i*4,
            data=0x12340000 + i
        )
        await master.send(packet)

    # Get statistics
    stats = master.get_stats()
    print(f"Sent {stats['master_stats']['transactions_completed']} transactions")

Advanced Configuration

from CocoTBFramework.shared.flex_randomizer import FlexRandomizer
from CocoTBFramework.shared.memory_model import MemoryModel

# Create randomizer for timing
randomizer = FlexRandomizer({
    'valid_delay': ([(0, 0), (1, 5), (10, 20)], [0.6, 0.3, 0.1])
})

# Create memory model
memory = MemoryModel(num_lines=1024, bytes_per_line=4, log=log)

# Create master with advanced configuration
master = GAXIMaster(
    dut=dut,
    title="AdvancedMaster",
    prefix="m_",
    clock=clock,
    field_config=field_config,
    randomizer=randomizer,
    memory_model=memory,
    timeout_cycles=5000,
    pipeline_debug=True,
    multi_sig=True
)
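
The `valid_delay` constraint above pairs a list of `(low, high)` ranges with a list of weights. One plausible reading is "pick a range by weight, then pick a uniform integer inside it"; the sketch below illustrates that reading with the standard library only. `sample_delay` is a hypothetical helper, and FlexRandomizer's actual sampling logic is not shown on this page, so it may differ.

```python
import random


def sample_delay(ranges, weights, rng=random):
    """Pick one (low, high) range by weight, then a uniform integer inside it."""
    low, high = rng.choices(ranges, weights=weights, k=1)[0]
    return rng.randint(low, high)


# Same constraint as the 'valid_delay' entry above:
# 60% no delay, 30% short delay (1-5 cycles), 10% long delay (10-20 cycles).
valid_delay_spec = ([(0, 0), (1, 5), (10, 20)], [0.6, 0.3, 0.1])

rng = random.Random(1234)  # seeded so runs are repeatable
delays = [sample_delay(*valid_delay_spec, rng=rng) for _ in range(1000)]
```

Under this reading, most transactions go out back to back, with an occasional long gap that exercises the receiver's idle handling.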

Memory-Integrated Testing

@cocotb.test()
async def test_memory_operations(dut):
    master = GAXIMaster(dut, "MemMaster", "", clock, field_config,
                       memory_model=memory)

    # Write to memory
    write_packet = master.create_packet(addr=0x1000, data=0xDEADBEEF)
    success = await master.write_to_memory(write_packet)
    assert success, "Memory write failed"

    # Read from memory
    read_packet = master.create_packet(addr=0x1000)
    success, data = await master.read_from_memory(read_packet)
    assert success, "Memory read failed"
    assert data == 0xDEADBEEF, f"Data mismatch: expected 0xDEADBEEF, got 0x{data:X}"

    # Send read transaction
    await master.send(read_packet)

Pipeline Debugging

@cocotb.test()
async def test_with_pipeline_debug(dut):
    master = GAXIMaster(dut, "DebugMaster", "", clock, field_config,
                       pipeline_debug=True)

    # Send transaction with detailed logging
    packet = master.create_packet(addr=0x1000, data=0x12345678)
    await master.send(packet)

    # Analyze pipeline performance
    pipeline_stats = master.get_pipeline_stats()
    print("Pipeline Statistics:")
    print(f"  Phase 1 count: {pipeline_stats['phase1_count']}")
    print(f"  Phase 2 count: {pipeline_stats['phase2_count']}")
    print(f"  Phase 3 count: {pipeline_stats['phase3_count']}")
    print(f"  Timeouts: {pipeline_stats['timeout_count']}")
    print(f"  Errors: {pipeline_stats['error_count']}")

    # Get debug info
    debug_info = master.get_pipeline_debug_info()
    for state, timing in debug_info['phase_timings'].items():
        print(f"  {state}: {timing}ns")

Burst Transactions

async def send_burst_transactions(master, base_addr, count):
    """Send a burst of transactions"""
    for i in range(count):
        packet = master.create_packet(
            addr=base_addr + i*4,
            data=0x10000000 + i
        )
        await master.send(packet)

    # Get performance metrics
    stats = master.get_stats()
    master_stats = stats['master_stats']

    print("Burst complete:")
    print(f"  Transactions: {master_stats['transactions_completed']}")
    print(f"  Average latency: {master_stats['avg_latency']:.2f}ns")
    print(f"  Throughput: {master_stats['current_throughput_tps']:.1f} TPS")

Error Handling and Recovery

@cocotb.test()
async def test_error_recovery(dut):
    master = GAXIMaster(dut, "ErrorMaster", "", clock, field_config,
                       timeout_cycles=100,  # Short timeout for testing
                       pipeline_debug=True)

    # Build the packet outside the try block so the retry below can reuse it
    packet = master.create_packet(addr=0x1000, data=0x12345678)

    try:
        # This might time out if the slave is not ready
        await master.send(packet)

    except Exception as e:
        log.error(f"Transaction failed: {e}")

        # Check pipeline state
        debug_info = master.get_pipeline_debug_info()
        print(f"Pipeline state: {debug_info['current_state']}")

        # Reset and try again
        await master.reset_bus()
        await master.send(packet)  # Should work after reset

Performance Monitoring

import random

from cocotb.triggers import Timer
from cocotb.utils import get_sim_time

async def monitor_performance(master, duration_ms=1000):
    """Monitor master performance over time"""
    start_time = get_sim_time('ns')
    end_time = start_time + duration_ms * 1e6  # Convert ms to ns

    while get_sim_time('ns') < end_time:
        # Send transaction
        packet = master.create_packet(
            addr=random.randint(0x1000, 0x8000),
            data=random.randint(0, 0xFFFFFFFF)
        )
        await master.send(packet)

        # Brief delay in simulation time; asyncio.sleep() would not
        # advance the simulator, so use a cocotb Timer instead
        await Timer(1, 'ms')

    # Analyze performance
    stats = master.get_stats()
    master_stats = stats['master_stats']
    pipeline_stats = stats['pipeline_stats']

    print(f"Performance Analysis ({duration_ms}ms):")
    print(f"  Transactions sent: {master_stats['transactions_sent']}")
    print(f"  Success rate: {master_stats['success_rate_percent']:.1f}%")
    print(f"  Average latency: {master_stats['average_latency_ms']:.2f}ms")
    print(f"  Peak throughput: {master_stats['peak_throughput_tps']:.1f} TPS")
    print(f"  Pipeline errors: {pipeline_stats['error_count']}")

Error Handling

Timeout Handling

# Timeouts are handled automatically with proper cleanup
try:
    await master.send(packet)
except TimeoutError:
    # Pipeline automatically recovers
    print("Transaction timed out but pipeline recovered")

Signal Mapping Errors

try:
    master = GAXIMaster(dut, "Master", "", clock, field_config)
except RuntimeError as e:
    # Try manual signal mapping
    signal_map = {'valid': 'custom_valid', 'ready': 'custom_ready'}
    master = GAXIMaster(dut, "Master", "", clock, field_config,
                       signal_map=signal_map)

Memory Operation Errors

success = await master.write_to_memory(packet)
if not success:
    # Memory operation failed
    stats = master.get_stats()
    if 'memory_stats' in stats:
        print(f"Memory errors: {stats['memory_stats']['boundary_violations']}")
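
To make the failure path concrete, here is a minimal stand-in memory that returns a success flag and counts out-of-range accesses. `ToyMemory` is hypothetical and is not the framework's MemoryModel; byte addressing, little-endian storage, and the `boundary_violations` counter semantics are all assumptions made for the illustration.

```python
class ToyMemory:
    """Hypothetical stand-in for a memory model (not the framework's MemoryModel)."""

    def __init__(self, num_lines, bytes_per_line):
        self.bytes_per_line = bytes_per_line
        self.size = num_lines * bytes_per_line  # assumption: byte-addressed
        self.data = bytearray(self.size)
        self.stats = {"writes": 0, "reads": 0, "boundary_violations": 0}

    def _in_bounds(self, addr):
        # The whole line must fit inside the memory.
        ok = 0 <= addr and addr + self.bytes_per_line <= self.size
        if not ok:
            self.stats["boundary_violations"] += 1
        return ok

    def write(self, addr, value):
        """Store one line; return True on success, False if out of range."""
        if not self._in_bounds(addr):
            return False
        end = addr + self.bytes_per_line
        self.data[addr:end] = value.to_bytes(self.bytes_per_line, "little")
        self.stats["writes"] += 1
        return True

    def read(self, addr):
        """Return (success, data), mirroring the tuple convention used above."""
        if not self._in_bounds(addr):
            return False, None
        end = addr + self.bytes_per_line
        self.stats["reads"] += 1
        return True, int.from_bytes(self.data[addr:end], "little")


mem = ToyMemory(num_lines=1024, bytes_per_line=4)
mem.write(0x0FFC, 0xDEADBEEF)  # last valid line in this toy model
mem.write(0x1000, 0x1)         # one line past the end: rejected and counted
```

In this toy model a 1024 x 4 memory spans byte addresses 0x0000 to 0x0FFF, so the second write fails and increments the violation counter. Whether the real MemoryModel addresses memory the same way should be checked against its own documentation.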

Best Practices

1. Enable Pipeline Debugging During Development

master = GAXIMaster(..., pipeline_debug=True)  # Development
master = GAXIMaster(..., pipeline_debug=False) # Production

2. Use Appropriate Timeout Values

# Short timeout for fast testing
master = GAXIMaster(..., timeout_cycles=100)

# Long timeout for realistic conditions  
master = GAXIMaster(..., timeout_cycles=10000)

3. Monitor Statistics Regularly

# Check statistics periodically
if transaction_count % 100 == 0:
    stats = master.get_stats()
    check_performance_requirements(stats)
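
`check_performance_requirements` is not defined on this page, so the following is one possible shape for it rather than a framework function. The thresholds are hypothetical, and the keys `avg_latency` and `error_count` are taken from the examples above; the real statistics dictionary may use different names.

```python
def check_performance_requirements(stats, max_avg_latency_ns=100.0,
                                   max_pipeline_errors=0):
    """Return a list of human-readable violations; an empty list means pass."""
    problems = []
    master = stats.get("master_stats", {})
    pipeline = stats.get("pipeline_stats", {})

    # Latency check is skipped if the key is absent.
    latency = master.get("avg_latency")
    if latency is not None and latency > max_avg_latency_ns:
        problems.append(
            f"average latency {latency:.2f}ns exceeds {max_avg_latency_ns:.2f}ns"
        )

    errors = pipeline.get("error_count", 0)
    if errors > max_pipeline_errors:
        problems.append(
            f"{errors} pipeline errors exceed limit of {max_pipeline_errors}"
        )

    return problems
```

Returning a list instead of raising lets the caller decide whether a violation should fail the test immediately or only be logged.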

4. Handle Reset Properly

# Reset before starting transactions
await master.reset_bus()

# Reset on error recovery
if error_detected:
    await master.reset_bus()

5. Use Memory Model for Complex Testing

# Create memory model for transaction validation
memory = MemoryModel(1024, 4, log=log)
master = GAXIMaster(..., memory_model=memory)

# Validate transactions through memory
await master.write_to_memory(packet)
success, data = await master.read_from_memory(packet)

The GAXIMaster provides a foundation for GAXI transaction generation, with pipeline debugging, error recovery, and performance statistics built in.