#!/usr/bin/env python# coding=utf-8import numpy as npimport matplotlib.pyplot as pltimport os# -----------------------------------------------------------------------------# 系统参数(论文典型配置)# -----------------------------------------------------------------------------N_FFT = 64 # IFFT/FFT 点数N_CP = 16 # 循环前缀长度N_TX = 2 # 发射天线N_RX = 2 # 接收天线MOD_ORDER = 6 # 64QAM = 6 bit/symbolSNR = 30 # 信噪比FIR_TAPS = np.array([1, 0.5, 0.2, 0.1]) # FIR低通滤波器系数np.random.seed(0) # 固定随机种子,确保每次输出可复现def dump_signal(name, x, n=8): arr = np.asarray(x) flat = arr.reshape(-1) print(f"{name}: shape={arr.shape}, dtype={arr.dtype}") print(f"{name} 前{min(n, len(flat))}点:", np.round(flat[:n], 3))def plot_complex_stage(signals, title_prefix, save_path): n = np.arange(len(signals[0])) fig, axes = plt.subplots(2, 1, figsize=(10, 6), sharex=True) axes[0].plot(n, signals[0].real, label="Ant1 Real", linewidth=1.2) axes[0].plot(n, signals[0].imag, label="Ant1 Imag", linewidth=1.2, linestyle="--") axes[0].set_title(f"{title_prefix} (Antenna 1)") axes[0].set_ylabel("Amplitude") axes[0].grid(True, alpha=0.3) axes[0].legend() axes[1].plot(n, signals[1].real, label="Ant2 Real", linewidth=1.2) axes[1].plot(n, signals[1].imag, label="Ant2 Imag", linewidth=1.2, linestyle="--") axes[1].set_title(f"{title_prefix} (Antenna 2)") axes[1].set_xlabel("Sample Index") axes[1].set_ylabel("Amplitude") axes[1].grid(True, alpha=0.3) axes[1].legend() plt.tight_layout() plt.savefig(save_path, dpi=160) plt.close(fig)def ensure_fig_dir(): fig_dir = "figures" os.makedirs(fig_dir, exist_ok=True) return fig_dir# -----------------------------------------------------------------------------# LDPC(简单系统码示例):(N, K) = (24, 12), H = [A I]# -----------------------------------------------------------------------------K_LDPC = 12N_LDPC = 24A_LDPC = np.array([ [1, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0], [0, 1, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0], [0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 0, 0], [0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 0], [0, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0], [0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0, 0], [0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0], [0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1], [1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0], [0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0], [0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1], [1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1],], dtype=int)H_LDPC = np.hstack([A_LDPC, np.eye(N_LDPC - K_LDPC, dtype=int)])def ldpc_encode(u_bits): u = np.asarray(u_bits, dtype=int) % 2 p = (A_LDPC @ u) % 2 return np.concatenate([u, p]).astype(int)def ldpc_decode_bitflip(rx_bits, max_iter=20): v = np.asarray(rx_bits, dtype=int).copy() % 2 for _ in range(max_iter): syndrome = (H_LDPC @ v) % 2 if not syndrome.any(): break unsat_checks = np.where(syndrome == 1)[0] if len(unsat_checks) == 0: break scores = np.zeros(N_LDPC, dtype=int) for m in unsat_checks: involved = np.where(H_LDPC[m] == 1)[0] scores[involved] += 1 threshold = max(2, int(np.max(scores) * 0.6)) flip_idx = np.where(scores >= threshold)[0] if len(flip_idx) == 0: flip_idx = np.array([int(np.argmax(scores))]) v[flip_idx] ^= 1 return v[:K_LDPC], v# -----------------------------------------------------------------------------# ===================== 发送端 TRANSMITTER =====================# -----------------------------------------------------------------------------print("="*80)print(" 发送端信号处理流程")print("="*80)# 1. 原始比特 + 信道编码(LDPC)print("\n[发送1] 原始比特 & 信道编码")bits = np.random.randint(0, 2, K_LDPC) # 原始比特print("原始比特:\t", bits)coded_bits = ldpc_encode(bits)print("编码后比特:\t", coded_bits)print(f"LDPC参数:K={K_LDPC}, N={N_LDPC}, 码率={K_LDPC/N_LDPC:.2f}")# 2. 64QAM 调制print("\n[发送2] 64QAM 调制映射")def qam64_mod(bits): bits = bits.reshape(-1, 6) syms = np.zeros(len(bits), dtype=complex) for i, b in enumerate(bits): i_val = 2 * (b[0]*4 + b[1]*2 + b[2]) - 7 q_val = 2 * (b[3]*4 + b[4]*2 + b[5]) - 7 syms[i] = i_val + 1j * q_val return syms / np.sqrt(42)qam_syms = qam64_mod(coded_bits)print("调制后复数符号:\n", np.round(qam_syms, 3))# 3. MIMO 编码(空间复用)print("\n[发送3] MIMO 编码(2天线)")def mimo_encode(syms, nt): syms = syms.reshape(-1, nt) return syms.Tmimo_syms = mimo_encode(qam_syms, N_TX)print("天线1符号:", np.round(mimo_syms[0], 3))print("天线2符号:", np.round(mimo_syms[1], 3))# 4. 子载波映射print("\n[发送4] 子载波映射")def subcarrier_map(syms, n_fft): out = np.zeros(n_fft, dtype=complex) out[:len(syms)] = syms return outtx_sc = [subcarrier_map(s, N_FFT) for s in mimo_syms]dump_signal("天线1子载波映射", tx_sc[0])dump_signal("天线2子载波映射", tx_sc[1])# 5. IFFTprint("\n[发送5] IFFT 变换")tx_ifft = [np.fft.ifft(s) for s in tx_sc]dump_signal("天线1 IFFT输出", tx_ifft[0])dump_signal("天线2 IFFT输出", tx_ifft[1])fig_dir = ensure_fig_dir()plot_complex_stage(tx_ifft, "TX5 IFFT Time-Domain", os.path.join(fig_dir, "tx5_ifft.png"))print("图已保存:figures/tx5_ifft.png")# 6. 插入 CPprint("\n[发送6] 插入循环前缀 CP")def add_cp(x, cp_len): return np.concatenate([x[-cp_len:], x])tx_cp = [add_cp(s, N_CP) for s in tx_ifft]dump_signal("天线1 CP后", tx_cp[0])dump_signal("天线2 CP后", tx_cp[1])plot_complex_stage(tx_cp, "TX6 With CP", os.path.join(fig_dir, "tx6_cp.png"))print("图已保存:figures/tx6_cp.png")# 7. FIR 滤波print("\n[发送7] FIR 低通滤波")def fir(x, taps): return np.convolve(x.real, taps, mode='same') + 1j * np.convolve(x.imag, taps, mode='same')tx_fir = [fir(s, FIR_TAPS) for s in tx_cp]dump_signal("天线1 FIR输出", tx_fir[0])dump_signal("天线2 FIR输出", tx_fir[1])plot_complex_stage(tx_fir, "TX7 FIR Output", os.path.join(fig_dir, "tx7_fir.png"))print("图已保存:figures/tx7_fir.png")# 8. D/A 转换(输出实信号)print("\n[发送8] D/A 数模转换(发射波形)")tx_analog = [np.real(s) for s in tx_fir]print("天线1 DA输出前8点:", np.round(tx_analog[0][:8], 3))plot_complex_stage([tx_analog[0].astype(complex), tx_analog[1].astype(complex)], "TX8 D/A Real Waveform", os.path.join(fig_dir, "tx8_da.png"))print("图已保存:figures/tx8_da.png")# -----------------------------------------------------------------------------# 无线信道 + 噪声# -----------------------------------------------------------------------------print("\n" + "="*60)print(" 无线信道传输(加噪 + 衰落)")print("="*60)H = (np.random.randn(N_RX, N_TX) + 1j*np.random.randn(N_RX, N_TX))/np.sqrt(2) # 信道矩阵print("信道矩阵 H:\n", np.round(H, 3))print("信道矩阵条件数 cond(H):", np.round(np.linalg.cond(H), 3))rx_analog = np.zeros((N_RX, len(tx_analog[0])), dtype=complex)for r in range(N_RX): for t in range(N_TX): rx_analog[r] += H[r, t] * tx_analog[t]noise = (np.random.randn(*rx_analog.shape) + 1j*np.random.randn(*rx_analog.shape))/np.sqrt(2*SNR)rx_analog += noisedump_signal("接收端模拟复基带(天线1)", rx_analog[0])dump_signal("接收端模拟复基带(天线2)", rx_analog[1])# -----------------------------------------------------------------------------# ===================== 接收端 RECEIVER =====================# -----------------------------------------------------------------------------print("\n" + "="*80)print(" 接收端信号处理流程")print("="*80)# 1. A/D 模数转换print("\n[接收1] A/D 转换")rx_digital = rx_analog.copy()dump_signal("天线1 A/D输出", rx_digital[0])dump_signal("天线2 A/D输出", rx_digital[1])plot_complex_stage(rx_digital, "RX1 A/D Output", os.path.join(fig_dir, "rx1_ad.png"))print("图已保存:figures/rx1_ad.png")# 2. FIR 低通滤波print("\n[接收2] FIR 滤波")rx_fir = [fir(s, FIR_TAPS) for s in rx_digital]dump_signal("天线1 接收FIR输出", rx_fir[0])dump_signal("天线2 接收FIR输出", rx_fir[1])plot_complex_stage(rx_fir, "RX2 FIR Output", os.path.join(fig_dir, "rx2_fir.png"))print("图已保存:figures/rx2_fir.png")# 3. 去除 CPprint("\n[接收3] 去除 CP")def remove_cp(x, cp_len): return x[cp_len:]rx_nocp = [remove_cp(s, N_CP) for s in rx_fir]dump_signal("天线1 去CP后", rx_nocp[0])dump_signal("天线2 去CP后", rx_nocp[1])plot_complex_stage(rx_nocp, "RX3 Remove CP", os.path.join(fig_dir, "rx3_remove_cp.png"))print("图已保存:figures/rx3_remove_cp.png")# 4. FFT 变换print("\n[接收4] FFT 频域变换")rx_fft = [np.fft.fft(s) for s in rx_nocp]dump_signal("天线1 FFT输出", rx_fft[0])dump_signal("天线2 FFT输出", rx_fft[1])plot_complex_stage(rx_fft, "RX4 FFT Output", os.path.join(fig_dir, "rx4_fft.png"))print("图已保存:figures/rx4_fft.png")# 5. 子载波解映射print("\n[接收5] 子载波解映射")def subcarrier_demap(x, num_sym): return x[:num_sym]num_sym = len(mimo_syms[0])rx_demap = [subcarrier_demap(s, num_sym) for s in rx_fft]dump_signal("天线1 子载波解映射", rx_demap[0])dump_signal("天线2 子载波解映射", rx_demap[1])# 6. MIMO 检测(迫零 ZF)print("\n[接收6] MIMO 检测(ZF 算法)")def mimo_detect(y, H): H_pinv = np.linalg.pinv(H) return H_pinv @ yrx_mimo = mimo_detect(np.array(rx_demap), H)print("检测后符号(天线1):", np.round(rx_mimo[0], 3))# 7. 64QAM 解调print("\n[接收7] 64QAM 解调")def qam64_demod(syms): syms = syms * np.sqrt(42) bits = [] for s in syms: i = np.clip(int(round((s.real + 7)/2)), 0, 7) q = np.clip(int(round((s.imag + 7)/2)), 0, 7) bits += [(i>>2)&1, (i>>1)&1, i&1, (q>>2)&1, (q>>1)&1, q&1] return np.array(bits)demod_bits = qam64_demod(rx_mimo.flatten())print("解调后比特:", demod_bits)# 8. LDPC 信道解码(bit-flipping)print("\n[接收8] LDPC 信道解码(Bit-Flipping)")decoded_bits, decoded_codeword = ldpc_decode_bitflip(demod_bits[:N_LDPC], max_iter=20)syndrome_after = (H_LDPC @ decoded_codeword) % 2print("解码后比特:\t", decoded_bits)print("原始发送比特:\t", bits)print("译码后综合校验 syndrome:", syndrome_after)print("\n✅ 收发端全程仿真完成!")