#!/usr/bin/env python3
"""
Demonstration of Parnas' Information Hiding Principle.

Design decision hidden: how events/telemetry are processed and stored.
- The Events class hides the details of which sinks are registered.
- Each sink implementation hides its specific processing strategy.
- Changes to logging format, storage, or filtering don't affect clients.
"""
  9.  
from abc import ABC, abstractmethod
import logging
from typing import Any, Tuple
  13.  
  14.  
  15. class TelemetrySink(ABC):
  16. """Interface that hides implementation details of telemetry handling"""
  17.  
  18. @abstractmethod
  19. def handle(self, *args):
  20. # type: (Tuple[str, Any]) -> None
  21. """Process telemetry with key-value pairs"""
  22. pass
  23.  
  24.  
  25. class StructuredPythonLogger(TelemetrySink):
  26. """Implementation that hides Python logger integration details"""
  27.  
  28. class _KeyValueFormatter(logging.Formatter):
  29. """Internal formatter that outputs structured key=value pairs"""
  30.  
  31. def format(self, record):
  32. # type: (logging.LogRecord) -> str
  33. # The structured data is passed via extra
  34. if hasattr(record, 'structured_data'):
  35. parts = ["{0}={1}".format(k, v) for k, v in record.structured_data.items()]
  36. structured_msg = " ".join(parts)
  37. record.msg = structured_msg
  38. return super(StructuredPythonLogger._KeyValueFormatter, self).format(record)
  39.  
  40. def __init__(self, logger):
  41. # type: (logging.Logger) -> None
  42. self._logger = logger # Hidden implementation detail
  43.  
  44. def handle(self, *args):
  45. # type: (Tuple[str, Any]) -> None
  46. event_data = dict(args)
  47.  
  48. # Use log level if provided, otherwise INFO
  49. level = str(event_data.get('level', 'INFO')).upper()
  50.  
  51. # Pass structured data to logger via extra
  52. if level == 'ERROR':
  53. self._logger.error('', extra={'structured_data': event_data})
  54. elif level == 'WARNING':
  55. self._logger.warning('', extra={'structured_data': event_data})
  56. elif level == 'DEBUG':
  57. self._logger.debug('', extra={'structured_data': event_data})
  58. else:
  59. self._logger.info('', extra={'structured_data': event_data})
  60.  
  61.  
  62. class StructuredMetricsCollector(TelemetrySink):
  63. """Implementation that hides metrics aggregation strategy"""
  64.  
  65. def __init__(self):
  66. self._metrics = {} # Hidden implementation detail
  67.  
  68. def handle(self, *args):
  69. # type: (Tuple[str, Any]) -> None
  70. event_data = dict(args)
  71.  
  72. # Hidden decision: only track events with 'metric' key
  73. if 'metric' in event_data:
  74. metric_name = event_data['metric']
  75. self._metrics[metric_name] = self._metrics.get(metric_name, 0) + 1
  76.  
  77. def get_metrics(self):
  78. # type: () -> dict
  79. """Expose metrics without revealing internal storage"""
  80. return self._metrics. copy()
  81.  
  82.  
  83. class Events:
  84. """
  85. Facade that hides the complexity of multi-sink telemetry processing
  86.  
  87. Design Decisions Hidden:
  88. - Which sinks are registered
  89. - How many sinks exist
  90. - Order of sink execution
  91. - Sink lifecycle management
  92. """
  93.  
  94. def __init__(self):
  95. self._sinks = [] # type: list # Hidden detail
  96.  
  97. def register(self, sink):
  98. # type: (TelemetrySink) -> None
  99. """Add sink without exposing internal collection"""
  100. self._sinks.append(sink)
  101.  
  102. def emit(self, *args):
  103. # type: (Tuple[str, Any]) -> None
  104. """
  105. Delegate event to all sinks
  106.  
  107. Client doesn't need to know:
  108. - How many sinks exist
  109. - What each sink does
  110. - If sinks can be added/removed at runtime
  111. """
  112. for sink in self._sinks:
  113. sink.handle(*args)
  114.  
  115.  
  116. def main():
  117. """
  118. Runnable example demonstrating information hiding
  119. """
  120. # Configure logging system with custom structured formatter
  121. handler = logging.StreamHandler()
  122. handler.setLevel(logging.DEBUG)
  123. handler.setFormatter(StructuredPythonLogger._KeyValueFormatter('%(levelname)s - %(message)s'))
  124.  
  125. # Configure root logger
  126. root_logger = logging.getLogger()
  127. root_logger.setLevel(logging.DEBUG)
  128. root_logger.addHandler(handler)
  129.  
  130. # Get logger for the telemetry system
  131. logger = logging.getLogger('app.telemetry')
  132.  
  133. # Setup telemetry system (design decisions hidden)
  134. events = Events()
  135. events.register(StructuredPythonLogger(logger))
  136.  
  137. metrics = StructuredMetricsCollector()
  138. events.register(metrics)
  139.  
  140. # Use logger for demo headers
  141. demo_logger = logging.getLogger('demo')
  142. demo_logger.info("=== Telemetry Events Demo ===")
  143.  
  144. # Client code: doesn't know or care about sink implementations
  145. events.emit(
  146. ('level', 'INFO'),
  147. ('event', 'app_start'),
  148. ('user', 'd-led'),
  149. ('version', '1.0.0'),
  150. ('metric', 'app_start')
  151. )
  152.  
  153. events.emit(
  154. ('level', 'DEBUG'),
  155. ('event', 'db_query'),
  156. ('query_time_ms', 45),
  157. ('table', 'users'),
  158. ('rows', 150),
  159. ('metric', 'db_query')
  160. )
  161.  
  162. events.emit(
  163. ('level', 'ERROR'),
  164. ('event', 'connection_failed'),
  165. ('service', 'database'),
  166. ('retry_count', 3),
  167. ('error_code', 'CONN_TIMEOUT'),
  168. ('metric', 'db_error')
  169. )
  170.  
  171. events.emit(
  172. ('level', 'WARNING'),
  173. ('event', 'cache_miss'),
  174. ('key', 'user: 12345'),
  175. ('fallback', 'database'),
  176. ('metric', 'cache_miss')
  177. )
  178.  
  179. events.emit(
  180. ('level', 'INFO'),
  181. ('event', 'request_completed'),
  182. ('endpoint', '/api/users'),
  183. ('duration_ms', 234),
  184. ('status', 200),
  185. ('metric', 'request_completed')
  186. )
  187.  
  188. # More events to make metrics interesting
  189. events. emit(
  190. ('level', 'ERROR'),
  191. ('event', 'connection_failed'),
  192. ('service', 'cache'),
  193. ('retry_count', 2),
  194. ('metric', 'db_error')
  195. )
  196.  
  197. events.emit(
  198. ('level', 'DEBUG'),
  199. ('event', 'db_query'),
  200. ('query_time_ms', 23),
  201. ('table', 'products'),
  202. ('rows', 50),
  203. ('metric', 'db_query')
  204. )
  205.  
  206. events.emit(
  207. ('level', 'INFO'),
  208. ('event', 'request_completed'),
  209. ('endpoint', '/api/products'),
  210. ('duration_ms', 156),
  211. ('status', 200),
  212. ('metric', 'request_completed')
  213. )
  214.  
  215. # Show collected metrics
  216. demo_logger.info("=== Metrics Summary ===")
  217. for metric, count in metrics.get_metrics().items():
  218. demo_logger.info("{0}: {1}".format(metric, count))
  219.  
  220.  
  221. if __name__ == "__main__":
  222. main()
# --- Sample run (captured from an online runner; success, 0.04s, 10696KB) ---
# stdin: empty; stdout: empty — all log output goes to stderr:
#
# INFO - === Telemetry Events Demo ===
# INFO - level=INFO event=app_start user=d-led version=1.0.0 metric=app_start
# DEBUG - level=DEBUG event=db_query query_time_ms=45 table=users rows=150 metric=db_query
# ERROR - level=ERROR event=connection_failed service=database retry_count=3 error_code=CONN_TIMEOUT metric=db_error
# WARNING - level=WARNING event=cache_miss key=user: 12345 fallback=database metric=cache_miss
# INFO - level=INFO event=request_completed endpoint=/api/users duration_ms=234 status=200 metric=request_completed
# ERROR - level=ERROR event=connection_failed service=cache retry_count=2 metric=db_error
# DEBUG - level=DEBUG event=db_query query_time_ms=23 table=products rows=50 metric=db_query
# INFO - level=INFO event=request_completed endpoint=/api/products duration_ms=156 status=200 metric=request_completed
# INFO - === Metrics Summary ===
# INFO - app_start: 1
# INFO - db_query: 2
# INFO - db_error: 2
# INFO - cache_miss: 1
# INFO - request_completed: 2