I’ve got a reliable little prototype python script for reading the camera streams.
So far I’ve totally failed to set device parameters outside of revopoint software, and that means I’m receiving pair 8 bit IR on the depth stream (ignore any comments in the code that indicate otherwise). The key next step is setting the stream type, the depth conversion is done on the device, while the point could projection is done by the 3DCamera library. That means without accessing the depth stream my whole project is a non starter!
#!/usr/bin/env python3
"""Rought script to capture IR and color images from a Revopoint 3D camera"""
import cv2
import numpy as np
import os
import time
import sys
import wmi
import csv
def get_revopoint_cameras():
"""Get Revopoint RGB and Depth cameras using WMI."""
revopoint_cameras = []
try:
c = wmi.WMI()
wmi_devices = c.Win32_PnPEntity()
for device in wmi_devices:
if hasattr(device, 'Caption') and device.Caption is not None:
# Look specifically for Revopoint cameras
if device.Caption.startswith("RGBCam") or device.Caption.startswith("DepthCam"):
dev_info = {
'name': device.Caption,
'hardware_id': device.PNPDeviceID if hasattr(device, 'PNPDeviceID') else None,
'is_depth': device.Caption.startswith("DepthCam")
}
revopoint_cameras.append(dev_info)
print(f"Found Revopoint camera: {device.Caption}, hardware ID: {dev_info['hardware_id']}")
return revopoint_cameras
except Exception as e:
print(f"Error detecting Revopoint cameras: {str(e)}")
return []
def extract_device_id_from_hardware_id(hardware_id):
"""Extract the device ID part from the hardware ID string."""
if not hardware_id:
return None
# The hardware ID format typically contains VID (Vendor ID) and PID (Product ID)
parts = hardware_id.split('\\')
if len(parts) >= 2:
# Extract the VID_PID part and the unique instance ID
vid_pid_part = parts[1].split('&')
if len(vid_pid_part) >= 2:
vid_pid = f"{vid_pid_part[0]}_{vid_pid_part[1]}"
# If we have an instance ID, include that for uniqueness
if len(parts) >= 3:
return f"{vid_pid}_{parts[2]}"
return vid_pid
return hardware_id # Return original if parsing fails
def check_opencv_camera(index):
"""Check if camera at given OpenCV index is available and get its properties."""
camera_info = {
'index': index,
'available': False,
'resolution': None
}
cap = cv2.VideoCapture(index)
if cap.isOpened():
camera_info['available'] = True
# Get resolution
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
camera_info['resolution'] = f"{width}x{height}"
# Release the camera
cap.release()
return camera_info
def map_revopoint_cameras(max_cameras=10):
"""Map Revopoint cameras to OpenCV indices using hardware IDs and position."""
# Get Revopoint cameras from WMI
revopoint_cameras = get_revopoint_cameras()
# Extract device IDs for WMI cameras for easier matching
for cam in revopoint_cameras:
cam['device_id_part'] = extract_device_id_from_hardware_id(cam['hardware_id'])
# Suppress OpenCV error outputs
previous_log_level = cv2.getLogLevel()
cv2.setLogLevel(0)
# Check available OpenCV cameras without verbose output
opencv_cameras = []
for i in range(max_cameras):
camera_info = check_opencv_camera(i)
if camera_info['available']:
opencv_cameras.append({
'index': i,
'resolution': camera_info['resolution']
})
print(f"Found OpenCV camera at index {i} - Resolution: {camera_info['resolution']}")
print(f"Found {len(opencv_cameras)} available OpenCV camera(s)")
# Restore previous log level
cv2.setLogLevel(previous_log_level)
# Find RGB and Depth cameras in the WMI list
rgb_camera = next((cam for cam in revopoint_cameras if not cam['is_depth']), None)
depth_camera = next((cam for cam in revopoint_cameras if cam['is_depth']), None)
# Positional mapping: Assumes index 0 is Depth, index 1 is RGB
camera_mapping = {}
if len(opencv_cameras) >= 1 and depth_camera:
camera_mapping[opencv_cameras[0]['index']] = {
'opencv_index': opencv_cameras[0]['index'],
'name': depth_camera['name'],
'hardware_id': depth_camera['hardware_id'],
'resolution': opencv_cameras[0]['resolution'],
'type': 'Depth',
'match_method': 'Position-based (index 0)'
}
if len(opencv_cameras) >= 2 and rgb_camera:
camera_mapping[opencv_cameras[1]['index']] = {
'opencv_index': opencv_cameras[1]['index'],
'name': rgb_camera['name'],
'hardware_id': rgb_camera['hardware_id'],
'resolution': opencv_cameras[1]['resolution'],
'type': 'RGB',
'match_method': 'Position-based (index 1)'
}
# Print final mapping
print("Using position-based mapping (index 0 = Depth, index 1 = RGB)")
for idx, info in camera_mapping.items():
print(f"OpenCV Index {idx} -> {info['name'] if 'name' in info else 'Unknown'} ({info['type']})")
return camera_mapping
def list_cameras(max_cameras=10):
"""List all available cameras with their properties."""
# Suppress OpenCV error outputs temporarily
previous_log_level = cv2.getLogLevel()
cv2.setLogLevel(0)
print(f"Scanning for cameras (checking indices 0-{max_cameras-1})...")
# First try to map Revopoint cameras
camera_map = map_revopoint_cameras(max_cameras=max_cameras)
# Now check for any other cameras not in the mapping
for i in range(max_cameras):
if i not in camera_map:
cap = cv2.VideoCapture(i)
if cap.isOpened():
# Get camera info
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
# Try to read a frame to confirm it's working
ret, frame = cap.read()
status = "OK" if ret else "No Frame"
print(f"Camera {i}: {width}x{height} @ {fps:.1f}fps - Status: {status}")
# Check if this could be a depth camera (typically single-channel)
is_depth = False
if ret and frame is not None:
if len(frame.shape) == 2 or (len(frame.shape) == 3 and frame.shape[2] == 1):
print(f" Likely a depth camera (single channel)")
is_depth = True
# Add to the mapping
camera_map[i] = {
'opencv_index': i,
'resolution': f"{width}x{height}",
'fps': fps,
'type': 'Depth' if is_depth else 'RGB/Color',
'status': status
}
cap.release()
# Restore log level
cv2.setLogLevel(previous_log_level)
# Get camera indices - simplified to use position-based mapping
depth_idx = 0 # Position-based: depth camera is index 0
rgb_idx = 1 # Position-based: RGB camera is index 1
return depth_idx, rgb_idx, camera_map
def print_camera_properties(cap, camera_name):
"""Print all available properties of a camera."""
title_string = f"=== {camera_name} Camera Properties ==="
print(title_string + ("=" * (40 - len(title_string))))
# Common properties to check
properties = {
cv2.CAP_PROP_FRAME_WIDTH: "FRAME_WIDTH",
cv2.CAP_PROP_FRAME_HEIGHT: "FRAME_HEIGHT",
cv2.CAP_PROP_FPS: "FPS",
cv2.CAP_PROP_FOURCC: "FOURCC",
cv2.CAP_PROP_FORMAT: "FORMAT",
cv2.CAP_PROP_CONVERT_RGB: "CONVERT_RGB",
cv2.CAP_PROP_BRIGHTNESS: "BRIGHTNESS",
cv2.CAP_PROP_CONTRAST: "CONTRAST",
cv2.CAP_PROP_GAIN: "GAIN",
cv2.CAP_PROP_EXPOSURE: "EXPOSURE",
}
for prop_id, prop_name in properties.items():
try:
value = cap.get(prop_id)
# Special handling for FOURCC to convert to readable format
if prop_id == cv2.CAP_PROP_FOURCC:
fourcc_int = int(value)
fourcc_chars = chr(fourcc_int & 0xFF) + chr((fourcc_int >> 8) & 0xFF) + chr((fourcc_int >> 16) & 0xFF) + chr((fourcc_int >> 24) & 0xFF)
print(f"{prop_name}: {value} ({fourcc_chars})")
else:
print(f"{prop_name}: {value}")
except Exception as e:
print(f"{prop_name}: Error - {str(e)}")
print("=" * 40)
def extract_depth_data(buffer, width, height):
"""
Extract 16-bit depth data from camera buffer.
Args:
buffer: Raw data buffer or frame
width: Width of the depth image
height: Height of the depth image
Returns:
16-bit depth image as numpy array
"""
try:
# For 2D array input
if isinstance(buffer, np.ndarray):
if len(buffer.shape) == 2:
# Already a 2D array, convert to 16-bit
print("Converting 2D array to 16-bit")
if buffer.shape[0] == 1:
# Reshape flat array to 2D
buffer_reshaped = buffer.reshape(height, width)
depth_array = buffer_reshaped.astype(np.uint16) * 256
else:
depth_array = buffer.astype(np.uint16) * 256
else:
# Convert other array formats
depth_array = np.array(buffer, dtype=np.uint16)
else:
# Create a placeholder for incompatible input
print("ERROR: Could not interpret buffer as depth data")
depth_array = np.zeros((height, width), dtype=np.uint16)
return depth_array
except Exception as e:
print(f"Error extracting depth data: {str(e)}")
# Return empty array on error
return np.zeros((height, width), dtype=np.uint16)
def save_depth_frame_as_csv(frame, output_dir, timestamp):
"""
Save the depth frame as left and right CSV files.
Args:
frame: The depth frame data
output_dir: Directory to save the output
timestamp: Timestamp for the filename
Returns:
List of paths to the saved CSV files
"""
try:
print(f"Frame shape: {frame.shape}")
saved_files = []
# Ensure the frame is properly shaped
if len(frame.shape) == 1 or (len(frame.shape) == 2 and frame.shape[0] == 1):
# This is a flat array that contains both left and right images
flat_frame = frame.flatten()
total_pixels = flat_frame.size
# Split the frame into two equal parts (left and right)
half_size = total_pixels // 2
# Extract left and right data
left_data = flat_frame[:half_size]
right_data = flat_frame[half_size:2*half_size]
# Reshape each half to 2D
h, w = 400, 640 # Default dimensions
# Check if we have enough data for both images
if half_size >= h * w:
left_frame = left_data[:h * w].reshape(h, w)
right_frame = right_data[:h * w].reshape(h, w)
# Save left frame
left_csv_filename = os.path.join(output_dir, f"depth_left_{timestamp}.csv")
with open(left_csv_filename, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
for row in left_frame:
writer.writerow(row)
print(f"Saved left depth data to: {left_csv_filename}")
saved_files.append(left_csv_filename)
# Save right frame
right_csv_filename = os.path.join(output_dir, f"depth_right_{timestamp}.csv")
with open(right_csv_filename, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
for row in right_frame:
writer.writerow(row)
print(f"Saved right depth data to: {right_csv_filename}")
saved_files.append(right_csv_filename)
else:
print(f"Warning: Not enough data to create two {w}x{h} images (have {total_pixels} pixels, need {2*w*h})")
# Save original frame as a fallback
csv_filename = os.path.join(output_dir, f"depth_full_{timestamp}.csv")
with open(csv_filename, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
# Reshape to 2D if possible
if total_pixels >= w * h:
reshaped = flat_frame[:w * h].reshape(h, w)
for row in reshaped:
writer.writerow(row)
else:
# Just write as a single row
writer.writerow(flat_frame)
print(f"Saved full depth data to: {csv_filename}")
saved_files.append(csv_filename)
else:
# For already 2D frame, save as is (fallback)
csv_filename = os.path.join(output_dir, f"depth_full_{timestamp}.csv")
with open(csv_filename, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
for row in frame:
writer.writerow(row)
print(f"Saved full depth data to: {csv_filename}")
saved_files.append(csv_filename)
return saved_files
except Exception as e:
print(f"Error saving depth as CSV: {str(e)}")
return None
def capture_frames(output_dir="./captured_data"):
"""
Capture RGB and depth frames from the detected cameras.
Args:
output_dir: Directory to save the captured data
"""
# Create output directory if it doesn't exist
if not os.path.exists(output_dir):
os.makedirs(output_dir)
timestamp = time.strftime("%Y%m%d_%H%M%S")
# Auto-detect cameras
depth_idx, rgb_idx, _ = list_cameras(max_cameras=10)
# Default dimensions for depth frame
width, height = 640, 400
# Suppress OpenCV error output
prev_log_level = cv2.getLogLevel()
cv2.setLogLevel(0)
# 1. Open Depth camera setting CONVERT_RGB=False
depth_cap = cv2.VideoCapture(depth_idx)
depth_cap.set(cv2.CAP_PROP_CONVERT_RGB, 0)
if not depth_cap.isOpened():
print(f"Error: Could not open Depth camera {depth_idx}")
return False
print_camera_properties(depth_cap, "Depth")
# Get actual dimensions from camera
width = int(depth_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(depth_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# 2. Open RGB camera if available
rgb_cap = None
if rgb_idx is not None:
rgb_cap = cv2.VideoCapture(rgb_idx)
if not rgb_cap.isOpened():
print(f"Warning: Could not open RGB camera {rgb_idx}")
rgb_cap = None
else:
print_camera_properties(rgb_cap, "RGB")
# 3. Capture Depth frame
ret, depth_frame = depth_cap.read()
print(f"Depth frame capture result: {ret}")
if not ret or depth_frame is None:
print("Error: Failed to capture Depth frame")
depth_cap.release()
if rgb_cap:
rgb_cap.release()
return False
# Determine frame dimensions
if len(depth_frame.shape) == 3:
frame_height, frame_width = depth_frame.shape[:2]
elif len(depth_frame.shape) == 2:
frame_height, frame_width = depth_frame.shape
else:
frame_height, frame_width = height, width
# Extract 16-bit depth data
depth_16bit = extract_depth_data(depth_frame, frame_width, frame_height)
# 4. Capture RGB frame if available
if rgb_cap:
ret, rgb_frame = rgb_cap.read()
print(f"RGB frame capture result: {ret}")
if not ret or rgb_frame is None:
print("Warning: Failed to capture RGB frame")
else:
# Save RGB frame
rgb_filename = os.path.join(output_dir, f"rgb_{timestamp}.png")
cv2.imwrite(rgb_filename, rgb_frame)
print(f"RGB frame saved to: {rgb_filename}")
# 5. Clean up
depth_cap.release()
if rgb_cap:
rgb_cap.release()
# Restore log level
cv2.setLogLevel(prev_log_level)
# Save depth data as CSV
csv_file = save_depth_frame_as_csv(depth_16bit, output_dir, timestamp)
return csv_file is not None
if __name__ == "__main__":
if capture_frames(output_dir='./captured_data'):
print("Capture and processing completed successfully.")
else:
print("Capture or processing had errors.")
sys.exit(1)