# detector.py

import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict
from config import CONFIDENCE_THRESHOLD

class TrafficDetector:
    """Detect, track and annotate people and vehicles in video frames.

    Wraps an Ultralytics ``YOLO`` model used in tracking mode. Only detections
    whose class id is ``PERSON_CLASS_ID`` or a key of ``vehicle_colors`` are
    kept. Every distinct track id seen is remembered for the lifetime of the
    instance so the overlay can show running totals.
    """

    # Class id that is counted as a person; any other accepted id is a vehicle.
    PERSON_CLASS_ID = 0

    def __init__(self, model_path):
        """Load the model and set up colours and counters.

        Args:
            model_path: Passed unchanged to ``YOLO(...)``.
        """
        self.model = YOLO(model_path)
        self.class_names = self.model.names
        # Box colour per accepted vehicle class id.
        # NOTE(review): ids 2/3/7 presumably are the COCO car/motorcycle/truck
        # classes -- confirm against ``self.model.names``.
        self.vehicle_colors = {
            2: (0, 165, 255),
            3: (0, 255, 0),
            7: (0, 0, 255),
        }
        self.person_color = (255, 0, 0)
        self.fps_color = (0, 0, 255)
        self.vehicle_count_color = (0, 165, 255)
        self.person_count_color = (255, 0, 0)
        # Not read anywhere in this class; kept for callers that may use it.
        self.track_history = defaultdict(list)
        # Despite the names these are sets of track ids; their sizes are the
        # totals shown in the status panel.
        self.vehicle_count = set()
        self.person_count = set()
        # Track ids accepted in the most recent ``detect`` call.
        self.current_objects = set()

    @staticmethod
    def _format_label(class_name, track_id):
        """Return ``"name:id"`` for a tracked object, else just ``"name"``."""
        # ``is None`` rather than truthiness so that a track id of 0 keeps
        # its id suffix.
        if track_id is None:
            return f"{class_name}"
        return f"{class_name}:{track_id}"

    @staticmethod
    def _has_long_id(label):
        """Return True when the label carries an id of three or more characters."""
        # Only the part after the last ':' is an id; a label without ':' has
        # no id at all and must not be measured by its class name.
        _, separator, id_text = label.rpartition(':')
        return bool(separator) and len(id_text) >= 3

    @staticmethod
    def _label_position(y1, text_height, compact):
        """Return ``(background_top, background_bottom, text_baseline)`` y values.

        The label normally sits directly above the box top ``y1``. When that
        would place it above the frame (negative y) the whole label is shifted
        down just enough to start at y = 0.
        """
        baseline_gap, padding = (3, 8) if compact else (5, 10)
        top = y1 - text_height - padding
        shift = max(0, -top)
        return top + shift, y1 + shift, y1 - baseline_gap + shift

    def _register(self, track_id, cls):
        """Mark a track id as currently visible and count it on first sight."""
        self.current_objects.add(track_id)
        if cls == self.PERSON_CLASS_ID:
            if track_id not in self.person_count:
                self.person_count.add(track_id)
                print(
                    f"New Person detected | ID: {track_id} | Total Detected: {len(self.person_count)}")
        elif track_id not in self.vehicle_count:
            self.vehicle_count.add(track_id)
            print(
                f"New Vehicle detected | Type: {self.class_names[cls]} | ID: {track_id} | Total Detected: {len(self.vehicle_count)}")

    def detect(self, frame):
        """Run tracking on one frame and return the accepted detections.

        Args:
            frame: Image passed unchanged to ``self.model.track``.

        Returns:
            A list of ``(x1, y1, x2, y2, label, cls)`` tuples with integer box
            corners, one per detection that meets ``CONFIDENCE_THRESHOLD`` and
            belongs to an accepted class.

        Side effects:
            Resets and refills ``current_objects``, adds unseen track ids to
            ``person_count`` / ``vehicle_count`` and prints a line for each
            newly counted object.
        """
        results = self.model.track(frame, persist=True, verbose=False)
        detections = []
        self.current_objects.clear()

        for result in results:
            boxes = result.boxes
            # NOTE(review): defensive -- a result may carry no boxes object.
            if boxes is None:
                continue

            # Convert each tensor once per result rather than once per box.
            coords = boxes.xyxy.cpu().numpy()
            confidences = boxes.conf.cpu().numpy()
            class_ids = boxes.cls.cpu().numpy()
            track_ids = boxes.id.cpu().numpy() if boxes.id is not None else None

            for i, box in enumerate(coords):
                if confidences[i] < CONFIDENCE_THRESHOLD:
                    continue

                cls = int(class_ids[i])
                if cls != self.PERSON_CLASS_ID and cls not in self.vehicle_colors:
                    continue

                x1, y1, x2, y2 = map(int, box)
                track_id = int(track_ids[i]) if track_ids is not None else None
                if track_id is not None:
                    self._register(track_id, cls)

                label = self._format_label(self.class_names[cls], track_id)
                detections.append((x1, y1, x2, y2, label, cls))

        return detections

    def draw_detections(self, frame, detections, fps):
        """Draw boxes, labels and the status panel, then return ``frame``.

        Args:
            frame: Image handed to the OpenCV drawing calls; the same object
                is returned.
            detections: Iterable of ``(x1, y1, x2, y2, label, cls)`` tuples as
                produced by ``detect``.
            fps: Frames-per-second value shown with one decimal place.

        Returns:
            The ``frame`` object that was passed in.
        """
        font = cv2.FONT_HERSHEY_SIMPLEX

        for x1, y1, x2, y2, label, cls in detections:
            # Classes without a vehicle colour fall back to the person colour.
            color = self.vehicle_colors.get(cls, self.person_color)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

            # Labels with a long id use a smaller font to stay compact.
            compact = self._has_long_id(label)
            font_scale = 0.4 if compact else 0.5
            thickness = 1

            (text_width, text_height), _ = cv2.getTextSize(label, font, font_scale, thickness)
            bg_top, bg_bottom, text_y = self._label_position(y1, text_height, compact)

            cv2.rectangle(frame, (x1, bg_top), (x1 + text_width, bg_bottom), color, -1)
            cv2.putText(frame, label, (x1, text_y), font, font_scale, (0, 0, 0), thickness)

        # Status panel: one filled strip per line, stacked 25 px apart.
        status_lines = [
            (f"FPS: {fps:.1f}", self.fps_color),
            (f"Vehicles: {len(self.vehicle_count)}", self.vehicle_count_color),
            (f"People: {len(self.person_count)}", self.person_count_color),
        ]

        for i, (line, color) in enumerate(status_lines):
            (text_width, _), _ = cv2.getTextSize(line, font, 0.5, 1)
            offset = i * 25
            cv2.rectangle(frame, (10, 10 + offset), (15 + text_width, 30 + offset), color, -1)
            cv2.putText(frame, line, (15, 25 + offset), font, 0.5, (255, 255, 255), 1)

        return frame

# main.py

import cv2
import time
from config import VIDEO_PATH, MODEL
from detector import TrafficDetector

def process_video():
    """Run detection on the configured video and display annotated frames.

    Opens ``VIDEO_PATH``, builds a ``TrafficDetector`` from ``MODEL`` and, for
    each frame read, runs ``detect`` and ``draw_detections`` and shows the
    result in a window. Stops when no more frames can be read or when the
    ``q`` key is pressed. Prints an error and returns early if the video
    cannot be opened.
    """
    cap = cv2.VideoCapture(VIDEO_PATH)
    if not cap.isOpened():
        print("Error opening video file")
        return

    try:
        detector = TrafficDetector(MODEL)
        # perf_counter is a monotonic, high-resolution clock meant for
        # measuring short intervals such as the time between frames.
        prev_time = time.perf_counter()

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Calculate FPS; guard against a zero interval so two identical
            # clock readings cannot raise ZeroDivisionError.
            curr_time = time.perf_counter()
            elapsed = curr_time - prev_time
            fps = 1 / elapsed if elapsed > 0 else 0.0
            prev_time = curr_time

            detections = detector.detect(frame)
            processed_frame = detector.draw_detections(frame, detections, fps)

            cv2.imshow('Traffic Monitoring', processed_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release the capture and close the window even if detection or
        # drawing raised part-way through the video.
        cap.release()
        cv2.destroyAllWindows()

# Start the video pipeline only when this file is run as a script, not when
# it is imported.
if __name__ == "__main__":
    process_video()