-
Notifications
You must be signed in to change notification settings - Fork 223
Open
Description
The radar antenna is arranged as shown below:
Row 1 1 4 5 8
Row 2 2 3 6 7
Row 3 9 12
Row 4 10 11
There is a 180 ° phase difference between receiving antennas 1, 4, and 2, 3; 5,8 and 6,7; 11,12 and 9, 10.
I performed range-fft, azimuth beamforming and Doppler-fft on the raw data collected by DCA1000 according to the demo. I will next perform elevation estimation and 3d point cloud generation?
My code is following:
import mmwave as mm
import mmwave.dsp as dsp
from mmwave.dataloader import DCA1000
# from mmwave.tracking import EKF
# from mmwave.tracking import gtrack_visualize
from mmwave.dsp.utils import Window
import numpy as np
import os
# import cv2
import seaborn as sns
import matplotlib
matplotlib.use('TkAgg')
import matplotlib as mpl
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
NUM_RX = 4
NUM_TX = 3
VIRT_ANT = 4
NUM_CHIRPS = 128
NUM_ADC_SAMPLES = 256
RANGE_RESOLUTION = (3e8 * 6000 * 1e3) / (2 * 59 * 1e12 * 256)
MAX_RANGE = (300 * 0.9 * 6000) / (2 * 59 * 1e3)
print(RANGE_RESOLUTION, MAX_RANGE)
DOPPLER_RESOLUTION = 0.0405
NUM_FRAMES = 256
SKIP_SIZE = 4
ANGLE_RES = 1
ANGLE_RANGE = 90
ANGLE_BINS = (ANGLE_RANGE * 2) // ANGLE_RES + 1
BINS_PROCESSED = 256
load_data = True
if load_data:
adc_data = np.fromfile('D:/Project/Python_Project/mmWaveData/data/test_data_walk2.bin', dtype=np.uint16)
adc_data = adc_data.reshape(NUM_FRAMES, -1)
all_data = np.apply_along_axis(DCA1000.organize, 1, adc_data, num_chirps=NUM_CHIRPS*NUM_TX, num_rx=NUM_RX, num_samples=NUM_ADC_SAMPLES)
# print(all_data.shape)
range_azimuth = np.zeros((ANGLE_BINS, BINS_PROCESSED))
# range_elevation = np.zeros((ANGLE_BINS, BINS_PROCESSED))
num_vec, steering_vec = dsp.gen_steering_vec(ANGLE_RANGE, ANGLE_RES, VIRT_ANT)
# tracker = EKF()
fig = plt.figure(figsize=(10, 7)) # 创建一个画布figure,然后在这个画布上加各种元素。
ax = Axes3D(fig) # 将画布作用于 Axes3D 对象上。
# fig, ax = plt.subplots()
plt.ion()
# fig = plt.figure(figsize=(25, 15))
# fig.clf()
for frame in all_data:
""" 1 (Range Processing) """
# --- range fft
radar_cube = dsp.range_processing(frame)
# print('range-fft:', radar_cube.shape)
""" 2 (Capon Beamformer) """
# --- static clutter removal
mean = radar_cube.mean(0)
radar_cube = radar_cube - mean
# print(radar_cube.shape)
# --- capon beamforming
beamWeights_RA = np.zeros((VIRT_ANT, BINS_PROCESSED), dtype=np.complex_)
# beamWeights_RE = np.zeros((VIRT_ANT, BINS_PROCESSED), dtype=np.complex_)
# radar_cube = np.concatenate((radar_cube[0::2, ...], radar_cube[1::2, ...]), axis=1)
radar_virt_1 = radar_cube[0::3, 0, ...].reshape(NUM_CHIRPS, 1, -1) #TX1 - RX1
radar_virt_2 = -1 * radar_cube[0::3, 1, ...].reshape(NUM_CHIRPS, 1, -1) #TX1 - RX2 (180° phase inversion)
radar_virt_3 = -1 * radar_cube[0::3, 2, ...].reshape(NUM_CHIRPS, 1, -1) #TX1 - RX3
radar_virt_4 = radar_cube[0::3, 3, ...].reshape(NUM_CHIRPS, 1, -1) #TX1 - RX4
radar_virt_5 = radar_cube[2::3, 0, ...].reshape(NUM_CHIRPS, 1, -1) #TX2 - RX1
radar_virt_6 = -1 * radar_cube[2::3, 1, ...].reshape(NUM_CHIRPS, 1, -1) #TX2 - RX2
radar_virt_7 = -1 * radar_cube[2::3, 2, ...].reshape(NUM_CHIRPS, 1, -1) #TX2 - RX3
radar_virt_8 = radar_cube[2::3, 3, ...].reshape(NUM_CHIRPS, 1, -1) #TX2 - RX4
radar_virt_9 = radar_cube[1::3, 0, ...].reshape(NUM_CHIRPS, 1, -1) #TX3 - RX1
radar_virt_10 = -1 * radar_cube[1::3, 1, ...].reshape(NUM_CHIRPS, 1, -1) #TX3 - RX2
radar_virt_11 = -1 * radar_cube[1::3, 2, ...].reshape(NUM_CHIRPS, 1, -1) #TX3 - RX3
radar_virt_12 = radar_cube[1::3, 3, ...].reshape(NUM_CHIRPS, 1, -1) #TX3 - RX4
radar_cube_RA = np.concatenate((radar_virt_1, radar_virt_4, radar_virt_5, radar_virt_8), axis=1)
radar_cube_All = np.concatenate((radar_virt_1, radar_virt_2, radar_virt_3, radar_virt_4,
radar_virt_5, radar_virt_6, radar_virt_7, radar_virt_8,
radar_virt_9, radar_virt_10, radar_virt_11, radar_virt_12), axis=1)
radar_cube_RE = np.concatenate((radar_virt_10, radar_virt_9, radar_virt_6, radar_virt_5), axis=1)
# print('radar_cube_RA: ', radar_cube_RA.shape)
# Note that when replacing with generic doppler estimation functions, radarCube is interleaved and
# has doppler at the last dimension.
for i in range(BINS_PROCESSED):
range_azimuth[:,i], beamWeights_RA[:,i] = dsp.aoa_capon(radar_cube_RA[:, :, i].T, steering_vec, magnitude=True)
""" 3 (Object Detection) """
heatmap_RA_log = np.log2(range_azimuth)
# heatmap_RE_log = np.log2(range_elevation)
# heatmap_log = (range_azimuth)
# --- cfar in azimuth direction
first_pass, _ = np.apply_along_axis(func1d=dsp.ca_,
axis=0,
arr=heatmap_RA_log,
# l_bound=1.5,
l_bound=1.5,
guard_len=4,
noise_len=16)
# --- cfar in range direction
second_pass, noise_floor = np.apply_along_axis(func1d=dsp.ca_,
axis=0,
arr=heatmap_RA_log.T,
# l_bound=2.5,
l_bound=2.5,
guard_len=4,
noise_len=16)
# --- classify peaks and caclulate snrs
noise_floor = noise_floor.T
first_pass = (heatmap_RA_log > first_pass)
second_pass = (heatmap_RA_log > second_pass.T)
peaks = (first_pass & second_pass)
peaks[:SKIP_SIZE, :] = 0
peaks[-SKIP_SIZE:, :] = 0
peaks[:, :SKIP_SIZE] = 0
peaks[:, -SKIP_SIZE:] = 0
pairs = np.argwhere(peaks)
azimuths, ranges = pairs.T
snrs = heatmap_RA_log[pairs[:,0], pairs[:,1]] - noise_floor[pairs[:,0], pairs[:,1]]
# print('azumith:', azimuths.shape, ranges.shape)
range_azimuth_cfar = np.zeros(range_azimuth.shape)
# range_elevation_cfar = np.zeros(range_elevation.shape)
# print(range_azimuth_cfar.shape)
range_azimuth_cfar[azimuths, ranges] = range_azimuth[azimuths, ranges]
# range_elevation_cfar[elevations, ranges_el] = range_elevation[elevations, ranges_el]
# ax3 = fig.add_subplot(1, 1, 1)
# sns.heatmap(range_azimuth_cfar.T[10:80, 30:150])
# ax3.set_title('range-azimuth heatmap cfar')
# plt.pause(0.01)
# plt.show(block=False)
# plt.clf()
""" 4 (Elevation Estimation) """
""" 5 (Doppler Estimation) """
# --- get peak indices
# beamWeights should be selected based on the range indices from CFAR.
dopplerFFTInput = radar_cube_RA[:, :, ranges]
beamWeights_RA = beamWeights_RA[:, ranges]
# dopplerFFTInput = radar_cube_RA
# beamWeights_RA = beamWeights_RA
# --- estimate doppler values
# For each detected object and for each chirp combine the signals from 4 Rx, i.e.
# For each detected object, matmul (numChirpsPerFrame, numRxAnt) with (numRxAnt) to (numChirpsPerFrame)
dopplerFFTInput = np.einsum('ijk,jk->ik', dopplerFFTInput, beamWeights_RA)
if not dopplerFFTInput.shape[-1]:
continue
dopplerEst = np.fft.fft(dopplerFFTInput, axis=0)
# print(dopplerEst.shape)
dopplerEst = np.argmax(dopplerEst, axis=0)
dopplerEst[dopplerEst[:]>=NUM_CHIRPS/2] -= NUM_CHIRPS
# print(dopplerEst)
"""6 (point cloud generation)"""
Metadata
Metadata
Assignees
Labels
No labels