1. Data Loading
my_dataset_dataloader.py
"""
目的是把之前的__next__功能抽离出来
新建一个类,里面放__next__方法
"""
import random
import numpy as np
class MyDataset:
def __init__(self, all_datas, batch_size, shuffle=True):
self.all_datas = all_datas
self.batch_size = batch_size
self.shuffle = shuffle
    # Called once at the start of a for loop
    def __iter__(self):  # must return an object that implements __next__
        return Dataloader(self)  # pass self into the object as the dataset
def __len__(self):
return len(self.all_datas)
class Dataloader:
def __init__(self, dataset):
self.dataset = dataset
        # Build an index list from the dataset length; shuffling permutes the indices, not the data itself
self.index = [i for i in range(len(self.dataset))]
if self.dataset.shuffle:
np.random.shuffle(self.index)
self.cursor = 0
def __next__(self):
if self.cursor >= len(self.dataset.all_datas):
raise StopIteration
        # take the indices for this batch
index = self.index[self.cursor: self.cursor + self.dataset.batch_size]
        # gather batch_data by fancy-indexing with those indices
batch_data = self.dataset.all_datas[index]
self.cursor += self.dataset.batch_size
return batch_data
if __name__ == "__main__":
all_datas = np.array([1, 2, 3, 4, 5, 6, 7])
batch_size = 2
shuffle = True
    epochs = 2
dataset = MyDataset(all_datas, batch_size, shuffle)
    for e in range(epochs):
        for batch_data in dataset:  # putting an object in a for loop automatically calls its __iter__ method (once, at the start)
# print(type(batch_data))
print(batch_data)
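A minimal sketch of what that for loop does under the hood (reusing the classes above): iter() calls __iter__ once to get a fresh Dataloader, then next() is called repeatedly until StopIteration.
it = iter(MyDataset(np.array([1, 2, 3, 4, 5]), batch_size=2, shuffle=False))
print(next(it))  # [1 2]
print(next(it))  # [3 4]
print(next(it))  # [5]  -- the final, smaller batch
# one more next(it) would raise StopIteration, which a for loop catches silently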
2. Multi-Feature Linear Regression
"""
线性回归的矩阵实现,即多个属性用矩阵表示
"""
import numpy as np
import pandas as pd
def get_data(file = "上海二手房价.csv"):
datas = pd.read_csv(file, names=['y', 'x1', 'x2', 'x3', 'x4', 'x5', 'x6'], skiprows=1)
    y = datas['y'].values.reshape(-1, 1)  # reshape into a column vector
X = datas[[f'x{i}' for i in range(1, 7)]].values
    # z-score normalization: (X - mean_X) / std_X
mean_y = np.mean(y)
std_y = np.std(y)
mean_X = np.mean(X, axis=0, keepdims=True)
std_X = np.std(X, axis=0, keepdims=True)
y = (y - mean_y) / std_y
X = (X - mean_X) / std_X
return X, y, mean_y, std_y, mean_X, std_X
if __name__ == '__main__':
X, y, mean_y, std_y, mean_X, std_X = get_data()
    K = np.random.random((6, 1))  # initial K: a 6x1 column vector
    epochs = 1000
    """
    Why gradients can explode here: without dividing by len(X), the gradient
    sums over every sample, so a large dataset (or too large a learning rate)
    makes each update overshoot and diverge.
    """
    lr = 0.05
    b = 1
    for e in range(epochs):
pre = X @ K + b
loss = np.sum((pre - y) ** 2) / len(X)
        # ---- gradient computation ----
        # G = 2 * (pre - y)  # a column vector with one entry per sample
        G = (pre - y) / len(X)  # drop the constant factor 2 and divide by the sample count to prevent gradient explosion
        delta_K = X.T @ G
        delta_b = np.sum(G)  # the bias gradient is the sum of G
        K = K - lr * delta_K  # update K
        b = b - lr * delta_b
print(loss)
    # while True:
    #     bedroom = int(input('Number of bedrooms: '))
    #     ting = int(input('Number of living/dining rooms: '))
    #     wei = int(input('Number of bathrooms: '))
    #     area = int(input('Area: '))
    #     floor = int(input('Floor: '))
    #     year = int(input('Year built: '))
    #
    #     test_x = (np.array([bedroom, ting, wei, area, floor, year]).reshape(1, -1) - mean_X) / std_X
    #
    #     p = test_x @ K
    #     print(f'Predicted price: {p * std_y + mean_y}')  # invert the z-score: multiply by std_y, then add mean_y
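The analytic gradient can be sanity-checked against a finite-difference estimate. A minimal sketch (numeric_grad is a hypothetical helper, not part of the original script):
def numeric_grad(X, y, K, b, eps=1e-6):
    # finite-difference estimate of d(loss)/dK for loss = sum((X @ K + b - y) ** 2) / len(X)
    grad = np.zeros_like(K)
    for i in range(K.shape[0]):
        Kp, Km = K.copy(), K.copy()
        Kp[i] += eps
        Km[i] -= eps
        loss_p = np.sum((X @ Kp + b - y) ** 2) / len(X)
        loss_m = np.sum((X @ Km + b - y) ** 2) / len(X)
        grad[i] = (loss_p - loss_m) / (2 * eps)
    return grad  # should match 2 * X.T @ (pre - y) / len(X); the training loop drops the 2, which only rescales lr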
3. Logistic Regression
"""
激活函数,目的是把pre映射到一个目标区间内
"""
import numpy as np
def sigmoid(x):
return 1 / (1 + np.exp(-x))
if __name__ == '__main__':
    # --------------- features: fur length, leg length
dogs = np.array([[8.9, 12], [9, 11], [10, 13], [9.9, 11.2], [12.2, 10.1], [9.8, 13], [8.8, 11.2]]) # 0
cats = np.array([[3, 4], [5, 6], [3.5, 5.5], [4.5, 5.1], [3.4, 4.1], [4.1, 5.2], [4.4, 4.4]]) # 1
labels = np.array([0] * len(dogs) + [1] * len(cats), dtype=np.int32).reshape(-1, 1)
# X = np.concatenate((dogs, cats), axis=0)
    X = np.vstack((dogs, cats))  # stack vertically
K = np.random.normal(0, 1, size=(2, 1))
b = 0
    epochs = 1000
lr = 0.05
    for e in range(epochs):
p = X @ K + b
pre = sigmoid(p)
        # pre lies in (0, 1), so both logs are negative; the leading minus sign makes the loss positive
loss = -np.mean(labels * np.log(pre) + (1 - labels) * np.log(1 - pre))
        G = pre - labels  # derivative of the loss w.r.t. the logit p (the sigmoid and log derivatives cancel)
delta_K = X.T @ G
delta_b = np.sum(G)
K -= lr * delta_K
b -= lr * delta_b
# print(loss)
    while True:
        f1 = float(input('Fur length: '))
        f2 = float(input('Leg length: '))
        test_x = np.array([f1, f2]).reshape(1, 2)
        p = sigmoid(test_x @ K + b)
        if p > 0.5:
            print('Class: cat')
        else:
            print('Class: dog')
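One hardening worth noting (a sketch, not part of the script above): np.exp(-x) overflows for large negative x, so a numerically stable sigmoid splits on the sign of the input.
def stable_sigmoid(x):
    # stable for both large positive and large negative inputs
    out = np.empty_like(x, dtype=np.float64)
    pos = x >= 0
    out[pos] = 1 / (1 + np.exp(-x[pos]))
    ex = np.exp(x[~pos])
    out[~pos] = ex / (1 + ex)
    return out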
4. BP Neural Network
import numpy as np
import struct
def load_labels(file):  # load the label file
with open(file, 'rb') as f:
data = f.read()
return np.asanyarray(bytearray(data[8:]), dtype=np.int32)
def load_images(file):  # load the image file
with open(file, 'rb') as f:
data = f.read()
magic_number, num_items, rows, cols = struct.unpack('>iiii', data[: 16])
return np.asanyarray(bytearray(data[16:]), dtype=np.uint8).reshape(num_items, -1)
def make_one_hot(labels, class_num=10):  # one-hot encoding
result = np.zeros((len(labels), class_num))
for index, label in enumerate(labels):
result[index][label] = 1
return result
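# A loop-free alternative (sketch, not used below): NumPy fancy indexing can
# place all the ones at once.
def make_one_hot_vectorized(labels, class_num=10):
    result = np.zeros((len(labels), class_num))
    result[np.arange(len(labels)), labels] = 1
    return result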
def sigmoid(x):
return 1 / (1 + np.exp(-x))
def softmax(x):
return np.exp(x) / np.sum(np.exp(x), axis=1, keepdims=True)
if __name__ == '__main__':
    train_datas = load_images(r'data\train-images.idx3-ubyte') / 255  # normalize; raw pixel values are 0~255
    train_labels = load_labels(r'data\train-labels.idx1-ubyte')
    train_labels = make_one_hot(train_labels)  # convert to one-hot
    test_datas = load_images(r'data\t10k-images.idx3-ubyte') / 255
    test_labels = load_labels(r'data\t10k-labels.idx1-ubyte')
    epochs = 20
batch_size = 200
lr = 0.1
hidden_num = 256
w1 = np.random.normal(0, 1, size=(train_datas.shape[1], hidden_num))
w2 = np.random.normal(0, 1, size=(hidden_num, 10))
    batch_times = int(np.ceil(len(train_datas) / batch_size))  # round up so the final partial batch is included
    for e in range(epochs):
for batch_index in range(batch_times):
            # ---------- fetch batch -------------
batch_x = train_datas[batch_index * batch_size: (batch_index + 1) * batch_size]
batch_label = train_labels[batch_index * batch_size: (batch_index + 1) * batch_size]
# ---------- forward -------------
h = batch_x @ w1
sig_h = sigmoid(h)
p = sig_h @ w2
pre = softmax(p)
            # ---------- compute loss -------------
            loss = -np.sum(batch_label * np.log(pre)) / batch_size  # mean cross-entropy over the batch
            # ---------- backward -------------
            """
            Without the division by batch_size, a large batch inflates G2 and every
            gradient downstream of it, causing gradients to explode.
            In theory, a larger batch_size gets through the data faster, but the
            loss decreases more slowly per epoch.
            """
            G2 = (pre - batch_label) / batch_size  # scale down so a large batch_size does not blow up the gradients
delta_w2 = sig_h.T @ G2
delta_sig_h = G2 @ w2.T
delta_h = delta_sig_h * sig_h * (1 - sig_h)
delta_w1 = batch_x.T @ delta_h
            # ---------- update weights -------------
w1 -= lr * delta_w1
w2 -= lr * delta_w2
        # evaluate accuracy on the test set
h = test_datas @ w1
sig_h = sigmoid(h)
p = sig_h @ w2
pre = softmax(p)
pre = np.argmax(pre, axis=1)
        acc = np.sum(pre == test_labels) / len(test_labels)
print(acc)
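The softmax above can overflow when logits grow large, since np.exp of a big number exceeds float range. A numerically stable variant (a sketch, not a change to the code above) subtracts the row-wise max first, which leaves the result mathematically unchanged:
def stable_softmax(x):
    ex = np.exp(x - np.max(x, axis=1, keepdims=True))
    return ex / np.sum(ex, axis=1, keepdims=True)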
5. Convolution
import cv2
import matplotlib.pyplot as plt
import numpy as np
import os
def get_imgs(path: str):
    # read every image in the directory
img_files = os.listdir(path)
result = []
for file in img_files:
file = os.path.join(path, file)
img = cv2.imread(file)
        img = cv2.resize(img, (150, 150))
        img = img.transpose(2, 0, 1)  # HWC -> CHW
result.append(img)
return np.array(result)
def conv(imgs, kernel):
out_channel, in_channel, kernel_h, kernel_w = kernel.shape
img_num, _, img_h, img_w = imgs.shape
    # output width and height after a valid (no padding, stride 1) convolution
    c_w = img_w - kernel_w + 1  # width
    c_h = img_h - kernel_h + 1  # height
    A = kernel.reshape(out_channel, -1)  # flatten each kernel: (out_channel, in_channel * kernel_h * kernel_w)
    B = np.zeros((img_num, A.shape[1], c_w * c_h))  # im2col buffer: one column per output position
c = 0
    for h in range(c_h):  # outer loop: slide down the rows
        for w in range(c_w):  # inner loop: sweep left to right
            d = imgs[:, :, h: h + kernel_h, w: w + kernel_w]
            d = d.reshape(img_num, -1)  # flatten per image; reshape(1, -1) would merge all images into one row and no longer match B[:, :, c]
            B[:, :, c] = d
            c += 1
result = A @ B
result = result.reshape(img_num, out_channel, c_h, c_w)
return result
if __name__ == '__main__':
kernel = np.array(
[
[
[
[-1, -2, -3],
[-1, -2, -3],
[-1, -10, 1]
],
[
[0, 3, 3],
[-1, -2, -3],
[1, 1, 1]
],
[
[3, 3, 3],
[-1, -9, -3],
[1, 1, 1]
]
],
[
[
[1, -1, 0],
[1, -1, 0],
[1, -1, 0]
],
[
[1, -1, 0],
[1, -1, 0],
[1, -1, 0]
],
[
[1, -1, 0],
[1, -1, 0],
[1, -1, 0]
]
]
]
)
imgs = get_imgs('img')
result = conv(imgs, kernel)
for i in result:
for j in i:
plt.imshow(j, cmap='gray')
plt.show()
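A quick sanity check for conv (a sketch; the indices below are arbitrary picks, assuming the 150x150 images loaded above): a single output pixel should equal the elementwise product of the kernel with the matching image window, summed.
n, o, i, j = 0, 0, 5, 7  # image, output channel, row, column (hypothetical indices)
window = imgs[n, :, i:i + 3, j:j + 3]  # the kernels above are 3x3
direct = np.sum(window * kernel[o])
assert np.isclose(result[n, o, i, j], direct)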
6. Encapsulation
import numpy as np
import struct
def load_labels(file):  # load the label file
with open(file, 'rb') as f:
data = f.read()
return np.asanyarray(bytearray(data[8:]), dtype=np.int32)
def load_images(file):  # load the image file
with open(file, 'rb') as f:
data = f.read()
magic_number, num_items, rows, cols = struct.unpack('>iiii', data[: 16])
return np.asanyarray(bytearray(data[16:]), dtype=np.uint8).reshape(num_items, -1)
def make_one_hot(labels, class_num=10):  # one-hot encoding
result = np.zeros((len(labels), class_num))
for index, label in enumerate(labels):
result[index][label] = 1
return result
def sigmoid(x):
return 1 / (1 + np.exp(-x))
def softmax(x):
return np.exp(x) / np.sum(np.exp(x), axis=1, keepdims=True)
def get_datas():
    train_datas = load_images(r'..\05_01_05BP神经网络\data\train-images.idx3-ubyte') / 255  # normalize; raw pixel values are 0~255
    train_labels = load_labels(r'..\05_01_05BP神经网络\data\train-labels.idx1-ubyte')
    train_labels = make_one_hot(train_labels)  # convert to one-hot
    test_datas = load_images(r'..\05_01_05BP神经网络\data\t10k-images.idx3-ubyte') / 255
    test_labels = load_labels(r'..\05_01_05BP神经网络\data\t10k-labels.idx1-ubyte')
return train_datas, train_labels, test_datas, test_labels
class Linear:  # a Linear layer replaces the raw w1 and w2 matrices
def __init__(self, in_num, out_num):
self.weight = np.random.normal(0, 1, size=(in_num, out_num))
    def __call__(self, x):  # calling the object like a function invokes __call__
return self.forward(x)
def forward(self, x):
self.x = x
return x @ self.weight
    def backward(self, G):
        delta_x = G @ self.weight.T  # compute the input gradient first, with the pre-update weights
        delta_weight = self.x.T @ G
        self.weight -= lr * delta_weight  # the optimizer step: plain SGD (lr is read from the training script's globals)
        return delta_x
class Sigmoid:
def __call__(self, x):
return self.forward(x)
def forward(self, x):
self.r = sigmoid(x)
return self.r
def backward(self, G):
return G * self.r * (1 - self.r)
class Softmax:
def __call__(self, x):
return self.forward(x)
def forward(self, x):
self.r = softmax(x)
return self.r
    def backward(self, G):  # here G is the one-hot label, not an upstream gradient
return (self.r - G) / self.r.shape[0]
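# Why Softmax.backward returns (self.r - G) / n (a sketch of the derivation):
# for L = -sum(label * log(softmax(p))) / n, the chain rule collapses to
# dL/dp = (softmax(p) - label) / n, so softmax and log never need to be
# differentiated separately.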
class Mymodel:
def __init__(self, layers):
self.layers = layers
def forward(self, x, label=None):
for layer in self.layers:
x = layer(x)
self.x = x
if label is not None:
self.label = label
loss = -np.sum(label * np.log(x)) / x.shape[0]
return loss
def backward(self):
G = self.label
for layer in self.layers[::-1]:
G = layer.backward(G)
    def __call__(self, *args):  # *args collects positional arguments; **kwargs would collect keyword arguments
return self.forward(*args)
if __name__ == '__main__':
    # load the data
train_datas, train_labels, test_datas, test_labels = get_datas()
epoch = 20
batch_size = 200
lr = 0.1
hidden_num = 256
model = Mymodel(
[
Linear(train_datas.shape[1], hidden_num),
Sigmoid(),
Linear(hidden_num, 10),
Softmax()
]
)
    batch_times = int(np.ceil(len(train_datas) / batch_size))  # round up so the final partial batch is included
for e in range(epoch):
for batch_index in range(batch_times):
            # ---------- fetch batch -------------
x = train_datas[batch_index * batch_size: (batch_index + 1) * batch_size]
batch_label = train_labels[batch_index * batch_size: (batch_index + 1) * batch_size]
# ---------- forward -------------
loss = model(x, batch_label)
if batch_index % 100 == 0:
print(f'loss: {loss:.3f}')
# ---------- backward -------------
model.backward()
        # evaluate accuracy on the test set
x = test_datas
model(x)
pre = np.argmax(model.x, axis=1)
        acc = np.sum(pre == test_labels) / len(test_labels)
print(f'--------------acc: {acc:.3f}')
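Linear.backward above reads lr from the training script's global scope, which couples every layer to that script. One way to decouple them (a sketch under that assumption, not the original design) is to store the learning rate on the layer itself:
class LinearWithLr:
    def __init__(self, in_num, out_num, lr=0.1):
        self.weight = np.random.normal(0, 1, size=(in_num, out_num))
        self.lr = lr
    def __call__(self, x):
        return self.forward(x)
    def forward(self, x):
        self.x = x
        return x @ self.weight
    def backward(self, G):
        delta_x = G @ self.weight.T  # input gradient uses the pre-update weights
        self.weight -= self.lr * (self.x.T @ G)
        return delta_x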