  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
import supervision as sv from inference import InferencePipeline from inference.core.interfaces.camera.entities import VideoFrame from utils.general import find_in_list, load_zones_config from utils.timers import ClockBasedTimer import cv2 import numpy as np COLORS = sv.ColorPalette.from_hex(["#E6194B", "#3CB44B", "#FFE119", "#3C76D1"]) COLOR_ANNOTATOR = sv.ColorAnnotator(color=COLORS) LABEL_ANNOTATOR = sv.LabelAnnotator( color=COLORS, text_color=sv.Color.from_hex("#000000") # why is the stream zoomed in class CustomSink: def __init__(self, weights_path: str, zone_configuration_path: str, classes: List[int]): self._model = YOLO(weights_path) self.classes = classes self.tracker = sv.ByteTrack(minimum_matching_threshold=0.5) self.fps_monitor = sv.FPSMonitor() self.polygons = load_zones_config(file_path=zone_configuration_path) self.timers = [ClockBasedTimer() for _ in self.polygons] self.zones = [ sv.PolygonZone( polygon=polygon, triggering_anchors=(sv.Position.CENTER,), for polygon in self.polygons def infer(self, video_frames: List[VideoFrame]) -> List[any]: # result must be returned as list of elements representing model prediction for single frame # with order unchanged. return self._model([v.image for v in video_frames], imgsz="700") def on_prediction(self, result: dict, frame: VideoFrame) -> None: self.fps_monitor.tick() fps = self.fps_monitor.fps detections = sv.Detections.from_ultralytics(result) detections = detections[find_in_list(detections.class_id, self.classes)] detections = self.tracker.update_with_detections(detections) annotated_frame = frame.image.copy() annotated_frame = sv.draw_text( scene=annotated_frame, text=f"{fps:.1f}", text_anchor=sv.Point(40, 30), background_color=sv.Color.from_hex("#A351FB"), text_color=sv.Color.from_hex("#000000"), for idx, zone in enumerate(self.zones): annotated_frame = sv.draw_polygon( scene=annotated_frame, polygon=zone.polygon, color=COLORS.by_idx(idx) detections_in_zone = detections[zone.trigger(detections)] time_in_zone = self.timers[idx].tick(detections_in_zone) custom_color_lookup = np.full(detections_in_zone.class_id.shape, idx) annotated_frame = COLOR_ANNOTATOR.annotate( scene=annotated_frame, detections=detections_in_zone, custom_color_lookup=custom_color_lookup, labels = [ f"#{tracker_id} {int(time // 60):02d}:{int(time % 60):02d}" for tracker_id, time in zip(detections_in_zone.tracker_id, time_in_zone) annotated_frame = LABEL_ANNOTATOR.annotate( scene=annotated_frame, detections=detections_in_zone, labels=labels, custom_color_lookup=custom_color_lookup, cv2.imshow("Processed Video", annotated_frame) if cv2.waitKey(1) & 0xFF == ord('q'): cv2.destroyAllWindows() raise SystemExit("Program terminated by user") def main( weight_path: str, rtsp_url: str, zone_configuration_path: str, model_id: str, confidence: float, iou: float, classes: List[int], ) -> None: sink = CustomSink(weights_path=weight_path ,zone_configuration_path=zone_configuration_path, classes=classes) pipeline = InferencePipeline.init_with_custom_logic( video_reference=rtsp_url, on_video_frame=sink.infer, on_prediction=sink.on_prediction, pipeline.start() pipeline.join() except (KeyboardInterrupt, SystemExit): pipeline.terminate() print("Program terminated.") if __name__ == "__main__": // calling main and passing args to main main()

I am am passing rtsp url on which the inference is done. There is one issue the annotated feed that is returned is a bit zoomed in. why is that? I tried changing the imgz arg from 640 to 800 and 700 in the infer but that didn’t fix the zoom issue. I want it zoomed out (like the original)

if I manually resize the frame using the opencv then how do I pass it to the inference pipeline since the pipeline take the rtsp url as arg and under the hood captures the frame and process it

    pipeline = InferencePipeline.init_with_custom_logic(