# multi_hog_video_process_8s.py
"""Run sliding-window HOG scoring on a video using several FPGA accelerators.

Each frame is converted to grayscale, resized to FRAME_W x FRAME_H, copied
into a physically contiguous buffer, and scanned with WIN_W x WIN_H windows.
Windows are scored in batches, one window per accelerator.  Windows whose
score exceeds THRESHOLD are filtered by non-maximum suppression and drawn
onto the output video.
"""
import time

import numpy as np

# ---------------- Config ----------------
BITFILE = "design_4_clk.bit"
INPUT_VIDEO = "new_vid_ped.mov"
OUTPUT_VIDEO = "ped4_output.mov"
PROCESS_DURATION = 10  # seconds

# HOG IP base addresses
CTRL_BASES = [
    0xA0000000, 0xA0020000, 0xA0040000, 0xA0060000,
    0xA0080000, 0xA00A0000, 0xA00C0000, 0xA00E0000,
]
SPECS_BASES = [
    0xA0010000, 0xA0030000, 0xA0050000, 0xA0070000,
    0xA0090000, 0xA00B0000, 0xA00D0000, 0xA00F0000,
]
MMIO_SIZE = 0x10000
NUM_ACCEL = len(CTRL_BASES)  # windows scored per batch

# Register offsets used by this script.
# NOTE(review): the meanings below are taken from how the script uses the
# registers; confirm against the IP's register map.
CTRL_REG_STATUS = 0x00    # written with CTRL_START to launch, polled for CTRL_IDLE
CTRL_REG_ADDR_LO = 0x10   # low 32 bits of the window's physical address
CTRL_REG_ADDR_HI = 0x14   # high 32 bits of the window's physical address
CTRL_START = 0x1
CTRL_IDLE = 0x4
SPEC_REG_OFFSET = 0x00    # SPEC0: byte offset of the window inside the frame
SPEC_REG_STRIDE = 0x04    # SPEC1: row stride in bytes
SPEC_REG_ID = 0x08        # SPEC2: accelerator ID
SPEC_REG_SCORE = 0x1C     # read back as the window score

# Window and detection parameters
FRAME_W, FRAME_H = 320, 240
WIN_W, WIN_H = 64, 128
STRIDE_X, STRIDE_Y = 16, 16
THRESHOLD = 150000
NMS_IOU = 0.01


# ---------------- Helpers ----------------
def wait_idle(mmio, timeout=None):
    """Poll the control register until the idle bit (0x4) is set.

    Args:
        mmio: object with a ``read(offset)`` method (a control MMIO region).
        timeout: optional limit in seconds; ``None`` (the default) waits
            forever.

    Raises:
        TimeoutError: if ``timeout`` is given and the idle bit is not seen
            in time.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while not (mmio.read(CTRL_REG_STATUS) & CTRL_IDLE):
        if deadline is not None and time.monotonic() > deadline:
            raise TimeoutError("accelerator did not become idle in time")
        time.sleep(0.0005)


def run_batch(x_list, y_list, frame_phys, stride, ctrl_mmios, specs_mmios):
    """Score one batch of windows, one window per accelerator.

    All accelerators are started first and only then polled, so they run
    concurrently.

    Args:
        x_list: window left edges in pixels.
        y_list: window top edges in pixels, same length as ``x_list``.
        frame_phys: physical address of the frame buffer.
        stride: row stride of the frame in bytes.
        ctrl_mmios: control MMIO regions, one per accelerator.
        specs_mmios: spec MMIO regions, one per accelerator.

    Returns:
        List of scores, one per window, in input order.

    Raises:
        ValueError: if the coordinate lists differ in length or the batch
            needs more accelerators than are available.
    """
    count = len(x_list)
    # Validate before touching hardware so no accelerator is left running.
    if len(y_list) != count:
        raise ValueError("x_list and y_list must have the same length")
    if count > len(ctrl_mmios) or count > len(specs_mmios):
        raise ValueError("batch is larger than the number of accelerators")

    for idx, (x, y) in enumerate(zip(x_list, y_list)):
        byte_offset = y * stride + x
        window_phys = frame_phys + byte_offset
        specs, ctrl = specs_mmios[idx], ctrl_mmios[idx]
        specs.write(SPEC_REG_OFFSET, byte_offset)
        specs.write(SPEC_REG_STRIDE, stride)
        specs.write(SPEC_REG_ID, idx)
        ctrl.write(CTRL_REG_ADDR_LO, window_phys & 0xFFFFFFFF)
        ctrl.write(CTRL_REG_ADDR_HI, (window_phys >> 32) & 0xFFFFFFFF)
        ctrl.write(CTRL_REG_STATUS, CTRL_START)

    scores = []
    for idx in range(count):
        wait_idle(ctrl_mmios[idx])
        scores.append(specs_mmios[idx].read(SPEC_REG_SCORE))
    return scores


def nms(boxes, iou_thresh=0.3):
    """Greedy non-maximum suppression.

    Args:
        boxes: sequence of ``[x1, y1, x2, y2, score]`` rows.
        iou_thresh: a box is dropped when its IoU with an already kept,
            higher-scoring box exceeds this value.

    Returns:
        Kept boxes as a list of lists, highest score first.
    """
    if len(boxes) == 0:
        return []
    boxes = np.array(boxes)
    x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
    scores = boxes[:, 4]
    # Full width * height of every box.  The earlier code multiplied the
    # width of the compared boxes by y2 alone, which inflated the union.
    areas = (x2 - x1) * (y2 - y1)

    order = scores.argsort()[::-1]
    keep = []
    while len(order) > 0:
        best, rest = order[0], order[1:]
        keep.append(best)
        inter_w = np.maximum(0, np.minimum(x2[best], x2[rest])
                             - np.maximum(x1[best], x1[rest]))
        inter_h = np.maximum(0, np.minimum(y2[best], y2[rest])
                             - np.maximum(y1[best], y1[rest]))
        inter = inter_w * inter_h
        union = areas[best] + areas[rest] - inter
        iou = inter / (union + 1e-6)  # epsilon guards against zero-area boxes
        order = rest[iou <= iou_thresh]
    return boxes[keep].tolist()


def window_batches(frame_w, frame_h, win_w, win_h, stride_x, stride_y,
                   batch_size):
    """Yield ``(x_list, y_list)`` batches of window origins, row by row.

    Each batch holds up to ``batch_size`` horizontally consecutive windows
    that fit entirely inside the frame.
    """
    for y in range(0, frame_h - win_h + 1, stride_y):
        for x0 in range(0, frame_w - win_w + 1, stride_x * batch_size):
            xs = [
                x0 + k * stride_x
                for k in range(batch_size)
                if x0 + k * stride_x + win_w <= frame_w
            ]
            if xs:
                yield xs, [y] * len(xs)


def detect_windows(frame_phys, ctrl_mmios, specs_mmios):
    """Score every window of the frame at ``frame_phys``.

    Returns:
        List of ``[x1, y1, x2, y2, score]`` for windows scoring above
        THRESHOLD.
    """
    detections = []
    batches = window_batches(FRAME_W, FRAME_H, WIN_W, WIN_H,
                             STRIDE_X, STRIDE_Y, NUM_ACCEL)
    for x_batch, y_batch in batches:
        scores = run_batch(x_batch, y_batch, frame_phys, FRAME_W,
                           ctrl_mmios, specs_mmios)
        detections.extend(
            [xb, yb, xb + WIN_W, yb + WIN_H, sc]
            for xb, yb, sc in zip(x_batch, y_batch, scores)
            if sc > THRESHOLD
        )
    return detections


# ---------------- Main ----------------
def main():
    """Process the first PROCESS_DURATION seconds of INPUT_VIDEO."""
    # Imported here rather than at module level so the pure helpers above
    # can be imported and tested on a machine without the board libraries.
    import cv2
    from pynq import MMIO, Overlay, allocate

    print("Loading overlay...")
    ol = Overlay(BITFILE)  # noqa: F841 - keep a reference to the loaded overlay
    print("Overlay loaded")

    ctrl_mmios = [MMIO(base, MMIO_SIZE) for base in CTRL_BASES]
    specs_mmios = [MMIO(base, MMIO_SIZE) for base in SPECS_BASES]

    cap = cv2.VideoCapture(INPUT_VIDEO)
    if not cap.isOpened():
        raise RuntimeError("Could not open input video")

    out_writer = None
    frame_buf = None
    try:
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        if fps == 0 or fps > 120:  # fallback
            fps = 30
        print(f"Using FPS = {fps}")

        max_frames = fps * PROCESS_DURATION
        print(f"Will process first {max_frames} frames (~{PROCESS_DURATION}s)")

        out_writer = cv2.VideoWriter(
            OUTPUT_VIDEO,
            cv2.VideoWriter_fourcc(*"XVID"),
            fps,
            (FRAME_W, FRAME_H),
        )
        # Without this check a failed open silently produces no output.
        if not out_writer.isOpened():
            raise RuntimeError("Could not open output video")

        # CMA buffer
        frame_buf = allocate(shape=(FRAME_H * FRAME_W,), dtype=np.uint8)
        frame_phys = frame_buf.physical_address
        print(f"CMA buffer at 0x{frame_phys:08X}, size={FRAME_H*FRAME_W}")

        frame_idx = 0
        while frame_idx < max_frames:
            ret, frame = cap.read()
            if not ret:
                break
            frame_idx += 1

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            gray_resized = cv2.resize(gray, (FRAME_W, FRAME_H))
            frame_buf[:] = gray_resized.reshape(-1)
            frame_buf.flush()  # flush the buffer before the hardware reads it

            detections = detect_windows(frame_phys, ctrl_mmios, specs_mmios)
            filtered = nms(detections, iou_thresh=NMS_IOU)

            out = cv2.cvtColor(gray_resized, cv2.COLOR_GRAY2BGR)
            for (x1, y1, x2, y2, score) in filtered:
                cv2.rectangle(out, (int(x1), int(y1)), (int(x2), int(y2)),
                              (0, 0, 255), 2)
                cv2.putText(out, str(int(score)), (int(x1), int(y1) - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.4, (0, 255, 0), 1)
            out_writer.write(out)
            print(f"Processed frame {frame_idx}/{max_frames}, detections={len(filtered)}")
    finally:
        # Release everything even when a frame fails mid-run.
        cap.release()
        if out_writer is not None:
            out_writer.release()
        if frame_buf is not None:
            frame_buf.freebuffer()

    print(f"Saved output video → {OUTPUT_VIDEO}")


if __name__ == "__main__":
    main()