|
CocoTB Framework · Verification Infrastructure for RTL Testing GitHub · Documentation Index · MIT License |
fifo_slave.py¶
FIFO Slave component for reading transactions from FIFO interfaces. Inherits from FIFOMonitorBase for unified functionality while preserving exact API and timing behavior.
Overview¶
The FIFOSlave class combines monitoring capabilities with active read signal control to consume transactions from FIFO interfaces. It observes transactions like a monitor while actively driving read signals to control data flow, making it ideal for modeling FIFO consumers.
Key Features¶
- Active Read Control: Drives read signals with configurable timing and delays
- Transaction Monitoring: Observes and processes incoming transactions
- Flow Control: Handles FIFO empty conditions with backpressure support
- Memory Integration: Built-in memory model support for data storage
- Protocol Violation Detection: Detects read-while-empty and other issues
- Flexible Timing: Configurable read delays with randomization support
Core Class¶
FIFOSlave¶
FIFO Slave component for reading transactions from FIFO interfaces.
Constructor¶
FIFOSlave(dut, title, prefix, clock, field_config,
timeout_cycles=1000, mode='fifo_mux',
bus_name='',
pkt_prefix='',
multi_sig=False,
randomizer=None, log=None, super_debug=False,
signal_map=None, **kwargs)
Parameters:
- dut: Device under test
- title: Component title/name
- prefix: Bus prefix for signal naming
- clock: Clock signal
- field_config: Field configuration (FieldConfig object or dict)
- timeout_cycles: Timeout for operations (default: 1000)
- mode: FIFO mode ('fifo_mux', 'fifo_flop')
- bus_name: Bus/channel name
- pkt_prefix: Packet field prefix
- multi_sig: Whether using multi-signal mode
- randomizer: Optional randomizer for read delays
- log: Logger instance
- super_debug: Enable detailed debugging
- signal_map: Optional manual signal mapping
- **kwargs: Additional arguments for BusMonitor
Example:
# Basic FIFO slave
slave = FIFOSlave(
dut=dut,
title="ReadSlave",
prefix="",
clock=clock,
field_config=field_config
)
# Advanced configuration
slave = FIFOSlave(
dut=dut,
title="AdvancedSlave",
prefix="",
clock=clock,
field_config=field_config,
timeout_cycles=2000,
mode='fifo_flop',
multi_sig=True,
randomizer=custom_randomizer,
signal_map={'read': 'rd_en', 'empty': 'fifo_empty'}
)
Core Methods¶
Bus Control¶
async reset_bus()¶
Reset the bus and clear all queued transactions.
async wait_cycles(cycles)¶
Wait for specified number of clock cycles.
Parameters:
- cycles: Number of cycles to wait
Transaction Access¶
get_observed_packets(count=None)¶
Get observed transactions from the standard cocotb receive queue.
Parameters:
- count: Number of packets to return (None = all)
Returns: List of observed FIFOPacket instances
# Get all observed transactions
all_packets = slave.get_observed_packets()
# Get last 5 transactions
recent_packets = slave.get_observed_packets(count=5)
# Process observed transactions
for packet in all_packets:
print(f"Received: {packet.formatted()}")
clear_queue()¶
Clear the observed transactions queue.
create_packet(**field_values)¶
Create a packet with specified field values.
Parameters:
- **field_values: Field values to set in packet
Returns: FIFOPacket instance
Memory Operations¶
handle_memory_write(packet)¶
Handle memory write using integrated memory model.
Parameters:
- packet: Packet to write to memory
Returns: True if successful, False otherwise
# Automatically called during transaction processing
# Can also be called manually
success = slave.handle_memory_write(packet)
handle_memory_read(packet)¶
Handle memory read using integrated memory model.
Parameters:
- packet: Packet with address to read from
Returns: Tuple of (success, data)
# Read from memory
success, data = slave.handle_memory_read(packet)
if success:
log.info(f"Read data: 0x{data:X}")
Statistics and Monitoring¶
get_stats()¶
Get comprehensive statistics including base and monitoring statistics.
Returns: Dictionary containing all statistics
stats = slave.get_stats()
# Monitor statistics
monitor_stats = stats['monitor_stats']
print(f"Transactions received: {monitor_stats['received_transactions']}")
print(f"Transactions observed: {monitor_stats['transactions_observed']}")
print(f"Protocol violations: {monitor_stats['protocol_violations']}")
print(f"Read while empty: {monitor_stats['read_while_empty']}")
# Component statistics
print(f"Observed packets: {stats['observed_packets']}")
print(f"Component type: {stats['component_type']}")
Operational Modes¶
FIFO_MUX Mode¶
In fifo_mux mode, data is captured in the same cycle as the read:
slave = FIFOSlave(dut, "MuxSlave", "", clock, field_config, mode='fifo_mux')
# Data captured immediately when read=1 and empty=0
# Timeline: read=1 -> data captured same cycle
FIFO_FLOP Mode¶
In fifo_flop mode, data is captured in the cycle following the read:
slave = FIFOSlave(dut, "FlopSlave", "", clock, field_config, mode='fifo_flop')
# Data captured one cycle after read assertion
# Timeline: read=1 -> wait 1 cycle -> data captured
Usage Patterns¶
Basic Read Operations¶
# Set up slave
field_config = FieldConfig.create_data_only(32)
slave = FIFOSlave(dut, "Slave", "", clock, field_config)
# Start monitoring (automatic in background)
# Slave automatically drives read signals and captures data
# Access observed transactions
await Timer(1000, 'ns') # Let some transactions occur
packets = slave.get_observed_packets()
for packet in packets:
print(f"Received data: 0x{packet.data:X}")
Multi-Field Transaction Processing¶
# Configure multi-field packets
field_config = FieldConfig()
field_config.add_field(FieldDefinition("addr", 32, format="hex"))
field_config.add_field(FieldDefinition("data", 32, format="hex"))
field_config.add_field(FieldDefinition("cmd", 4, format="hex"))
# Create slave with multi-signal mode
slave = FIFOSlave(
dut=dut,
title="MultiFieldSlave",
prefix="",
clock=clock,
field_config=field_config,
multi_sig=True
)
# Process complex transactions
await Timer(1000, 'ns')
packets = slave.get_observed_packets()
for packet in packets:
print(f"Command: {packet.cmd}, Address: 0x{packet.addr:X}, Data: 0x{packet.data:X}")
# Process based on command
if packet.cmd == 1: # READ
print("Processing read command")
elif packet.cmd == 2: # WRITE
print("Processing write command")
Randomized Read Timing¶
# Create randomizer for read delays
read_randomizer = FlexRandomizer({
'read_delay': ([(0, 1), (2, 8), (9, 30)], [5, 2, 1])
})
# Create slave with randomized timing
slave = FIFOSlave(
dut=dut,
title="RandomSlave",
prefix="",
clock=clock,
field_config=field_config,
randomizer=read_randomizer
)
# Reads will have randomized delays between transactions
Memory-Integrated Processing¶
# Set up memory model
memory = MemoryModel(num_lines=256, bytes_per_line=4)
# Create slave with memory integration
slave = FIFOSlave(
dut=dut,
title="MemorySlave",
prefix="",
clock=clock,
field_config=field_config,
memory_model=memory
)
# Transactions automatically stored in memory
await Timer(2000, 'ns')
# Verify memory contents
packets = slave.get_observed_packets()
for packet in packets:
if hasattr(packet, 'addr') and hasattr(packet, 'data'):
# Verify memory write occurred
success, stored_data = slave.handle_memory_read(packet)
assert success and stored_data == packet.data
Real-Time Transaction Processing¶
class ProcessingSlave:
def __init__(self, dut, clock, field_config):
self.slave = FIFOSlave(dut, "ProcessingSlave", "", clock, field_config)
self.processed_count = 0
# Add callback for real-time processing
self.slave.add_callback(self.process_transaction)
def process_transaction(self, packet):
"""Process transactions as they arrive"""
self.processed_count += 1
# Real-time processing
if hasattr(packet, 'cmd'):
if packet.cmd == 1: # READ command
self.handle_read_request(packet)
elif packet.cmd == 2: # WRITE command
self.handle_write_request(packet)
log.info(f"Processed transaction {self.processed_count}: {packet.formatted()}")
def handle_read_request(self, packet):
"""Handle read request"""
if hasattr(packet, 'addr'):
log.info(f"Read request for address 0x{packet.addr:X}")
# Process read...
def handle_write_request(self, packet):
"""Handle write request"""
if hasattr(packet, 'addr') and hasattr(packet, 'data'):
log.info(f"Write request: 0x{packet.data:X} -> 0x{packet.addr:X}")
# Process write...
# Usage
processor = ProcessingSlave(dut, clock, field_config)
# Transactions processed automatically as they arrive
Error Detection and Monitoring¶
class MonitoringSlave:
def __init__(self, dut, clock, field_config):
self.slave = FIFOSlave(
dut=dut,
title="MonitoringSlave",
prefix="",
clock=clock,
field_config=field_config
)
self.error_log = []
async def monitor_with_error_detection(self, duration_ns=10000):
"""Monitor for specified duration with error checking"""
start_time = cocotb.utils.get_sim_time('ns')
while cocotb.utils.get_sim_time('ns') - start_time < duration_ns:
await Timer(100, 'ns')
# Check for protocol violations
stats = self.slave.get_stats()
monitor_stats = stats['monitor_stats']
if monitor_stats['protocol_violations'] > 0:
self.error_log.append({
'type': 'protocol_violation',
'count': monitor_stats['protocol_violations'],
'time': cocotb.utils.get_sim_time('ns')
})
if monitor_stats['read_while_empty'] > 0:
self.error_log.append({
'type': 'read_while_empty',
'count': monitor_stats['read_while_empty'],
'time': cocotb.utils.get_sim_time('ns')
})
return self.error_log
def generate_error_report(self):
"""Generate comprehensive error report"""
stats = self.slave.get_stats()
report = f"""
Slave Error Report:
==================
Total Transactions: {stats['monitor_stats']['transactions_observed']}
Protocol Violations: {stats['monitor_stats']['protocol_violations']}
Read While Empty: {stats['monitor_stats']['read_while_empty']}
X/Z Violations: {stats['monitor_stats']['x_z_violations']}
Error Timeline:
"""
for error in self.error_log:
report += f"\n {error['time']}ns: {error['type']} (count: {error['count']})"
return report
# Usage
monitor = MonitoringSlave(dut, clock, field_config)
await monitor.monitor_with_error_detection(duration_ns=50000)
print(monitor.generate_error_report())
High-Performance Data Collection¶
class HighPerformanceSlave:
def __init__(self, dut, clock, field_config):
# Configure for maximum performance
self.slave = FIFOSlave(
dut=dut,
title="HighPerfSlave",
prefix="",
clock=clock,
field_config=field_config,
super_debug=False # Disable debug for performance
)
self.collection_buffer = []
async def fast_collection(self, target_count=1000):
"""High-speed data collection"""
collected = 0
start_time = time.time()
while collected < target_count:
await Timer(10, 'ns') # Minimal wait
# Check for new transactions
current_packets = self.slave.get_observed_packets()
if len(current_packets) > collected:
# New transactions available
new_packets = current_packets[collected:]
self.collection_buffer.extend(new_packets)
collected = len(current_packets)
end_time = time.time()
duration = end_time - start_time
rate = target_count / duration
log.info(f"Collected {target_count} transactions in {duration:.2f}s ({rate:.1f} TPS)")
return self.collection_buffer
def analyze_collected_data(self):
"""Analyze collected transaction data"""
if not self.collection_buffer:
return {}
data_values = [p.data for p in self.collection_buffer if hasattr(p, 'data')]
analysis = {
'count': len(self.collection_buffer),
'unique_data_values': len(set(data_values)),
'data_range': (min(data_values), max(data_values)) if data_values else (0, 0),
'average_data': sum(data_values) / len(data_values) if data_values else 0
}
return analysis
# Usage
collector = HighPerformanceSlave(dut, clock, field_config)
await collector.fast_collection(target_count=5000)
analysis = collector.analyze_collected_data()
print(f"Collection analysis: {analysis}")
Custom Signal Mapping¶
# For non-standard FIFO interfaces
def create_custom_slave(dut, clock, custom_signals):
"""Create slave with custom signal naming"""
# Map custom signals to expected names
signal_map = {
'read': custom_signals.get('read_enable', 'read'),
'empty': custom_signals.get('empty_flag', 'empty'),
'data': custom_signals.get('read_data', 'data')
}
# Create field configuration
field_config = FieldConfig.create_data_only(32)
# Create slave with custom mapping
slave = FIFOSlave(
dut=dut,
title="CustomSlave",
prefix="",
clock=clock,
field_config=field_config,
signal_map=signal_map,
super_debug=True # Enable for signal mapping debugging
)
return slave
# Usage with custom interface
custom_signals = {
'read_enable': 'rd_en',
'empty_flag': 'fifo_empty',
'read_data': 'dout'
}
slave = create_custom_slave(dut, clock, custom_signals)
Protocol Violation Detection¶
The FIFOSlave automatically detects and reports various protocol violations:
Read While Empty¶
# Automatically detected and logged
stats = slave.get_stats()
read_while_empty_count = stats['monitor_stats']['read_while_empty']
if read_while_empty_count > 0:
log.warning(f"Detected {read_while_empty_count} read-while-empty violations")
X/Z Signal Values¶
# X/Z values in signals automatically detected
stats = slave.get_stats()
xz_violations = stats['monitor_stats']['x_z_violations']
if xz_violations > 0:
log.warning(f"Detected {xz_violations} X/Z signal violations")
Timing Analysis¶
Read Timing Patterns¶
class TimingAnalyzer:
def __init__(self, slave):
self.slave = slave
self.timing_data = []
async def analyze_read_timing(self, duration_ns=10000):
"""Analyze read signal timing patterns"""
start_time = cocotb.utils.get_sim_time('ns')
last_packet_count = 0
while cocotb.utils.get_sim_time('ns') - start_time < duration_ns:
current_time = cocotb.utils.get_sim_time('ns')
current_packets = len(self.slave.get_observed_packets())
if current_packets > last_packet_count:
# New transaction detected
self.timing_data.append({
'time': current_time,
'transaction_count': current_packets
})
last_packet_count = current_packets
await Timer(50, 'ns')
return self.analyze_intervals()
def analyze_intervals(self):
"""Analyze time intervals between transactions"""
if len(self.timing_data) < 2:
return {}
intervals = []
for i in range(1, len(self.timing_data)):
interval = self.timing_data[i]['time'] - self.timing_data[i-1]['time']
intervals.append(interval)
return {
'total_transactions': len(self.timing_data),
'average_interval_ns': sum(intervals) / len(intervals),
'min_interval_ns': min(intervals),
'max_interval_ns': max(intervals),
'intervals': intervals
}
# Usage
analyzer = TimingAnalyzer(slave)
timing_analysis = await analyzer.analyze_read_timing(duration_ns=20000)
print(f"Timing analysis: {timing_analysis}")
Best Practices¶
1. Use Appropriate Mode for Interface¶
# For combinatorial FIFO outputs
slave = FIFOSlave(dut, "CombSlave", "", clock, field_config, mode='fifo_mux')
# For registered FIFO outputs
slave = FIFOSlave(dut, "RegSlave", "", clock, field_config, mode='fifo_flop')
2. Monitor Error Conditions¶
# Regular error checking
async def check_slave_health(slave):
stats = slave.get_stats()
monitor_stats = stats['monitor_stats']
assert monitor_stats['protocol_violations'] == 0, "Protocol violations detected"
assert monitor_stats['x_z_violations'] == 0, "X/Z signal violations detected"
3. Use Memory Integration for Verification¶
# Enable memory model for data verification
memory = MemoryModel(num_lines=1024, bytes_per_line=4)
slave = FIFOSlave(dut, "VerifySlave", "", clock, field_config, memory_model=memory)
4. Configure Appropriate Randomizers¶
# For realistic read patterns
realistic_randomizer = FlexRandomizer({
'read_delay': ([(0, 1), (2, 5)], [8, 2]) # Mostly quick reads
})
# For stress testing
stress_randomizer = FlexRandomizer({
'read_delay': ([(0, 10), (20, 100)], [1, 1]) # Variable delays
})
5. Clear Queues Between Test Phases¶
The FIFOSlave provides comprehensive transaction monitoring and read control capabilities with automatic error detection, performance tracking, and flexible timing control for thorough FIFO interface verification.