加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
linear_regression.py 6.35 KB
一键复制 编辑 原始数据 按行查看 历史
星河滚烫兮 提交于 2021-09-04 20:05 . add code
import pandas as pd
import numpy as np
import random
import matplotlib.pyplot as plt
class Unary_Linear_Regression:
"""
一元线性回归模型: y = ax + b
"""
def __init__(self, x, y):
# 定义机器学习设置参数
self.EPOCH = 200 # 迭代次数
self.Batch = 3 # 每批选取的样本数
self.learning_rate = 0.1 # 学习率α
self.train_num = 0 # 训练集数量
self.test_num = 0 # 测试集数量
# 标准化参数
self.x_mean = 0 # 变量x的平均值
self.x_std = 0 # 变量x的标准差
self.y_mean = 0 # 变量y的平均值
self.y_std = 0 # 变量y的标准差
# 参数a,b
self.a = 0 # 初始化为0
self.b = 0
# 数据集
self.x = x
self.y = y
self.train_x, self.train_y, self.test_x, self.test_y = self.read_dataset()
def read_dataset(self):
"""
处理数据集默认数据集是干净的,不存在空值nan或字符串类型,若有,请修改函数或修改数据集
:return: x, y
"""
# 设置训练集测试集大小
self.train_num = int(len(self.x)*0.8) # 训练集用70%数据
self.test_num = len(self.x) - self.train_num # 测试集用30%数据
# 计算标准化参数
self.x_mean = self.x.mean()
self.x_std = self.x.std()
self.y_mean = self.y.mean()
self.y_std = self.y.std()
# 合并离散点
dataset = []
for i in range(len(self.x)):
dataset.append([self.x[i], self.y[i]])
# 打乱
random.shuffle(dataset)
new_x, new_y = [], []
# 拆分
for i in range(len(dataset)):
new_x.append(dataset[i][0])
new_y.append(dataset[i][1])
new_x = np.array(new_x)
new_y = np.array(new_y)
return new_x[:self.train_num+1], new_y[:self.train_num+1], new_x[self.train_num+1:], new_y[self.train_num+1:]
def z_score(self, name, t):
# Z-score标准化
if name == 'x':
t = (t-self.x_mean) / self.x_std
elif name == 'y':
t = (t-self.y_mean) / self.y_std
return t
def inverse_z_score(self, name, t):
# 逆Z-score-将标准值还原为真实值
if name == 'x':
t = t*self.x_std + self.x_mean
elif name == 'y':
t = t*self.y_std + self.y_mean
return t
def h(self, x):
# 预测函数h(x)
return self.a * x + self.b
def j(self, h, y):
# 代价(损失)函数J(β1,β2)
return (1/(2 * self.Batch)) * np.sum((h - y) ** 2)
def p(self, x, y, h): # 梯度下降算法更新参数值a,b
# 计算梯度
u1 = (1 / self.Batch) * np.sum(x * (h - y)) # 同时计算代价函数J(β1,β2)的偏导数
u2 = (1 / self.Batch) * np.sum(h - y)
# 更新参数
self.a = self.a - self.learning_rate * u1 # 同时更新参数a,b
self.b = self.b - self.learning_rate * u2
def restore(self):
self.b = self.y_mean + self.b * self.y_std - (self.a * self.x_mean * self.y_std / self.x_std)
self.a = (self.a * self.y_std) / self.x_std # 注意此处,应该先更新b值, 否则逆变换导致a值失真!!!
def train(self): # 训练数据集
for i in range(self.EPOCH): # 迭代次数(训练轮数)
for k in range(int(self.train_num / self.Batch)): # 将数据集划分为n个样本,k为当前样本序号
# 从数据集中获取第j个样本
data_x = self.train_x[k*self.Batch:k*self.Batch+self.Batch]
data_y = self.train_y[k*self.Batch:k*self.Batch+self.Batch]
# z-score标准化
data_x = self.z_score('x', data_x)
data_y = self.z_score('y', data_y)
# 线性回归
h = self.h(data_x) # 计算预测函数
j = self.j(h, data_y) # 计算代价函数
self.p(data_x, data_y, h) # 梯度下降完成回归
print(f"a:{self.a}, b:{self.b}, j = {j}")
def test(self): # 测试数据集
# 返回损失值
j = 0
data_x = self.z_score('x', self.test_x)
data_y = self.z_score('y', self.test_y)
h = self.h(data_x)
for i in range(len(h)):
j += (h[i] - data_y[i]) ** 2
return j/self.test_num
class Mult_Linear_Regression:
"""
多元线性回归模型: y = a0x0 + b1x1 + b2x2 + ... + bnxn 其中,x0=1
x:二维矩阵(m,n)(单样本特征排列成行向量),需进行转置为(n,m)
y:二维矩阵(m,1)
"""
def __init__(self, x, y):
# 定义机器学习设置参数
self.EPOCH = 500 # 迭代次数
self.learning_rate = 0.1 # 学习率α
# 数据集
self.x = np.insert(x.T, 0, 1, axis=0) # 处理后的二维特征输入,每个样本为列向量,同时加入偏置项x0=1
self.y = y
# 参数a0,a1,a2,...,an
self.m = self.x.shape[1] # 样本数m
self.n = self.x.shape[0] # 样本特征数为n
self.a = np.zeros((self.n, 1)) # 参数向量a
def h(self):
# 整体预测函数,x为二维所有样本特征向量(列向量)
return self.x.T @ self.a # 以列向量形式返回行向量点乘列向量
def predict(self, xi):
# 对某个样本进行预测
xi = np.insert(xi, 0, 1)
return self.a.T @ xi
def J(self, h):
# 代价函数:j(β1,β2)
return 1/(2*self.m) * np.sum((self.y-h)**2) # 返回一个实数
def p(self, h):
# 梯度下降算法
gradient = -(1 / self.m) * (self.x.dot(self.y-h)) # 矩阵运算的形式
self.a = self.a - self.learning_rate * gradient # 更新梯度
def train(self):
for i in range(self.EPOCH):
h = self.h()
self.p(h)
print(f"第{i+1}轮损失值为:{self.J(h)}")
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化