-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrunner.py
More file actions
10101 lines (8414 loc) · 414 KB
/
runner.py
File metadata and controls
10101 lines (8414 loc) · 414 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
Enhanced Python Script Runner with Advanced Metrics Collection
Core Features:
- Real-time Monitoring (CPU, Memory, I/O, System Resources)
- Alerting & Notification System (Email, Slack, Webhooks)
- CI/CD Pipeline Integration (Performance Gates, JUnit XML, Baseline Comparison)
- Historical Data Tracking (SQLite backend)
- Trend Analysis & Regression Detection
- Advanced Retry Strategies
- Structured Logging & Log Analysis
Executes a target Python script and collects comprehensive execution statistics.
"""
__version__ = "7.5.0"
__author__ = "Hayk Jomardyan"
__license__ = "MIT"
# Public API - all classes and functions that should be exposed when imported
__all__ = [
"ScriptRunner",
"HistoryManager",
"AlertManager",
"CICDIntegration",
"AdvancedProfiler",
"ExecutionVisualizer",
"main",
]
import subprocess
import sys
import argparse
import time
import json
import os
import stat
import logging
import traceback
import smtplib
import re
import sqlite3
import shlex
import gzip
import uuid
import queue as _queue_module
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any, Callable, Tuple
from collections import defaultdict
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import threading
from enum import Enum
from statistics import mean, median, stdev, quantiles
try:
import resource
except ImportError:
resource = None
try:
import requests
except ImportError:
requests = None
try:
import yaml
except ImportError:
yaml = None
import psutil
# ============================================================================
# FEATURE: HISTORICAL DATA TRACKING (SQLite Backend)
# ============================================================================
class HistoryManager:
    """Persist and query script-execution metrics in a SQLite database.

    Repository-style storage for execution records, per-execution numeric
    metrics, and triggered alerts, backed by three tables (``executions``,
    ``metrics``, ``alerts``). A small thread-safe connection pool serves the
    read-heavy helpers; write paths use short-lived connections that are
    committed and closed deterministically.

    Fixes over the previous revision:
    - ``PRAGMA foreign_keys = ON`` is now issued on every connection. SQLite
      disables foreign-key enforcement per connection by default, so the
      schema's ``ON DELETE CASCADE`` clauses were silently ignored and
      ``cleanup_old_data`` orphaned ``metrics``/``alerts`` rows.
    - Short-lived connections are explicitly closed (``with sqlite3.connect``
      commits but never closes), avoiding a slow handle leak.

    Attributes:
        db_path (str): Path to the SQLite database file.
        logger (logging.Logger): Logger instance for audit trails.
        _connection_pool (queue.Queue): Thread-safe connection pool.
        _max_connections (int): Maximum pool size.

    Example:
        >>> manager = HistoryManager('metrics.db')
        >>> execution_id = manager.save_execution(metrics)
        >>> history = manager.get_execution_history(script_path='script.py', days=30)
    """

    def __init__(self, db_path: str = 'script_runner_history.db', pool_size: int = 5) -> None:
        """Initialize the manager, its connection pool, and the schema.

        Args:
            db_path: Path to SQLite database file. Creates file if it doesn't
                exist. Default: 'script_runner_history.db'
            pool_size: Maximum number of pooled connections. Default: 5

        Raises:
            sqlite3.DatabaseError: If database initialization fails
        """
        import queue
        self.db_path = db_path
        self.logger = logging.getLogger(__name__)
        self._max_connections = pool_size
        self._connection_pool = queue.Queue(maxsize=pool_size)
        self._pool_lock = threading.Lock()
        self._init_database()

    def _connect(self, timeout: float = 10.0) -> sqlite3.Connection:
        """Open a new connection with foreign-key enforcement enabled.

        The pragma must be issued per connection; without it SQLite ignores
        the ``ON DELETE CASCADE`` clauses declared in the schema.
        """
        conn = sqlite3.connect(self.db_path, timeout=timeout)
        conn.execute('PRAGMA foreign_keys = ON')
        return conn

    def _db(self, row_factory: bool = False):
        """Short-lived connection context: commit on success, always close.

        Args:
            row_factory: When True, rows are returned as ``sqlite3.Row``
                (name-addressable), otherwise as plain tuples.

        Returns:
            Context manager yielding sqlite3.Connection
        """
        from contextlib import contextmanager

        @contextmanager
        def _ctx():
            conn = self._connect()
            if row_factory:
                conn.row_factory = sqlite3.Row
            try:
                with conn:  # commits on clean exit, rolls back on error
                    yield conn
            finally:
                conn.close()
        return _ctx()

    def get_connection(self):
        """Get a database connection from the pool (context manager).

        Reuses pooled connections when available, creating new ones as
        needed. Automatically returns the connection to the pool (or closes
        it when the pool is full) when exiting the context.

        Returns:
            Context manager yielding sqlite3.Connection

        Example:
            >>> with manager.get_connection() as conn:
            ...     cursor = conn.cursor()
            ...     cursor.execute("SELECT ...")
        """
        from contextlib import contextmanager

        @contextmanager
        def _get_conn():
            conn = None
            try:
                # Non-blocking: fall through to a fresh connection when empty.
                conn = self._connection_pool.get_nowait()
                self.logger.debug(f"Reused pooled connection. Pool size: {self._connection_pool.qsize()}")
            except Exception:
                conn = self._connect(timeout=10.0)
                conn.row_factory = sqlite3.Row
                self.logger.debug("Created new database connection")
            try:
                yield conn
            finally:
                if conn:
                    try:
                        # qsize() is advisory under concurrency; put_nowait may
                        # still raise, handled by the fallback close below.
                        if self._connection_pool.qsize() < self._max_connections:
                            self._connection_pool.put_nowait(conn)
                            self.logger.debug(f"Returned connection to pool. Pool size: {self._connection_pool.qsize()}")
                        else:
                            conn.close()
                            self.logger.debug("Connection pool full, closed connection")
                    except Exception:
                        conn.close()
        return _get_conn()

    def close_all_connections(self):
        """Close all pooled connections. Call on shutdown."""
        while not self._connection_pool.empty():
            try:
                self._connection_pool.get_nowait().close()
            except Exception:
                pass
        self.logger.info("All pooled connections closed")

    def _init_database(self):
        """Initialize SQLite database with schema and indexes."""
        try:
            with self._db() as conn:
                cursor = conn.cursor()
                # One row per script run.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS executions (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        script_path TEXT NOT NULL,
                        script_args TEXT,
                        start_time TEXT NOT NULL,
                        end_time TEXT NOT NULL,
                        execution_time_seconds REAL NOT NULL,
                        exit_code INTEGER NOT NULL,
                        success BOOLEAN NOT NULL,
                        attempt_number INTEGER DEFAULT 1,
                        timeout_seconds INTEGER,
                        timed_out BOOLEAN DEFAULT 0,
                        stdout_lines INTEGER,
                        stderr_lines INTEGER,
                        python_version TEXT,
                        platform TEXT,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                ''')
                # Numeric time-series metrics, one row per metric per run.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS metrics (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        execution_id INTEGER NOT NULL,
                        metric_name TEXT NOT NULL,
                        metric_value REAL NOT NULL,
                        FOREIGN KEY (execution_id) REFERENCES executions(id) ON DELETE CASCADE
                    )
                ''')
                # Alerts triggered by a run.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS alerts (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        execution_id INTEGER NOT NULL,
                        alert_name TEXT NOT NULL,
                        severity TEXT NOT NULL,
                        condition TEXT NOT NULL,
                        triggered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        FOREIGN KEY (execution_id) REFERENCES executions(id) ON DELETE CASCADE
                    )
                ''')
                # Single-column indexes for common filters.
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_script_path ON executions(script_path)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_start_time ON executions(start_time DESC)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_success ON executions(success)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_exit_code ON executions(exit_code)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_execution_id ON metrics(execution_id)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_metric_name ON metrics(metric_name)')
                # Composite indexes: script+time filters, metric aggregation,
                # and recent-execution queries.
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_script_date ON executions(script_path, start_time DESC)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_metric_lookup ON metrics(metric_name, execution_id)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_recent ON executions(created_at DESC)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_alert_execution ON alerts(execution_id)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_alert_severity ON alerts(severity)')
                self.logger.info(f"Database initialized with optimized indexes: {self.db_path}")
        except Exception as e:
            self.logger.error(f"Database initialization failed: {e}")
            raise

    def save_execution(self, metrics: Dict = None, script_path: str = None, exit_code: int = None,
                       stdout: str = None, stderr: str = None, **kwargs) -> Optional[int]:
        """Save execution metrics to the database.

        Persists the execution record and automatically stores every numeric
        entry of ``metrics`` in the ``metrics`` table for time-series
        analysis (booleans included, matching the historical schema).

        Args:
            metrics: Dictionary of execution metrics (script_path, start_time,
                end_time, execution_time_seconds, exit_code, success,
                attempt_number, stdout_lines, stderr_lines, plus arbitrary
                numeric metrics).
            script_path: Script path (backward compatibility; overrides dict).
            exit_code: Exit code (backward compatibility; overrides dict).
            stdout: Standard output; converted to a line count.
            stderr: Standard error; converted to a line count.
            **kwargs: Metric entries when ``metrics`` is not given.

        Returns:
            int: Unique execution ID for tracking in related tables.

        Raises:
            sqlite3.DatabaseError: If insertion fails.

        Example:
            >>> exec_id = manager.save_execution({'script_path': 'a.py',
            ...                                   'exit_code': 0,
            ...                                   'cpu_percent': 45.2})
        """
        # Backward-compatible call styles: a full metrics dict, or individual
        # keyword parameters. Explicit parameters win over dict entries.
        if metrics is None:
            metrics = kwargs
        if script_path:
            metrics['script_path'] = script_path
        if exit_code is not None:
            metrics['exit_code'] = exit_code
        if stdout:
            metrics['stdout_lines'] = len(stdout.splitlines())
        if stderr:
            metrics['stderr_lines'] = len(stderr.splitlines())
        try:
            with self._db() as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    INSERT INTO executions (
                        script_path, script_args, start_time, end_time,
                        execution_time_seconds, exit_code, success, attempt_number,
                        timeout_seconds, timed_out, stdout_lines, stderr_lines,
                        python_version, platform
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    metrics.get('script_path', ''),
                    json.dumps(metrics.get('script_args', [])),
                    metrics.get('start_time', ''),
                    metrics.get('end_time', ''),
                    metrics.get('execution_time_seconds', 0),
                    metrics.get('exit_code', -1),
                    metrics.get('success', False),
                    metrics.get('attempt_number', 1),
                    metrics.get('timeout_seconds'),
                    metrics.get('timed_out', False),
                    metrics.get('stdout_lines', 0),
                    metrics.get('stderr_lines', 0),
                    metrics.get('python_version', ''),
                    metrics.get('platform', '')
                ))
                execution_id = cursor.lastrowid
                # Skip bookkeeping columns already stored on the execution row.
                skip = {'exit_code', 'attempt_number', 'timeout_seconds',
                        'stdout_lines', 'stderr_lines'}
                for metric_name, metric_value in metrics.items():
                    if isinstance(metric_value, (int, float)) and metric_name not in skip:
                        cursor.execute(
                            'INSERT INTO metrics (execution_id, metric_name, metric_value) '
                            'VALUES (?, ?, ?)',
                            (execution_id, metric_name, metric_value)
                        )
                self.logger.info(f"Execution saved: {execution_id}")
                return execution_id
        except Exception as e:
            self.logger.error(f"Failed to save execution: {e}")
            raise

    def save_alerts(self, execution_id: int, alerts: List[Dict]):
        """Save triggered alerts for an execution (best-effort; logs errors)."""
        try:
            with self._db() as conn:
                cursor = conn.cursor()
                for alert in alerts:
                    cursor.execute('''
                        INSERT INTO alerts (execution_id, alert_name, severity, condition)
                        VALUES (?, ?, ?, ?)
                    ''', (
                        execution_id,
                        alert.get('name', ''),
                        alert.get('severity', ''),
                        alert.get('condition', '')
                    ))
                self.logger.debug(f"Saved {len(alerts)} alerts for execution {execution_id}")
        except Exception as e:
            self.logger.error(f"Failed to save alerts: {e}")

    def get_execution_history(self, script_path: Optional[str] = None, limit: int = 100,
                              days: int = 30) -> List[Dict]:
        """Retrieve execution history with optional filtering.

        Args:
            script_path: Filter by specific script path; None means all
                scripts. Default: None
            limit: Maximum number of records to return. Default: 100
            days: Only include executions from the last N days. Default: 30

        Returns:
            List[Dict]: Execution records sorted by descending start_time,
            each with an added ``metrics`` dict of associated time-series
            values. Empty list on query failure.
        """
        try:
            with self._db(row_factory=True) as conn:
                cursor = conn.cursor()
                query = 'SELECT * FROM executions WHERE 1=1'
                params = []
                if script_path:
                    query += ' AND script_path = ?'
                    params.append(script_path)
                if days:
                    cutoff_time = (datetime.now() - timedelta(days=days)).isoformat()
                    query += ' AND start_time >= ?'
                    params.append(cutoff_time)
                query += ' ORDER BY start_time DESC LIMIT ?'
                params.append(limit)
                cursor.execute(query, params)
                executions = [dict(row) for row in cursor.fetchall()]
                # Attach the per-execution metric series.
                for execution in executions:
                    cursor.execute(
                        'SELECT metric_name, metric_value FROM metrics WHERE execution_id = ?',
                        (execution['id'],)
                    )
                    execution['metrics'] = {row[0]: row[1] for row in cursor.fetchall()}
                return executions
        except Exception as e:
            self.logger.error(f"Failed to retrieve execution history: {e}")
            return []

    def get_metrics_for_script(self, script_path: str, metric_name: str,
                               days: int = 30) -> List[Tuple[str, float]]:
        """Get all values of a specific metric for a script over time.

        Returns:
            List of (timestamp, value) tuples, oldest first; empty on error.
        """
        try:
            with self._db() as conn:
                cursor = conn.cursor()
                cutoff_time = (datetime.now() - timedelta(days=days)).isoformat()
                cursor.execute('''
                    SELECT e.start_time, m.metric_value
                    FROM metrics m
                    JOIN executions e ON m.execution_id = e.id
                    WHERE e.script_path = ? AND m.metric_name = ?
                    AND e.start_time >= ?
                    ORDER BY e.start_time ASC
                ''', (script_path, metric_name, cutoff_time))
                return cursor.fetchall()
        except Exception as e:
            self.logger.error(f"Failed to retrieve metric series: {e}")
            return []

    def get_aggregated_metrics(self, script_path: Optional[str] = None, metric_name: Optional[str] = None,
                               days: int = 30) -> Dict:
        """Get aggregated statistics for metrics.

        Returns:
            Dict mapping metric name to {count, min, max, avg, median, p95,
            p99, stddev}; empty dict on error.
        """
        try:
            with self._db() as conn:
                cursor = conn.cursor()
                query = '''
                    SELECT m.metric_name, m.metric_value
                    FROM metrics m
                    JOIN executions e ON m.execution_id = e.id
                    WHERE 1=1
                '''
                params = []
                if script_path:
                    query += ' AND e.script_path = ?'
                    params.append(script_path)
                if metric_name:
                    query += ' AND m.metric_name = ?'
                    params.append(metric_name)
                cutoff_time = (datetime.now() - timedelta(days=days)).isoformat()
                query += ' AND e.start_time >= ?'
                params.append(cutoff_time)
                query += ' ORDER BY m.metric_name, m.metric_value'
                cursor.execute(query, params)
                # Group values by metric name.
                metrics_data = defaultdict(list)
                for name, value in cursor.fetchall():
                    metrics_data[name].append(value)
                stats = {}
                for name, values in metrics_data.items():
                    if not values:
                        continue
                    sorted_values = sorted(values)
                    # Percentiles fall back to the max when the sample is too
                    # small for the requested quantile resolution.
                    stats[name] = {
                        'count': len(values),
                        'min': min(values),
                        'max': max(values),
                        'avg': mean(values),
                        'median': median(values),
                        'p95': quantiles(sorted_values, n=20)[18] if len(sorted_values) >= 20 else sorted_values[-1],
                        'p99': quantiles(sorted_values, n=100)[98] if len(sorted_values) >= 100 else sorted_values[-1],
                        'stddev': stdev(values) if len(values) > 1 else 0
                    }
                return stats
        except Exception as e:
            self.logger.error(f"Failed to get aggregated metrics: {e}")
            return {}

    def cleanup_old_data(self, days: int = 90):
        """Delete execution records older than the given number of days.

        Associated ``metrics``/``alerts`` rows are removed via foreign-key
        cascade (enabled per-connection in ``_connect``).
        """
        try:
            with self._db() as conn:
                cursor = conn.cursor()
                cutoff_time = (datetime.now() - timedelta(days=days)).isoformat()
                cursor.execute('DELETE FROM executions WHERE start_time < ?', (cutoff_time,))
                deleted = cursor.rowcount
                self.logger.info(f"Cleaned up {deleted} old execution records (older than {days} days)")
        except Exception as e:
            self.logger.error(f"Failed to cleanup old data: {e}")

    def get_database_stats(self) -> Dict:
        """Get summary statistics about the database (uses the pool)."""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute('SELECT COUNT(*) FROM executions')
                total_executions = cursor.fetchone()[0]
                cursor.execute('SELECT COUNT(*) FROM metrics')
                total_metrics = cursor.fetchone()[0]
                cursor.execute('SELECT COUNT(*) FROM alerts')
                total_alerts = cursor.fetchone()[0]
                cursor.execute('SELECT COUNT(DISTINCT script_path) FROM executions')
                unique_scripts = cursor.fetchone()[0]
                cursor.execute('SELECT SUM(execution_time_seconds) FROM executions')
                total_execution_time = cursor.fetchone()[0] or 0
                return {
                    'total_executions': total_executions,
                    'total_metrics': total_metrics,
                    'total_alerts': total_alerts,
                    'unique_scripts': unique_scripts,
                    'total_execution_time_seconds': round(total_execution_time, 2),
                    'database_file': self.db_path,
                    'database_size_mb': os.path.getsize(self.db_path) / 1024 / 1024 if os.path.exists(self.db_path) else 0
                }
        except Exception as e:
            self.logger.error(f"Failed to get database stats: {e}")
            return {}

    def get_executions_paginated(self, limit: int = 100, offset: int = 0,
                                 script_path: Optional[str] = None, days: int = 30) -> Dict:
        """Get paginated execution history (memory-efficient for large datasets).

        Args:
            limit: Number of records per page (default 100)
            offset: Record offset for pagination (default 0)
            script_path: Filter by script path (optional)
            days: Only include executions from last N days

        Returns:
            Dict with keys ``data``, ``total``, ``limit``, ``offset``,
            ``has_more``.

        Example:
            >>> page1 = manager.get_executions_paginated(limit=50, offset=0)
            >>> page2 = manager.get_executions_paginated(limit=50, offset=50)
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                query_where = 'WHERE 1=1'
                params = []
                if script_path:
                    query_where += ' AND script_path = ?'
                    params.append(script_path)
                if days:
                    cutoff_time = (datetime.now() - timedelta(days=days)).isoformat()
                    query_where += ' AND start_time >= ?'
                    params.append(cutoff_time)
                # Total match count for pagination metadata.
                cursor.execute(f'SELECT COUNT(*) FROM executions {query_where}', params)
                total = cursor.fetchone()[0]
                cursor.execute(
                    f'SELECT * FROM executions {query_where} '
                    'ORDER BY start_time DESC LIMIT ? OFFSET ?',
                    params + [limit, offset]
                )
                data = [dict(row) for row in cursor.fetchall()]
                return {
                    'data': data,
                    'total': total,
                    'limit': limit,
                    'offset': offset,
                    'has_more': offset + limit < total
                }
        except Exception as e:
            self.logger.error(f"Failed to get paginated executions: {e}")
            return {'data': [], 'total': 0, 'limit': limit, 'offset': offset, 'has_more': False}

    def get_metrics_paginated(self, limit: int = 1000, offset: int = 0,
                              metric_name: Optional[str] = None, days: int = 30) -> Dict:
        """Get paginated metrics (memory-efficient for large datasets).

        Args:
            limit: Number of records per page (default 1000)
            offset: Record offset for pagination
            metric_name: Filter by metric name
            days: Only include metrics from last N days

        Returns:
            Dict with keys ``data``, ``total``, ``limit``, ``offset``,
            ``has_more``.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                query_where = 'WHERE 1=1'
                params = []
                if metric_name:
                    query_where += ' AND m.metric_name = ?'
                    params.append(metric_name)
                if days:
                    cutoff_time = (datetime.now() - timedelta(days=days)).isoformat()
                    query_where += ' AND e.start_time >= ?'
                    params.append(cutoff_time)
                cursor.execute(
                    f'SELECT COUNT(*) FROM metrics m '
                    f'JOIN executions e ON m.execution_id = e.id {query_where}',
                    params
                )
                total = cursor.fetchone()[0]
                cursor.execute(
                    f'SELECT m.*, e.script_path, e.start_time FROM metrics m '
                    f'JOIN executions e ON m.execution_id = e.id {query_where} '
                    'ORDER BY e.start_time DESC LIMIT ? OFFSET ?',
                    params + [limit, offset]
                )
                data = [dict(row) for row in cursor.fetchall()]
                return {
                    'data': data,
                    'total': total,
                    'limit': limit,
                    'offset': offset,
                    'has_more': offset + limit < total
                }
        except Exception as e:
            self.logger.error(f"Failed to get paginated metrics: {e}")
            return {'data': [], 'total': 0, 'limit': limit, 'offset': offset, 'has_more': False}
# ============================================================================
# FEATURE: TREND ANALYSIS & REGRESSION DETECTION
# ============================================================================
class TrendAnalyzer:
"""Analyze trends in metrics and detect performance regressions.
Industrial-grade trend analysis engine supporting statistical regression analysis,
anomaly detection, percentile calculations, and regression detection for metrics.
Implements multiple statistical methods for robust analysis across different
metric distributions.
Supports analysis methods:
- Linear regression for trend identification
- IQR/Z-score/MAD methods for anomaly detection
- Regression detection using baseline comparison
- Statistical percentile calculations
Attributes:
logger (logging.Logger): Logger instance for audit trails
Example:
>>> analyzer = TrendAnalyzer()
>>> trend = analyzer.calculate_linear_regression([1, 1.1, 1.2, 1.25, 1.3])
>>> anomalies = analyzer.detect_anomalies([100, 110, 105, 500], method='iqr')
>>> regression = analyzer.detect_regression(values, threshold_pct=10)
"""
def __init__(self, logger: Optional[logging.Logger] = None):
self.logger = logger or logging.getLogger(__name__)
def calculate_linear_regression(self, values: List[float]) -> Dict:
    """Fit a least-squares line through *values* and classify the trend.

    The x-axis is the run index (0, 1, 2, ...); the y-axis is the metric
    value. Includes an R-squared goodness-of-fit measure.

    Args:
        values: Metric values in chronological order; at least 2 are
            required for a meaningful fit.

    Returns:
        Dict with:
            - slope: rate of change per run
            - intercept: y-intercept of the fitted line
            - r_squared: goodness of fit in [0, 1]
            - trend: 'increasing' / 'decreasing' / 'stable' /
              'insufficient_data' (or 'flat' for a degenerate fit)
            - slope_pct_per_run: percentage change per execution

    Example:
        >>> result = analyzer.calculate_linear_regression([100, 102, 104])
        >>> result['trend']
        'increasing'
    """
    count = len(values)
    # Guard: a line through fewer than two points is meaningless.
    if count < 2:
        return {
            'slope': 0,
            'intercept': values[0] if values else 0,
            'r_squared': 0,
            'trend': 'insufficient_data',
            'slope_pct_per_run': 0
        }
    xs = range(count)
    x_bar = sum(xs) / count
    y_bar = sum(values) / count
    # Centered deviations drive both the slope and R-squared computations.
    dx = [xi - x_bar for xi in xs]
    dy = [yi - y_bar for yi in values]
    sxy = sum(a * b for a, b in zip(dx, dy))
    sxx = sum(a * a for a in dx)
    if sxx == 0:
        # Degenerate x-spread; unreachable for count >= 2 distinct indices
        # but kept for safety.
        return {
            'slope': 0,
            'intercept': y_bar,
            'r_squared': 0,
            'trend': 'flat',
            'slope_pct_per_run': 0
        }
    slope = sxy / sxx
    intercept = y_bar - slope * x_bar
    # R-squared = 1 - residual variance / total variance.
    ss_res = sum((yi - (intercept + slope * xi)) ** 2 for xi, yi in zip(xs, values))
    ss_tot = sum(d * d for d in dy)
    r_squared = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0
    # Slopes below the noise floor count as stable.
    if abs(slope) < 0.01:
        trend = 'stable'
    elif slope > 0:
        trend = 'increasing'
    else:
        trend = 'decreasing'
    return {
        'slope': round(slope, 6),
        'intercept': round(intercept, 2),
        'r_squared': round(r_squared, 4),
        'trend': trend,
        'slope_pct_per_run': round((slope / y_bar * 100) if y_bar != 0 else 0, 2)
    }
def detect_anomalies(self, values: List[float], method: str = 'iqr',
                     threshold: float = 1.5) -> Dict:
    """Detect anomalies in metric values using statistical methods.

    Identifies outliers using one of three methods, each robust to a
    different class of data distribution.

    Args:
        values: Metric values to analyze.
        method: Detection method:
            - 'iqr': Interquartile Range (robust, good for skewed data)
            - 'zscore': Z-score (good for normal distributions)
            - 'mad': Median Absolute Deviation (most robust to outliers)
            Default: 'iqr'
        threshold: Sensitivity threshold — IQR multiplier (1.5=standard,
            3.0=extreme), z-score standard deviations (2.0≈95%, 3.0≈99.7%),
            or modified-z cutoff for MAD. Default: 1.5

    Returns:
        Dict with:
            - anomalies: list of detected anomalies (index, value, detail)
            - method: method used
            - count: number of anomalies found
            - percentage: anomalies as a percentage of the dataset

    Example:
        >>> result = analyzer.detect_anomalies([100, 101, 99, 500])
        >>> result['count']
        1
    """
    # Fewer than 3 points gives no meaningful statistics. Fix: include the
    # 'percentage' key so the return schema matches the normal path.
    if len(values) < 3:
        return {'anomalies': [], 'method': method, 'count': 0, 'percentage': 0}
    anomalies = []
    if method == 'iqr':
        # Interquartile Range: flag points outside [Q1 - t*IQR, Q3 + t*IQR].
        sorted_vals = sorted(values)
        q1 = sorted_vals[len(sorted_vals) // 4]
        q3 = sorted_vals[(3 * len(sorted_vals)) // 4]
        iqr = q3 - q1
        lower_bound = q1 - threshold * iqr
        upper_bound = q3 + threshold * iqr
        for i, val in enumerate(values):
            if val < lower_bound or val > upper_bound:
                anomalies.append({
                    'index': i,
                    'value': val,
                    'type': 'outlier',
                    # Signed distance past the violated bound.
                    'deviation': val - (upper_bound if val > upper_bound else lower_bound)
                })
    elif method == 'zscore':
        # Z-score: flag points more than `threshold` stddevs from the mean.
        if len(values) > 1:
            mean_val = mean(values)
            std_val = stdev(values)
            for i, val in enumerate(values):
                z_score = (val - mean_val) / std_val if std_val > 0 else 0
                if abs(z_score) > threshold:
                    anomalies.append({
                        'index': i,
                        'value': val,
                        'type': 'outlier',
                        'z_score': round(z_score, 2)
                    })
    elif method == 'mad':
        # Median Absolute Deviation with the standard 0.6745 scale factor
        # (maps MAD to an equivalent of one standard deviation).
        med = median(values)
        mad = median([abs(v - med) for v in values])
        for i, val in enumerate(values):
            if mad > 0:
                modified_z = 0.6745 * (val - med) / mad
                if abs(modified_z) > threshold:
                    anomalies.append({
                        'index': i,
                        'value': val,
                        'type': 'outlier',
                        'modified_z': round(modified_z, 2)
                    })
    return {
        'anomalies': anomalies,
        'method': method,
        'count': len(anomalies),
        'percentage': round(len(anomalies) / len(values) * 100, 2) if values else 0
    }
def detect_regression(self, values: List[float], window_size: int = 5,
threshold_pct: float = 10.0) -> Dict:
"""Detect performance regressions (significant increases in metric values)
Args:
values: List of metric values (lower is better)
window_size: Number of recent values to compare against older values
threshold_pct: Percentage increase threshold to flag as regression
Returns:
Dictionary with regression analysis
"""
if len(values) < window_size * 2:
return {
'regression_detected': False,
'reason': 'insufficient_data',
'data_points': len(values)
}
# Compare recent window vs older baseline
baseline = values[:-window_size]
recent = values[-window_size:]
baseline_mean = mean(baseline)
recent_mean = mean(recent)
if baseline_mean == 0:
percent_change = 0
else:
percent_change = ((recent_mean - baseline_mean) / baseline_mean) * 100
regression_detected = percent_change > threshold_pct
return {
'regression_detected': regression_detected,
'baseline_mean': round(baseline_mean, 2),
'recent_mean': round(recent_mean, 2),
'percent_change': round(percent_change, 2),
'threshold_pct': threshold_pct,
'baseline_size': len(baseline),
'recent_size': window_size,
'severity': 'high' if percent_change > threshold_pct * 1.5 else 'medium' if regression_detected else 'none'