from typing import Any, List

import cv2
import numpy as np
import supervision as sv
from ultralytics import YOLO

from inference import InferencePipeline
from inference.core.interfaces.camera.entities import VideoFrame
from utils.general import find_in_list, load_zones_config
from utils.timers import ClockBasedTimer

COLORS = sv.ColorPalette.from_hex(["#E6194B", "#3CB44B", "#FFE119", "#3C76D1"])
COLOR_ANNOTATOR = sv.ColorAnnotator(color=COLORS)
LABEL_ANNOTATOR = sv.LabelAnnotator(
    color=COLORS, text_color=sv.Color.from_hex("#000000")
)
# why is the stream zoomed in?


class CustomSink:
    def __init__(self, weights_path: str, zone_configuration_path: str, classes: List[int]):
        self._model = YOLO(weights_path)
        self.classes = classes
        self.tracker = sv.ByteTrack(minimum_matching_threshold=0.5)
        self.fps_monitor = sv.FPSMonitor()
        self.polygons = load_zones_config(file_path=zone_configuration_path)
        self.timers = [ClockBasedTimer() for _ in self.polygons]
        self.zones = [
            sv.PolygonZone(
                polygon=polygon,
                triggering_anchors=(sv.Position.CENTER,),
            )
            for polygon in self.polygons
        ]

    def infer(self, video_frames: List[VideoFrame]) -> List[Any]:
        # Results must be returned as a list with one model prediction per
        # frame, in the same order as the input frames.
        return self._model([v.image for v in video_frames], imgsz=700)

    def on_prediction(self, result: dict, frame: VideoFrame) -> None:
        self.fps_monitor.tick()
        fps = self.fps_monitor.fps

        detections = sv.Detections.from_ultralytics(result)
        detections = detections[find_in_list(detections.class_id, self.classes)]
        detections = self.tracker.update_with_detections(detections)

        annotated_frame = frame.image.copy()
        annotated_frame = sv.draw_text(
            scene=annotated_frame,
            text=f"{fps:.1f}",
            text_anchor=sv.Point(40, 30),
            background_color=sv.Color.from_hex("#A351FB"),
            text_color=sv.Color.from_hex("#000000"),
        )

        for idx, zone in enumerate(self.zones):
            annotated_frame = sv.draw_polygon(
                scene=annotated_frame, polygon=zone.polygon, color=COLORS.by_idx(idx)
            )

            detections_in_zone = detections[zone.trigger(detections)]
            time_in_zone = self.timers[idx].tick(detections_in_zone)
            custom_color_lookup = np.full(detections_in_zone.class_id.shape, idx)

            annotated_frame = COLOR_ANNOTATOR.annotate(
                scene=annotated_frame,
                detections=detections_in_zone,
                custom_color_lookup=custom_color_lookup,
            )
            labels = [
                f"#{tracker_id} {int(time // 60):02d}:{int(time % 60):02d}"
                for tracker_id, time in zip(detections_in_zone.tracker_id, time_in_zone)
            ]
            annotated_frame = LABEL_ANNOTATOR.annotate(
                scene=annotated_frame,
                detections=detections_in_zone,
                labels=labels,
                custom_color_lookup=custom_color_lookup,
            )

        cv2.imshow("Processed Video", annotated_frame)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            cv2.destroyAllWindows()
            raise SystemExit("Program terminated by user")

def main(
    weights_path: str,
    rtsp_url: str,
    zone_configuration_path: str,
    model_id: str,
    confidence: float,
    iou: float,
    classes: List[int],
) -> None:
    sink = CustomSink(
        weights_path=weights_path,
        zone_configuration_path=zone_configuration_path,
        classes=classes,
    )
    pipeline = InferencePipeline.init_with_custom_logic(
        video_reference=rtsp_url,
        on_video_frame=sink.infer,
        on_prediction=sink.on_prediction,
    )
    pipeline.start()

    try:
        pipeline.join()
    except (KeyboardInterrupt, SystemExit):
        pipeline.terminate()
        print("Program terminated.")


if __name__ == "__main__":
    # argument parsing elided; the CLI args are passed to main() here
    main()
I am passing an RTSP URL on which the inference is done. There is one issue: the annotated feed that is returned is a bit zoomed in. Why is that? I tried changing the `imgsz` arg from 640 to 800 and 700 in `infer`, but that didn't fix the zoom issue. I want it zoomed out (like the original).
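To narrow it down, I can print the size of the decoded frame that the pipeline hands to `on_prediction` and compare it against the camera's native resolution (a quick debug sketch, not verified):

```python
def on_prediction(self, result: dict, frame: VideoFrame) -> None:
    # debug: if this shape is smaller than the camera's native resolution,
    # the "zoom" happens during capture/decoding, not during annotation
    print(frame.image.shape)  # (height, width, channels)
    ...
```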
If I manually resize the frame using OpenCV, how do I pass it to the inference pipeline? The pipeline takes the RTSP URL as an argument and, under the hood, captures and processes the frames itself:
pipeline = InferencePipeline.init_with_custom_logic(
    video_reference=rtsp_url,
    on_video_frame=sink.infer,
    on_prediction=sink.on_prediction,
)
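For context, this is roughly what I mean by manually resizing (a sketch that only shrinks the frame shown on screen, inside `on_prediction`; the 1280x720 target is just an example):

```python
# sketch: resize only the displayed frame; the pipeline still decodes
# and runs inference on the original RTSP frames
display_frame = cv2.resize(annotated_frame, (1280, 720))  # example target size
cv2.imshow("Processed Video", display_frame)
```

But I don't know how to do the resize before the frame reaches the model, since the pipeline owns the capture.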