-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrmldataset2016.py
118 lines (91 loc) · 4 KB
/
rmldataset2016.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import pickle
import numpy as np
from numpy import linalg as la
maxlen=512
def l2_normalize(x, axis=-1):
y = np.max(np.sum(x ** 2, axis, keepdims=True), axis, keepdims=True)
return x / np.sqrt(y)
def norm_pad_zeros(X_train,nsamples):
print ("Pad:", X_train.shape)
for i in range(X_train.shape[0]):
X_train[i,:,0] = X_train[i,:,0]/la.norm(X_train[i,:,0],2)
return X_train
def to_amp_phase(X_train,X_val,X_test,nsamples):
X_train_cmplx = X_train[:,0,:] + 1j* X_train[:,1,:]
X_val_cmplx = X_val[:,0,:] + 1j* X_val[:,1,:]
X_test_cmplx = X_test[:,0,:] + 1j* X_test[:,1,:]
X_train_amp = np.abs(X_train_cmplx)
X_train_ang = np.arctan2(X_train[:,1,:],X_train[:,0,:])/np.pi
X_train_amp = np.reshape(X_train_amp,(-1,1,nsamples))
X_train_ang = np.reshape(X_train_ang,(-1,1,nsamples))
X_train = np.concatenate((X_train_amp,X_train_ang), axis=1)
X_train = np.transpose(np.array(X_train),(0,2,1))
X_val_amp = np.abs(X_val_cmplx)
X_val_ang = np.arctan2(X_val[:,1,:],X_val[:,0,:])/np.pi
X_val_amp = np.reshape(X_val_amp,(-1,1,nsamples))
X_val_ang = np.reshape(X_val_ang,(-1,1,nsamples))
X_val = np.concatenate((X_val_amp,X_val_ang), axis=1)
X_val = np.transpose(np.array(X_val),(0,2,1))
X_test_amp = np.abs(X_test_cmplx)
X_test_ang = np.arctan2(X_test[:,1,:],X_test[:,0,:])/np.pi
X_test_amp = np.reshape(X_test_amp,(-1,1,nsamples))
X_test_ang = np.reshape(X_test_ang,(-1,1,nsamples))
X_test = np.concatenate((X_test_amp,X_test_ang), axis=1)
X_test = np.transpose(np.array(X_test),(0,2,1))
return (X_train,X_val,X_test)
def load_data(filename=r'D:\HECHAOWEI\data\RML2016.10a_dict_512(1).dat'):
Xd =pickle.load(open(filename,'rb'),encoding='iso-8859-1')
mods,snrs = [sorted(list(set([k[j] for k in Xd.keys()]))) for j in [0,1] ] #mods['8PSK', 'AM-DSB', 'BPSK', 'CPFSK', 'GFSK', 'PAM4', 'QAM16', 'QAM64', 'QPSK', 'WBFM']
X = []
lbl = []
train_idx=[]
val_idx=[]
test_idx=[]
np.random.seed(2016)
a=0
for mod in mods:
for snr in snrs:
X.append(Xd[(mod, snr)])
for i in range(Xd[(mod, snr)].shape[0]):
lbl.append((mod, snr))
if(snr >= -12):
train_idx += list(np.random.choice(range(a * 1000, (a + 1) * 1000), size=600, replace=False))
val_idx += list(
np.random.choice(list(set(range(a * 1000, (a + 1) * 1000)) - set(train_idx)), size=200, replace=False))
test_idx += list(set(range(a * 1000, (a + 1) * 1000)) - set(train_idx) - set(val_idx))
else:
test_idx += list(np.random.choice(range(a * 1000, (a + 1) * 1000), size=200, replace=False))
a += 1
X = np.vstack(X)
np.random.shuffle(train_idx)
np.random.shuffle(val_idx)
np.random.shuffle(test_idx)
X_train = X[train_idx]
X_val=X[val_idx]
X_test = X[test_idx]
def to_onehot(yy):
yy1 = np.zeros([len(yy), len(mods)])
yy1[np.arange(len(yy)), yy] = 1
return yy1
Y_train = to_onehot(list(map(lambda x: mods.index(lbl[x][0]), train_idx)))
Y_val=to_onehot(list(map(lambda x: mods.index(lbl[x][0]), val_idx)))
Y_test = to_onehot(list(map(lambda x: mods.index(lbl[x][0]),test_idx)))
X_train,X_val,X_test = to_amp_phase(X_train,X_val,X_test,512)
print(X_train)
X_train = X_train[:,:maxlen,:]
X_val = X_val[:,:maxlen,:]
X_test = X_test[:,:maxlen,:]
X_train = norm_pad_zeros(X_train,maxlen)
X_val = norm_pad_zeros(X_val,maxlen)
X_test = norm_pad_zeros(X_test,maxlen)
print(X_train)
print(X_train.shape)
print(X_val.shape)
print(X_test.shape)
print(Y_train.shape)
print(Y_val.shape)
print(Y_test.shape)
return (mods,snrs,lbl),(X_train,Y_train),(X_val,Y_val),(X_test,Y_test),(train_idx,val_idx,test_idx)
if __name__ == '__main__':
(mods, snrs, lbl), (X_train, Y_train),(X_val,Y_val), (X_test, Y_test), (train_idx,val_idx,test_idx) = load_data()