# detector.py
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict
from config import CONFIDENCE_THRESHOLD
class TrafficDetector:
    """Detect and track vehicles and people in video frames using a YOLO model.

    Keeps cumulative sets of unique track ids so each object is counted once
    across the whole video, and draws annotated boxes plus a status panel.
    """

    def __init__(self, model_path):
        """Load the YOLO model and initialise colour tables and tracking state.

        Args:
            model_path: Path to the YOLO weights file passed to ``YOLO()``.
        """
        self.model = YOLO(model_path)
        self.class_names = self.model.names  # class id -> human-readable name
        # BGR colours keyed by COCO class id (2=car, 3=motorcycle, 7=truck).
        self.vehicle_colors = {
            2: (0, 165, 255),  # orange
            3: (0, 255, 0),    # green
            7: (0, 0, 255),    # red
        }
        self.person_color = (255, 0, 0)  # blue
        self.fps_color = (0, 0, 255)
        self.vehicle_count_color = (0, 165, 255)
        self.person_count_color = (255, 0, 0)
        # track_id -> list of positions (reserved for trajectory drawing).
        self.track_history = defaultdict(list)
        # Sets of unique track ids ever seen; len() gives the running totals.
        # (Names kept for backward compatibility even though they hold sets.)
        self.vehicle_count = set()
        self.person_count = set()
        # Track ids visible in the most recently processed frame.
        self.current_objects = set()

    def detect(self, frame):
        """Run tracking on one frame and collect labelled detections.

        Args:
            frame: BGR image (numpy array) as read by OpenCV.

        Returns:
            List of ``(x1, y1, x2, y2, label, cls)`` tuples for every person
            or vehicle detection at or above CONFIDENCE_THRESHOLD.
        """
        # persist=True keeps tracker state between frames so ids are stable.
        results = self.model.track(frame, persist=True, verbose=False)
        detections = []
        self.current_objects.clear()
        for r in results:
            boxes = r.boxes.xyxy.cpu().numpy()
            track_ids = r.boxes.id.cpu().numpy() if r.boxes.id is not None else None
            for i, box in enumerate(boxes):
                conf = float(r.boxes.conf[i])
                cls = int(r.boxes.cls[i])
                if conf < CONFIDENCE_THRESHOLD:
                    continue
                x1, y1, x2, y2 = map(int, box)
                track_id = int(track_ids[i]) if track_ids is not None else None
                # Only persons (class 0) and the configured vehicle classes.
                if cls in self.vehicle_colors or cls == 0:
                    # BUGFIX: compare against None explicitly — a track id of 0
                    # is falsy and would previously be treated as "untracked".
                    if track_id is not None:
                        label = f"{self.class_names[cls]}:{track_id}"
                        self.current_objects.add(track_id)
                        if cls == 0:  # Person
                            if track_id not in self.person_count:
                                self.person_count.add(track_id)
                                print(
                                    f"New Person detected | ID: {track_id} | Total Detected: {len(self.person_count)}")
                        else:  # Vehicle
                            if track_id not in self.vehicle_count:
                                self.vehicle_count.add(track_id)
                                print(
                                    f"New Vehicle detected | Type: {self.class_names[cls]} | ID: {track_id} | Total Detected: {len(self.vehicle_count)}")
                    else:
                        label = f"{self.class_names[cls]}"
                    detections.append((x1, y1, x2, y2, label, cls))
        return detections

    def draw_detections(self, frame, detections, fps):
        """Draw boxes, labels, and a status panel onto ``frame`` in place.

        Args:
            frame: BGR image to annotate (modified in place).
            detections: Output of :meth:`detect`.
            fps: Current frames-per-second value to display.

        Returns:
            The annotated frame (same object that was passed in).
        """
        for x1, y1, x2, y2, label, cls in detections:
            # Person class is not in vehicle_colors, so it falls back to blue.
            color = self.vehicle_colors.get(cls, self.person_color)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            # Shrink the font slightly for long (3+ digit) track ids.
            font_scale = 0.5 if len(label.split(':')[-1]) < 3 else 0.4
            thickness = 1
            (text_width, text_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, font_scale, thickness)
            # Place the filled label background just above the bounding box.
            text_y = y1 - 5 if font_scale == 0.5 else y1 - 3
            box_top = y1 - text_height - 10 if font_scale == 0.5 else y1 - text_height - 8
            cv2.rectangle(frame, (x1, box_top), (x1 + text_width, y1), color, -1)
            cv2.putText(frame, label, (x1, text_y),
                        cv2.FONT_HERSHEY_SIMPLEX, font_scale, (0, 0, 0), thickness)
        # Status panel (unchanged)
        status_lines = [
            (f"FPS: {fps:.1f}", self.fps_color),
            (f"Vehicles: {len(self.vehicle_count)}", self.vehicle_count_color),
            (f"People: {len(self.person_count)}", self.person_count_color)
        ]
        for i, (line, color) in enumerate(status_lines):
            (text_width, text_height), _ = cv2.getTextSize(line, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
            cv2.rectangle(frame, (10, 10 + i * 25), (15 + text_width, 30 + i * 25), color, -1)
            cv2.putText(frame, line, (15, 25 + i * 25), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        return frame
# main.py
import cv2
import time
from config import VIDEO_PATH, MODEL
from detector import TrafficDetector
def process_video():
    """Read VIDEO_PATH frame by frame, run detection, and display the output.

    Shows an annotated window titled 'Traffic Monitoring'; press 'q' to quit.
    Returns early with a message if the video cannot be opened.
    """
    cap = cv2.VideoCapture(VIDEO_PATH)
    if not cap.isOpened():
        print("Error opening video file")
        return
    detector = TrafficDetector(MODEL)
    prev_time = time.time()
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            # Instantaneous FPS from the inter-frame wall-clock delta.
            curr_time = time.time()
            dt = curr_time - prev_time
            # BUGFIX: guard against a zero delta (coarse time.time() resolution
            # can yield identical timestamps) to avoid ZeroDivisionError.
            fps = 1.0 / dt if dt > 0 else 0.0
            prev_time = curr_time
            detections = detector.detect(frame)
            processed_frame = detector.draw_detections(frame, detections, fps)
            cv2.imshow('Traffic Monitoring', processed_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release the capture and close windows even if detection raises.
        cap.release()
        cv2.destroyAllWindows()
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    process_video()