MENU

手写AI

May 6, 2022 • 梦想是做一个技术宅

1. 数据加载

my_dataset_dataloader.py

"""
目的是把之前的__next__功能抽离出来
新建一个类,里面放__next__方法
"""

import random
import numpy as np


class MyDataset:
    """Container for samples plus batching options; iteration is delegated.

    The object itself holds no iteration state. Each time it is iterated it
    hands out a new ``Dataloader`` that reads ``all_datas``, ``batch_size``
    and ``shuffle`` from this instance.
    """

    def __init__(self, all_datas, batch_size, shuffle=True):
        # Keep the arguments as public attributes; the loader reads them.
        self.all_datas, self.batch_size, self.shuffle = all_datas, batch_size, shuffle

    def __len__(self):
        """Return the number of samples stored in ``all_datas``."""
        samples = self.all_datas
        return len(samples)

    def __iter__(self):
        """Return a new ``Dataloader`` bound to this dataset.

        A ``for`` loop calls this once when the loop starts, so every loop
        over the dataset gets its own loader.
        """
        loader = Dataloader(self)  # pass self in as the loader's dataset
        return loader


class Dataloader:
    """Iterator that yields successive batches from a dataset.

    The dataset must provide ``__len__`` and the attributes ``all_datas``,
    ``batch_size`` and ``shuffle``. ``all_datas`` is indexed with a list of
    positions, so it has to support that kind of indexing (a NumPy array
    does; a plain Python list does not).
    """

    def __init__(self, dataset):
        self.dataset = dataset
        # Shuffle a list of positions rather than reordering the data itself.
        self.index = list(range(len(self.dataset)))
        if self.dataset.shuffle:
            np.random.shuffle(self.index)
        self.cursor = 0  # position in self.index of the next batch start

    def __iter__(self):
        """Return self so the loader satisfies the iterator protocol."""
        return self

    def __next__(self):
        """Return the next batch, or raise ``StopIteration`` when exhausted.

        The last batch is shorter than ``batch_size`` when the number of
        samples is not a multiple of it.
        """
        if self.cursor >= len(self.index):
            raise StopIteration
        stop = self.cursor + self.dataset.batch_size
        # Take the positions for this batch, then gather the samples.
        positions = self.index[self.cursor: stop]
        batch_data = self.dataset.all_datas[positions]
        self.cursor = stop
        return batch_data


if __name__ == "__main__":
    # Demo: walk a 7-element array in shuffled batches of 2, for two epochs.
    samples = np.array([1, 2, 3, 4, 5, 6, 7])
    n_epochs = 2
    demo_set = MyDataset(samples, 2, True)
    for _ in range(n_epochs):
        # Putting an object in a for loop calls its __iter__ once, at loop
        # start; each epoch therefore iterates a new Dataloader.
        for batch in demo_set:
            print(batch)
            

2. 多特征 - 线性回归

"""
线性回归的矩阵实现,即多个属性用矩阵表示
"""

import numpy as np
import pandas as pd
from tqdm import tqdm
import pickle


def get_data(file="上海二手房价.csv"):
    """Load the CSV and z-score normalise the target and the six features.

    The first row of the file is skipped and the seven columns are named
    ``y, x1, ..., x6``.

    Args:
        file: Path of the CSV file to read.

    Returns:
        Tuple ``(X, y, mean_y, std_y, mean_X, std_X)``: the normalised
        feature matrix, the normalised target as a column vector, and the
        statistics used for normalisation (``mean_X`` / ``std_X`` are kept
        as single-row 2-D arrays, one entry per feature).
    """
    feature_cols = ['x1', 'x2', 'x3', 'x4', 'x5', 'x6']
    frame = pd.read_csv(file, names=['y'] + feature_cols, skiprows=1)

    target = frame['y'].values.reshape(-1, 1)  # column vector
    features = frame[feature_cols].values

    # z-score statistics: (value - mean) / std
    mean_y, std_y = np.mean(target), np.std(target)
    mean_X = np.mean(features, axis=0, keepdims=True)
    std_X = np.std(features, axis=0, keepdims=True)

    norm_X = (features - mean_X) / std_X
    norm_y = (target - mean_y) / std_y
    return norm_X, norm_y, mean_y, std_y, mean_X, std_X


if __name__ == '__main__':
    # Train a 6-feature linear model y ~ X @ K + b with batch gradient descent.
    X, y, mean_y, std_y, mean_X, std_X = get_data()
    n_samples = len(X)

    K = np.random.random((6, 1))  # initial weights: column vector of length 6
    b = 1
    lr = 0.05
    epoc = 1000

    for _ in range(epoc):
        residual = X @ K + b - y
        loss = np.sum(residual ** 2) / n_samples

        # Gradient: the constant factor 2 is dropped and the residual is
        # divided by the sample count so the step size does not grow with
        # the dataset (this keeps the gradient from exploding).
        G = residual / n_samples
        delta_K = X.T @ G
        delta_b = np.sum(G)  # gradient of the bias is the sum of G

        K -= lr * delta_K
        b -= lr * delta_b

        print(loss)

    # Disabled interactive prediction demo, kept for reference.
    # NOTE(review): as written it predicts with `text_x @ K` (no bias) and
    # de-normalises with `p + std_y + mean_y`; undoing the z-score would be
    # `p * std_y + mean_y` -- fix before re-enabling.
    # while True:
    #     bedroom = (int(input('请输入卧室数量:')))
    #     ting = (int(input('请输入客厅餐厅数量:')))
    #     wei = (int(input('请输入卫生间数量:')))
    #     area = (int(input('请输入面积:')))
    #     floor = (int(input('请输入楼层:')))
    #     year = (int(input('请输入建成年份:')))
    #
    #     text_x = (np.array([bedroom, ting, wei, area, floor, year]).reshape(1, -1) - mean_X) / std_X
    #
    #     p = text_x @ K
    #     print(f'房价为{p + std_y + mean_y}')

3. 逻辑回归

"""
激活函数,目的是把pre映射到一个目标区间内
"""

import numpy as np


def sigmoid(x):
    """Return the logistic function ``1 / (1 + exp(-x))`` element-wise.

    Evaluated in a numerically stable way: the exponential is only taken of
    non-positive values, so large-magnitude inputs do not overflow.

    Args:
        x: Scalar or array-like of real numbers.

    Returns:
        Values in ``[0, 1]`` with the same shape as ``x`` (a NumPy scalar
        for scalar input).
    """
    x = np.asarray(x)
    decay = np.exp(-np.abs(x))  # in (0, 1], never overflows
    # x >= 0: 1 / (1 + e^-x);  x < 0: e^x / (1 + e^x) -- the same function.
    out = np.where(x >= 0, 1 / (1 + decay), decay / (1 + decay))
    return out[()]  # unwrap 0-d results to a scalar; arrays pass through


if __name__ == '__main__':
    # Each sample is (fur length, leg length).
    dogs = np.array([[8.9, 12], [9, 11], [10, 13], [9.9, 11.2], [12.2, 10.1], [9.8, 13], [8.8, 11.2]])  # class 0
    cats = np.array([[3, 4], [5, 6], [3.5, 5.5], [4.5, 5.1], [3.4, 4.1], [4.1, 5.2], [4.4, 4.4]])  # class 1

    # Stack the two groups row-wise; labels follow the same order.
    X = np.concatenate((dogs, cats), axis=0)
    labels = np.repeat(np.array([0, 1], dtype=np.int32), [len(dogs), len(cats)]).reshape(-1, 1)

    K = np.random.normal(0, 1, size=(2, 1))
    b = 0
    lr = 0.05
    epoc = 1000

    for _ in range(epoc):
        pre = sigmoid(X @ K + b)
        # log of a value in (0, 1) is negative, hence the leading minus.
        loss = -np.mean(labels * np.log(pre) + (1 - labels) * np.log(1 - pre))

        G = pre - labels  # derivative of the loss w.r.t. the pre-sigmoid score
        K -= lr * (X.T @ G)
        b -= lr * np.sum(G)
        # print(loss)

    # Interactive prediction loop (runs until interrupted).
    while True:
        fur = float(input('请输入毛发长:'))
        leg = float(input('请输入腿长:'))
        text_x = np.array([fur, leg]).reshape(1, 2)
        p = sigmoid(text_x @ K + b)
        print('类别:猫' if p > 0.5 else '类别:狗')

4. BP神经网络

import numpy as np
import struct
import matplotlib.pyplot as plt


def load_labels(file):
    """Read a label file and return its labels as an int32 vector.

    The first 8 bytes of the file are skipped; every remaining byte is
    taken as one label value.
    """
    with open(file, 'rb') as fh:
        payload = fh.read()[8:]
    # Interpret the payload as unsigned bytes, then widen to int32.
    return np.frombuffer(payload, dtype=np.uint8).astype(np.int32)


def load_images(file):
    """Read an image file and return one flattened uint8 row per image.

    The 16-byte header holds four big-endian int32 values; only the second
    one (the item count) is used. The remaining bytes are the pixel data,
    reshaped to ``(num_items, -1)``.
    """
    with open(file, 'rb') as fh:
        raw = fh.read()
    _magic, num_items, _rows, _cols = struct.unpack_from('>iiii', raw)
    # Copy so the returned array owns writable memory.
    pixels = np.frombuffer(raw[16:], dtype=np.uint8).copy()
    return pixels.reshape(num_items, -1)


def make_one_hot(labels, class_num=10):
    """Return the one-hot encoding of ``labels``.

    Args:
        labels: Sequence or 1-D array of integer class indices.
        class_num: Number of classes, i.e. the width of each encoded row.

    Returns:
        Float array of shape ``(len(labels), class_num)`` with a 1 at
        ``[i, labels[i]]`` and 0 elsewhere.
    """
    labels = np.asarray(labels)
    result = np.zeros((len(labels), class_num))
    if labels.size:
        # Set one cell per row in a single vectorised assignment instead of
        # looping over the samples in Python.
        result[np.arange(len(labels)), labels] = 1
    return result


def sigmoid(x):
    """Return the logistic function ``1 / (1 + exp(-x))`` element-wise.

    Evaluated in a numerically stable way: the exponential is only taken of
    non-positive values, so large-magnitude inputs do not overflow.

    Args:
        x: Scalar or array-like of real numbers.

    Returns:
        Values in ``[0, 1]`` with the same shape as ``x`` (a NumPy scalar
        for scalar input).
    """
    x = np.asarray(x)
    decay = np.exp(-np.abs(x))  # in (0, 1], never overflows
    # x >= 0: 1 / (1 + e^-x);  x < 0: e^x / (1 + e^x) -- the same function.
    out = np.where(x >= 0, 1 / (1 + decay), decay / (1 + decay))
    return out[()]  # unwrap 0-d results to a scalar; arrays pass through


def softmax(x):
    """Return the row-wise softmax of a 2-D array.

    The row maximum is subtracted before exponentiating. This leaves the
    result mathematically unchanged but keeps ``exp`` from overflowing
    (which would otherwise produce ``inf / inf = nan``) for large logits.

    Args:
        x: Array of shape ``(batch, classes)``.

    Returns:
        Array of the same shape whose rows are non-negative and sum to 1.
    """
    shifted = x - np.max(x, axis=1, keepdims=True)
    exps = np.exp(shifted)  # computed once and reused for the normaliser
    return exps / np.sum(exps, axis=1, keepdims=True)


if __name__ == '__main__':
    # Two-layer network (sigmoid hidden layer, softmax output) trained with
    # mini-batch gradient descent, evaluated on the test set every epoch.
    # NOTE(review): the backslash paths below only resolve on Windows.
    train_datas = load_images(r'data\train-images.idx3-ubyte') / 255  # scale pixels from 0..255 to 0..1
    train_labels = load_labels(r'data\train-labels.idx1-ubyte')
    train_labels = make_one_hot(train_labels)  # one-hot targets for the loss

    test_datas = load_images(r'data\t10k-images.idx3-ubyte') / 255
    test_labels = load_labels(r'data\t10k-labels.idx1-ubyte')

    epoc = 20
    batch_size = 200
    lr = 0.1

    hidden_num = 256
    w1 = np.random.normal(0, 1, size=(train_datas.shape[1], hidden_num))
    w2 = np.random.normal(0, 1, size=(hidden_num, 10))

    batch_times = int(np.ceil(len(train_datas) / batch_size))  # round up so a short last batch is included
    for e in range(epoc):
        for batch_index in range(batch_times):
            # ---------- fetch batch -------------
            start = batch_index * batch_size
            batch_x = train_datas[start: start + batch_size]
            batch_label = train_labels[start: start + batch_size]
            # Real number of rows; smaller than batch_size for a short last batch.
            batch_len = len(batch_x)

            # ---------- forward -------------
            h = batch_x @ w1
            sig_h = sigmoid(h)
            p = sig_h @ w2
            pre = softmax(p)

            # ---------- loss -------------
            loss = -np.sum(batch_label * np.log(pre)) / batch_len  # mean cross-entropy over the batch

            # ---------- backward -------------
            # Divide by the batch length so the gradient magnitude does not
            # scale with the batch size (un-averaged sums blow up for large
            # batches).
            G2 = (pre - batch_label) / batch_len
            delta_w2 = sig_h.T @ G2
            delta_sig_h = G2 @ w2.T
            delta_h = delta_sig_h * sig_h * (1 - sig_h)
            delta_w1 = batch_x.T @ delta_h

            # ---------- update weights -------------
            w1 -= lr * delta_w1
            w2 -= lr * delta_w2

        # Accuracy on the test set: fraction of correct argmax predictions.
        h = test_datas @ w1
        sig_h = sigmoid(h)
        p = sig_h @ w2
        pre = softmax(p)
        pre = np.argmax(pre, axis=1)
        acc = np.mean(pre == test_labels)  # not tied to a fixed test-set size
        print(acc)

5. 卷积

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os


def get_imgs(path: str):
    """Load every readable image in a directory as a channel-first array.

    Each image is resized to 150x150 and transposed from (H, W, C) to
    (C, H, W). Files are visited in sorted name order so the result is
    deterministic. Files that OpenCV cannot decode are skipped with a
    warning instead of crashing inside ``cv2.resize``.

    Args:
        path: Directory containing the image files.

    Returns:
        ``np.ndarray`` stacking the loaded images; empty when no image
        could be read.
    """
    import warnings

    result = []
    for name in sorted(os.listdir(path)):
        file = os.path.join(path, name)
        img = cv2.imread(file)
        if img is None:
            # imread signals failure by returning None, not by raising.
            warnings.warn(f'skipping unreadable image: {file}')
            continue
        img = cv2.resize(img, (150, 150))
        result.append(img.transpose(2, 0, 1))
    return np.array(result)


def conv(imgs, kernel):
    """Apply a stride-1, unpadded 2-D convolution via one matrix product.

    Args:
        imgs: Array of shape ``(img_num, in_channel, img_h, img_w)``.
        kernel: Array of shape ``(out_channel, in_channel, kernel_h, kernel_w)``.

    Returns:
        Array of shape ``(img_num, out_channel, out_h, out_w)`` where
        ``out_h = img_h - kernel_h + 1`` and ``out_w = img_w - kernel_w + 1``.
    """
    out_channel, _in_channel, kernel_h, kernel_w = kernel.shape
    img_num, _, img_h, img_w = imgs.shape

    # Output size after sliding the kernel over the image.
    out_h = img_h - kernel_h + 1
    out_w = img_w - kernel_w + 1

    # One flattened filter per row; one flattened image window per column.
    flat_kernel = kernel.reshape(out_channel, -1)
    patches = np.zeros((img_num, flat_kernel.shape[1], out_h * out_w))

    # ndindex walks the output grid row by row (left to right, then down),
    # matching the row-major reshape of the result below.
    for col, (top, left) in enumerate(np.ndindex(out_h, out_w)):
        window = imgs[:, :, top: top + kernel_h, left: left + kernel_w]
        patches[:, :, col] = window.reshape(img_num, -1)

    response = flat_kernel @ patches
    return response.reshape(img_num, out_channel, out_h, out_w)


if __name__ == '__main__':
    # Two filters, each with three input channels of 3x3 weights.
    hand_picked = [
        [[-1, -2, -3], [-1, -2, -3], [-1, -10, 1]],
        [[0, 3, 3], [-1, -2, -3], [1, 1, 1]],
        [[3, 3, 3], [-1, -9, -3], [1, 1, 1]],
    ]
    # Every row of every channel is [1, -1, 0]: left pixel minus middle pixel.
    column_diff = [[[1, -1, 0]] * 3] * 3
    kernel = np.array([hand_picked, column_diff])

    feature_maps = conv(get_imgs('img'), kernel)
    # Show each output channel of each image as a grayscale picture.
    for per_image in feature_maps:
        for fmap in per_image:
            plt.imshow(fmap, cmap='gray')
            plt.show()

6. 封装

import numpy as np
import struct
import matplotlib.pyplot as plt


def load_labels(file):
    """Read a label file and return its labels as an int32 vector.

    The first 8 bytes of the file are skipped; every remaining byte is
    taken as one label value.
    """
    with open(file, 'rb') as fh:
        payload = fh.read()[8:]
    # Interpret the payload as unsigned bytes, then widen to int32.
    return np.frombuffer(payload, dtype=np.uint8).astype(np.int32)


def load_images(file):
    """Read an image file and return one flattened uint8 row per image.

    The 16-byte header holds four big-endian int32 values; only the second
    one (the item count) is used. The remaining bytes are the pixel data,
    reshaped to ``(num_items, -1)``.
    """
    with open(file, 'rb') as fh:
        raw = fh.read()
    _magic, num_items, _rows, _cols = struct.unpack_from('>iiii', raw)
    # Copy so the returned array owns writable memory.
    pixels = np.frombuffer(raw[16:], dtype=np.uint8).copy()
    return pixels.reshape(num_items, -1)


def make_one_hot(labels, class_num=10):
    """Return the one-hot encoding of ``labels``.

    Args:
        labels: Sequence or 1-D array of integer class indices.
        class_num: Number of classes, i.e. the width of each encoded row.

    Returns:
        Float array of shape ``(len(labels), class_num)`` with a 1 at
        ``[i, labels[i]]`` and 0 elsewhere.
    """
    labels = np.asarray(labels)
    result = np.zeros((len(labels), class_num))
    if labels.size:
        # Set one cell per row in a single vectorised assignment instead of
        # looping over the samples in Python.
        result[np.arange(len(labels)), labels] = 1
    return result


def sigmoid(x):
    """Return the logistic function ``1 / (1 + exp(-x))`` element-wise.

    Evaluated in a numerically stable way: the exponential is only taken of
    non-positive values, so large-magnitude inputs do not overflow.

    Args:
        x: Scalar or array-like of real numbers.

    Returns:
        Values in ``[0, 1]`` with the same shape as ``x`` (a NumPy scalar
        for scalar input).
    """
    x = np.asarray(x)
    decay = np.exp(-np.abs(x))  # in (0, 1], never overflows
    # x >= 0: 1 / (1 + e^-x);  x < 0: e^x / (1 + e^x) -- the same function.
    out = np.where(x >= 0, 1 / (1 + decay), decay / (1 + decay))
    return out[()]  # unwrap 0-d results to a scalar; arrays pass through


def softmax(x):
    """Return the row-wise softmax of a 2-D array.

    The row maximum is subtracted before exponentiating. This leaves the
    result mathematically unchanged but keeps ``exp`` from overflowing
    (which would otherwise produce ``inf / inf = nan``) for large logits.

    Args:
        x: Array of shape ``(batch, classes)``.

    Returns:
        Array of the same shape whose rows are non-negative and sum to 1.
    """
    shifted = x - np.max(x, axis=1, keepdims=True)
    exps = np.exp(shifted)  # computed once and reused for the normaliser
    return exps / np.sum(exps, axis=1, keepdims=True)


def get_datas():
    """Load the train and test splits from the sibling BP-network data folder.

    Images are scaled from 0..255 to 0..1. Training labels are one-hot
    encoded; test labels stay as integer class indices.

    Returns:
        ``(train_datas, train_labels, test_datas, test_labels)``.
    """
    train_datas = load_images(r'..\05_01_05BP神经网络\data\train-images.idx3-ubyte') / 255
    train_labels = make_one_hot(
        load_labels(r'..\05_01_05BP神经网络\data\train-labels.idx1-ubyte')
    )
    test_datas = load_images(r'..\05_01_05BP神经网络\data\t10k-images.idx3-ubyte') / 255
    test_labels = load_labels(r'..\05_01_05BP神经网络\data\t10k-labels.idx1-ubyte')
    return train_datas, train_labels, test_datas, test_labels


class Linear:
    """Bias-free fully connected layer with a built-in SGD update.

    Args:
        in_num: Number of input features.
        out_num: Number of output features.
        lr: Learning rate for the update done in ``backward``. When left as
            ``None`` the module-level ``lr`` is looked up at update time, so
            code that only defines a global ``lr`` keeps working.
    """

    def __init__(self, in_num, out_num, lr=None):
        self.weight = np.random.normal(0, 1, size=(in_num, out_num))
        self.lr = lr

    def __call__(self, x):
        """Calling the layer is the same as calling ``forward``."""
        return self.forward(x)

    def forward(self, x):
        """Return ``x @ weight`` and remember ``x`` for the backward pass."""
        self.x = x
        return x @ self.weight

    def backward(self, G):
        """Apply one SGD step and return the gradient w.r.t. the input.

        Args:
            G: Gradient of the loss w.r.t. this layer's output.

        Returns:
            Gradient of the loss w.r.t. the input given to ``forward``.
        """
        delta_weight = self.x.T @ G
        # Must use the weights from the forward pass: take the input
        # gradient before the update overwrites them.
        delta_x = G @ self.weight.T
        step = lr if self.lr is None else self.lr
        self.weight -= step * delta_weight  # plain SGD step
        return delta_x


class Sigmoid:
    """Sigmoid activation layer that caches its output for backprop."""

    def __call__(self, x):
        """Calling the layer is the same as calling ``forward``."""
        return self.forward(x)

    def forward(self, x):
        """Apply ``sigmoid`` to ``x``, store the result in ``self.r``, return it."""
        activated = sigmoid(x)
        self.r = activated
        return activated

    def backward(self, G):
        """Scale the incoming gradient by ``r * (1 - r)`` using the cached output."""
        cached = self.r
        scaled = G * cached
        return scaled * (1 - cached)


class Softmax:
    """Softmax output layer whose backward pass takes the labels directly."""

    def __call__(self, x):
        """Calling the layer is the same as calling ``forward``."""
        return self.forward(x)

    def forward(self, x):
        """Apply ``softmax`` to ``x``, store the result in ``self.r``, return it."""
        probs = softmax(x)
        self.r = probs
        return probs

    def backward(self, G):
        """Return ``(r - G) / batch`` where ``G`` holds the labels.

        Unlike the other layers, the argument here is the label array, not
        an upstream gradient; the division uses the number of cached rows.
        """
        probs = self.r
        n_rows = probs.shape[0]
        return (probs - G) / n_rows


class Mymodel:
    """Sequential container: runs layers in order, backpropagates in reverse.

    Args:
        layers: Sequence of layer objects. Each must be callable on the
            previous output and provide ``backward(G)``.
    """

    def __init__(self, layers):
        self.layers = layers

    def forward(self, x, label=None):
        """Run all layers on ``x`` and optionally compute the loss.

        The final output is stored in ``self.x``.

        Args:
            x: Input batch.
            label: Optional targets with the same shape as the final output.
                When given they are stored for ``backward``.

        Returns:
            The cross-entropy ``-sum(label * log(output)) / batch`` when
            ``label`` is given, otherwise ``None``.
        """
        for layer in self.layers:
            x = layer(x)
        self.x = x
        if label is None:
            return None
        self.label = label
        return -np.sum(label * np.log(x)) / x.shape[0]

    def backward(self):
        """Backpropagate through the layers in reverse order.

        The last layer receives the labels stored by the most recent
        ``forward`` call that was given a label; every other layer receives
        what the following layer's ``backward`` returned.
        """
        G = self.label
        for layer in reversed(self.layers):
            G = layer.backward(G)

    def __call__(self, *args, **kwargs):
        """Forward positional and keyword arguments to ``forward``.

        Keyword arguments are passed through so ``model(x, label=y)`` works
        just like ``model.forward(x, label=y)``.
        """
        return self.forward(*args, **kwargs)


if __name__ == '__main__':
    # Load data
    train_datas, train_labels, test_datas, test_labels = get_datas()

    epoch = 20
    batch_size = 200
    lr = 0.1  # module-level learning rate read by Linear.backward
    hidden_num = 256

    model = Mymodel(
        [
            Linear(train_datas.shape[1], hidden_num),
            Sigmoid(),
            Linear(hidden_num, 10),
            Softmax()
        ]
    )

    batch_times = int(np.ceil(len(train_datas) / batch_size))  # round up so a short last batch is included
    for e in range(epoch):
        for batch_index in range(batch_times):
            # ---------- fetch batch -------------
            start = batch_index * batch_size
            x = train_datas[start: start + batch_size]
            batch_label = train_labels[start: start + batch_size]

            # ---------- forward -------------
            loss = model(x, batch_label)
            if batch_index % 100 == 0:
                print(f'loss: {loss:.3f}')

            # ---------- backward -------------
            model.backward()

        # Accuracy on the test set: fraction of correct argmax predictions.
        x = test_datas
        model(x)
        pre = np.argmax(model.x, axis=1)
        acc = np.mean(pre == test_labels)  # not tied to a fixed test-set size
        print(f'--------------acc: {acc:.3f}')