"""
Demonstration of Parnas' Information Hiding Principle
Design Decision Hidden: How events/telemetry are processed and stored
- The Events class hides the details of which sinks are registered
- Each sink implementation hides its specific processing strategy
- Changes to logging format, storage, or filtering don't affect clients
"""
from typing import Any, Tuple
import logging
from abc import ABC, abstractmethod
class TelemetrySink(ABC):
"""Interface that hides implementation details of telemetry handling"""
@abstractmethod
def handle(self, *args):
# type: (Tuple[str, Any]) -> None
"""Process telemetry with key-value pairs"""
pass
class StructuredPythonLogger(TelemetrySink):
"""Implementation that hides Python logger integration details"""
class _KeyValueFormatter(logging.Formatter):
"""Internal formatter that outputs structured key=value pairs"""
def format(self, record):
# type: (logging.LogRecord) -> str
# The structured data is passed via extra
if hasattr(record, 'structured_data'):
parts = ["{0}={1}".format(k, v) for k, v in record.structured_data.items()]
structured_msg = " ".join(parts)
record.msg = structured_msg
return super(StructuredPythonLogger._KeyValueFormatter, self).format(record)
def __init__(self, logger):
# type: (logging.Logger) -> None
self._logger = logger # Hidden implementation detail
def handle(self, *args):
# type: (Tuple[str, Any]) -> None
event_data = dict(args)
# Use log level if provided, otherwise INFO
level = str(event_data.get('level', 'INFO')).upper()
# Pass structured data to logger via extra
if level == 'ERROR':
self._logger.error('', extra={'structured_data': event_data})
elif level == 'WARNING':
self._logger.warning('', extra={'structured_data': event_data})
elif level == 'DEBUG':
self._logger.debug('', extra={'structured_data': event_data})
else:
self._logger.info('', extra={'structured_data': event_data})
class StructuredMetricsCollector(TelemetrySink):
"""Implementation that hides metrics aggregation strategy"""
def __init__(self):
self._metrics = {} # Hidden implementation detail
def handle(self, *args):
# type: (Tuple[str, Any]) -> None
event_data = dict(args)
# Hidden decision: only track events with 'metric' key
if 'metric' in event_data:
metric_name = event_data['metric']
self._metrics[metric_name] = self._metrics.get(metric_name, 0) + 1
def get_metrics(self):
# type: () -> dict
"""Expose metrics without revealing internal storage"""
return self._metrics. copy()
class Events:
"""
Facade that hides the complexity of multi-sink telemetry processing
Design Decisions Hidden:
- Which sinks are registered
- How many sinks exist
- Order of sink execution
- Sink lifecycle management
"""
def __init__(self):
self._sinks = [] # type: list # Hidden detail
def register(self, sink):
# type: (TelemetrySink) -> None
"""Add sink without exposing internal collection"""
self._sinks.append(sink)
def emit(self, *args):
# type: (Tuple[str, Any]) -> None
"""
Delegate event to all sinks
Client doesn't need to know:
- How many sinks exist
- What each sink does
- If sinks can be added/removed at runtime
"""
for sink in self._sinks:
sink.handle(*args)
def main():
"""
Runnable example demonstrating information hiding
"""
# Configure logging system with custom structured formatter
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
handler.setFormatter(StructuredPythonLogger._KeyValueFormatter('%(levelname)s - %(message)s'))
# Configure root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
root_logger.addHandler(handler)
# Get logger for the telemetry system
logger = logging.getLogger('app.telemetry')
# Setup telemetry system (design decisions hidden)
events = Events()
events.register(StructuredPythonLogger(logger))
metrics = StructuredMetricsCollector()
events.register(metrics)
# Use logger for demo headers
demo_logger = logging.getLogger('demo')
demo_logger.info("=== Telemetry Events Demo ===")
# Client code: doesn't know or care about sink implementations
events.emit(
('level', 'INFO'),
('event', 'app_start'),
('user', 'd-led'),
('version', '1.0.0'),
('metric', 'app_start')
)
events.emit(
('level', 'DEBUG'),
('event', 'db_query'),
('query_time_ms', 45),
('table', 'users'),
('rows', 150),
('metric', 'db_query')
)
events.emit(
('level', 'ERROR'),
('event', 'connection_failed'),
('service', 'database'),
('retry_count', 3),
('error_code', 'CONN_TIMEOUT'),
('metric', 'db_error')
)
events.emit(
('level', 'WARNING'),
('event', 'cache_miss'),
('key', 'user: 12345'),
('fallback', 'database'),
('metric', 'cache_miss')
)
events.emit(
('level', 'INFO'),
('event', 'request_completed'),
('endpoint', '/api/users'),
('duration_ms', 234),
('status', 200),
('metric', 'request_completed')
)
# More events to make metrics interesting
events. emit(
('level', 'ERROR'),
('event', 'connection_failed'),
('service', 'cache'),
('retry_count', 2),
('metric', 'db_error')
)
events.emit(
('level', 'DEBUG'),
('event', 'db_query'),
('query_time_ms', 23),
('table', 'products'),
('rows', 50),
('metric', 'db_query')
)
events.emit(
('level', 'INFO'),
('event', 'request_completed'),
('endpoint', '/api/products'),
('duration_ms', 156),
('status', 200),
('metric', 'request_completed')
)
# Show collected metrics
demo_logger.info("=== Metrics Summary ===")
for metric, count in metrics.get_metrics().items():
demo_logger.info("{0}: {1}".format(metric, count))
if __name__ == "__main__":
main()
"""
Demonstration of Parnas' Information Hiding Principle

Design Decision Hidden:    How events/telemetry are processed and stored
- The Events class hides the details of which sinks are registered
- Each sink implementation hides its specific processing strategy
- Changes to logging format, storage, or filtering don't affect clients
"""

from typing import Any, Tuple
import logging
from abc import ABC, abstractmethod


class TelemetrySink(ABC):
    """Interface that hides implementation details of telemetry handling"""
    
    @abstractmethod
    def handle(self, *args):
        # type: (Tuple[str, Any]) -> None
        """Process telemetry with key-value pairs"""
        pass


class StructuredPythonLogger(TelemetrySink):
    """Implementation that hides Python logger integration details"""
    
    class _KeyValueFormatter(logging.Formatter):
        """Internal formatter that outputs structured key=value pairs"""
        
        def format(self, record):
            # type: (logging.LogRecord) -> str
            # The structured data is passed via extra
            if hasattr(record, 'structured_data'):
                parts = ["{0}={1}".format(k, v) for k, v in record.structured_data.items()]
                structured_msg = " ".join(parts)
                record.msg = structured_msg
            return super(StructuredPythonLogger._KeyValueFormatter, self).format(record)
    
    def __init__(self, logger):
        # type: (logging.Logger) -> None
        self._logger = logger  # Hidden implementation detail
    
    def handle(self, *args):
        # type: (Tuple[str, Any]) -> None
        event_data = dict(args)
        
        # Use log level if provided, otherwise INFO
        level = str(event_data.get('level', 'INFO')).upper()
        
        # Pass structured data to logger via extra
        if level == 'ERROR':
            self._logger.error('', extra={'structured_data': event_data})
        elif level == 'WARNING':
            self._logger.warning('', extra={'structured_data':  event_data})
        elif level == 'DEBUG':
            self._logger.debug('', extra={'structured_data':  event_data})
        else:
            self._logger.info('', extra={'structured_data': event_data})


class StructuredMetricsCollector(TelemetrySink):
    """Implementation that hides metrics aggregation strategy"""
    
    def __init__(self):
        self._metrics = {}  # Hidden implementation detail
    
    def handle(self, *args):
        # type: (Tuple[str, Any]) -> None
        event_data = dict(args)
        
        # Hidden decision: only track events with 'metric' key
        if 'metric' in event_data:
            metric_name = event_data['metric']
            self._metrics[metric_name] = self._metrics.get(metric_name, 0) + 1
    
    def get_metrics(self):
        # type: () -> dict
        """Expose metrics without revealing internal storage"""
        return self._metrics. copy()


class Events:
    """
    Facade that hides the complexity of multi-sink telemetry processing
    
    Design Decisions Hidden:
    - Which sinks are registered
    - How many sinks exist
    - Order of sink execution
    - Sink lifecycle management
    """
    
    def __init__(self):
        self._sinks = []  # type: list  # Hidden detail
    
    def register(self, sink):
        # type: (TelemetrySink) -> None
        """Add sink without exposing internal collection"""
        self._sinks.append(sink)
    
    def emit(self, *args):
        # type: (Tuple[str, Any]) -> None
        """
        Delegate event to all sinks
        
        Client doesn't need to know: 
        - How many sinks exist
        - What each sink does
        - If sinks can be added/removed at runtime
        """
        for sink in self._sinks:
            sink.handle(*args)


def main():
    """
    Runnable example demonstrating information hiding
    """
    # Configure logging system with custom structured formatter
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(StructuredPythonLogger._KeyValueFormatter('%(levelname)s - %(message)s'))
    
    # Configure root logger
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    root_logger.addHandler(handler)
    
    # Get logger for the telemetry system
    logger = logging.getLogger('app.telemetry')
    
    # Setup telemetry system (design decisions hidden)
    events = Events()
    events.register(StructuredPythonLogger(logger))
    
    metrics = StructuredMetricsCollector()
    events.register(metrics)
    
    # Use logger for demo headers
    demo_logger = logging.getLogger('demo')
    demo_logger.info("=== Telemetry Events Demo ===")
    
    # Client code:  doesn't know or care about sink implementations
    events.emit(
        ('level', 'INFO'),
        ('event', 'app_start'),
        ('user', 'd-led'),
        ('version', '1.0.0'),
        ('metric', 'app_start')
    )
    
    events.emit(
        ('level', 'DEBUG'),
        ('event', 'db_query'),
        ('query_time_ms', 45),
        ('table', 'users'),
        ('rows', 150),
        ('metric', 'db_query')
    )
    
    events.emit(
        ('level', 'ERROR'),
        ('event', 'connection_failed'),
        ('service', 'database'),
        ('retry_count', 3),
        ('error_code', 'CONN_TIMEOUT'),
        ('metric', 'db_error')
    )
    
    events.emit(
        ('level', 'WARNING'),
        ('event', 'cache_miss'),
        ('key', 'user: 12345'),
        ('fallback', 'database'),
        ('metric', 'cache_miss')
    )
    
    events.emit(
        ('level', 'INFO'),
        ('event', 'request_completed'),
        ('endpoint', '/api/users'),
        ('duration_ms', 234),
        ('status', 200),
        ('metric', 'request_completed')
    )
    
    # More events to make metrics interesting
    events. emit(
        ('level', 'ERROR'),
        ('event', 'connection_failed'),
        ('service', 'cache'),
        ('retry_count', 2),
        ('metric', 'db_error')
    )
    
    events.emit(
        ('level', 'DEBUG'),
        ('event', 'db_query'),
        ('query_time_ms', 23),
        ('table', 'products'),
        ('rows', 50),
        ('metric', 'db_query')
    )
    
    events.emit(
        ('level', 'INFO'),
        ('event', 'request_completed'),
        ('endpoint', '/api/products'),
        ('duration_ms', 156),
        ('status', 200),
        ('metric', 'request_completed')
    )
    
    # Show collected metrics
    demo_logger.info("=== Metrics Summary ===")
    for metric, count in metrics.get_metrics().items():
        demo_logger.info("{0}:  {1}".format(metric, count))


if __name__ == "__main__":
    main()