✨ 本团队擅长数据搜集与处理、建模仿真、程序设计、仿真代码、论文写作与指导,毕业论文、期刊论文经验交流。
✅ 专业定制毕设、代码
✅ 成品或定制,查看文章底部微信二维码
(1) 基于逐次变分模态分解与注意力残差网络的轴承故障诊断方法
旋转机械设备在工业生产中广泛应用,其运行状态的实时监测和故障及时诊断对于保障生产安全和避免经济损失至关重要。滚动轴承作为旋转机械的关键支撑部件,其振动信号中蕴含着丰富的设备健康状态信息。然而在实际工业环境中,轴承振动信号不可避免地受到强背景噪声的干扰,包括其他机械部件的振动传递、电磁干扰和测量系统本身的噪声等,导致有用的故障特征信息被噪声掩盖而难以有效提取。传统的信号处理方法如傅里叶变换和小波分析在处理强噪声干扰下的非平稳信号时存在固有局限性。本研究提出一种结合逐次变分模态分解与注意力残差网络的轴承故障诊断方法。逐次变分模态分解通过迭代方式逐个提取信号中的本征模态分量,每次提取都通过变分优化确定当前模态的最优中心频率和带宽,避免了传统变分模态分解需要预先指定模态数的困难。针对分解得到的多个模态分量,设计了基于能量分布和峭度特征的融合评价指标,自动识别并剔除主要包含噪声成分的冗余模态,保留包含故障冲击信息的有效模态进行信号重构。重构后的去噪信号转换为短时傅里叶变换时频图像,作为深度学习模型的输入。在网络结构设计上,采用残差网络作为基础架构以解决深层网络的梯度消失问题,同时嵌入卷积注意力模块实现对时频图像中关键故障特征区域的自适应聚焦。卷积注意力模块包含通道注意力和空间注意力两个子模块,分别学习不同特征通道和不同空间位置的重要性权重,增强与故障判别相关的特征响应,抑制噪声和无关成分的干扰。
(2) 基于时间卷积与Transformer融合的多通道信号端到端诊断模型
工业设备的振动监测系统通常部署多个传感器从不同方向和位置采集振动信号,形成多通道的监测数据。单一通道的诊断模型难以充分利用多通道信息之间的互补性和关联性,而简单的通道拼接又忽略了不同通道数据的时序特性差异。此外,传统的特征工程方法依赖领域专家的先验知识来设计和选择故障特征,在面对新型设备或未知故障模式时缺乏适应性。针对上述问题,本研究提出一种端到端的时间卷积网络与Transformer融合模型,能够直接从原始多通道振动信号学习故障判别特征而无需人工特征提取。在模型的前端,采用时间卷积网络结构对输入信号序列进行局部模式提取,时间卷积通过因果卷积和膨胀卷积的组合实现对长时序依赖关系的高效建模,同时保持时序因果性防止信息泄露。针对多通道信号的融合问题,设计了单向补丁序列标记方法,将多通道信号沿时间维度分割为若干补丁,每个补丁包含所有通道在该时间段内的采样值,通过线性投影转换为统一维度的标记向量。这种表示方式自然地保留了通道间的同步关系,便于后续模型学习跨通道的相关特性。在模型的后端,引入Transformer架构捕捉全局范围内的特征依赖关系。为了增强模型对位置信息和通道信息的感知能力,在标准自注意力机制的基础上设计了混合注意力模块,同时计算位置维度和通道维度的注意力权重,实现更加全面的特征交互。
(3) 面向嵌入式部署的Transformer模型剪枝与轻量化方法
深度神经网络模型在故障诊断任务中展现出优异的性能,但其庞大的参数量和计算复杂度给工业现场的实际部署带来了严峻挑战。工业监测系统往往需要在资源受限的嵌入式设备上运行,这些设备的计算能力、内存容量和功耗预算都十分有限,难以支撑复杂深度模型的实时推理。模型压缩技术是解决这一矛盾的有效途径,其中结构化剪枝通过移除网络中冗余的权重、神经元或层来减少模型规模,同时尽可能保持原有的诊断精度。本研究提出一种基于重要性评分的Transformer模型剪枝方法,针对Transformer架构的特点设计了适用于注意力模块和前馈网络模块的剪枝策略。重要性评分通过计算反向传播过程中各网络模块的梯度与激活值的乘积来评估,该乘积反映了对应模块对最终输出的贡献程度,贡献较小的模块被认为是冗余的可以进行剪枝移除。对于多头注意力层,独立评估每个注意力头的重要性,移除贡献最小的若干注意力头以减少计算量。对于前馈网络层,沿隐藏维度进行神经元级别的剪枝,保留重要性评分最高的神经元子集。剪枝后的模型需要经过微调训练以恢复因结构改变导致的精度损失。
import numpy as np import torch import torch.nn as nn import torch.nn.functional as F from scipy.signal import stft from sklearn.preprocessing import StandardScaler class SuccessiveVMD: def __init__(self, max_modes=8, alpha=2000): self.max_modes = max_modes self.alpha = alpha self.modes = [] def extract_single_mode(self, signal, omega_init, tau=0, tol=1e-7, max_iter=300): N = len(signal) freqs = np.fft.fftfreq(N) f_hat = np.fft.fft(signal) u_hat = np.zeros(N, dtype=complex) omega = omega_init for _ in range(max_iter): u_hat_old = u_hat.copy() numerator = f_hat denominator = 1 + 2 * self.alpha * (freqs - omega)**2 u_hat = numerator / denominator if np.sum(np.abs(u_hat)**2) > 0: omega = np.abs(np.sum(freqs * np.abs(u_hat)**2) / np.sum(np.abs(u_hat)**2)) if np.sum(np.abs(u_hat - u_hat_old)**2) / (np.sum(np.abs(u_hat_old)**2) + 1e-10) < tol: break return np.real(np.fft.ifft(u_hat)), omega def decompose(self, signal): self.modes = [] residual = signal.copy() for _ in range(self.max_modes): spectrum = np.abs(np.fft.fft(residual)) omega_init = np.argmax(spectrum[:len(spectrum)//2]) / len(spectrum) mode, _ = self.extract_single_mode(residual, omega_init) self.modes.append(mode) residual = residual - mode if np.std(residual) < 0.01 * np.std(signal): break return np.array(self.modes) def compute_fusion_index(self, mode): energy = np.sum(mode**2) kurtosis = np.mean(mode**4) / (np.mean(mode**2)**2 + 1e-10) return energy * kurtosis def select_and_reconstruct(self, threshold_ratio=0.1): indices = [self.compute_fusion_index(m) for m in self.modes] max_index = max(indices) selected = [m for m, idx in zip(self.modes, indices) if idx > threshold_ratio * max_index] return np.sum(selected, axis=0) class CBAMBlock(nn.Module): def __init__(self, channels, reduction=16): super(CBAMBlock, self).__init__() self.channel_attention = nn.Sequential( nn.AdaptiveAvgPool2d(1), nn.Flatten(), nn.Linear(channels, channels // reduction), nn.ReLU(), nn.Linear(channels // reduction, channels), ) self.spatial_attention = nn.Sequential( nn.Conv2d(2, 1, kernel_size=7, padding=3), nn.Sigmoid() ) def forward(self, x): avg_out = self.channel_attention(x) max_out = self.channel_attention(F.adaptive_max_pool2d(x, 1).view(x.size(0), -1)) channel_att = torch.sigmoid(avg_out + max_out).unsqueeze(-1).unsqueeze(-1) x = x * channel_att avg_spatial = torch.mean(x, dim=1, keepdim=True) max_spatial, _ = torch.max(x, dim=1, keepdim=True) spatial_att = self.spatial_attention(torch.cat([avg_spatial, max_spatial], dim=1)) x = x * spatial_att return x class ResidualBlockWithCBAM(nn.Module): def __init__(self, in_channels, out_channels, stride=1): super(ResidualBlockWithCBAM, self).__init__() self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride, 1) self.bn1 = nn.BatchNorm2d(out_channels) self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, 1) self.bn2 = nn.BatchNorm2d(out_channels) self.cbam = CBAMBlock(out_channels) self.shortcut = nn.Sequential() if stride != 1 or in_channels != out_channels: self.shortcut = nn.Sequential( nn.Conv2d(in_channels, out_channels, 1, stride), nn.BatchNorm2d(out_channels) ) def forward(self, x): out = F.relu(self.bn1(self.conv1(x))) out = self.bn2(self.conv2(out)) out = self.cbam(out) out += self.shortcut(x) return F.relu(out) class CBAMResNet(nn.Module): def __init__(self, num_classes): super(CBAMResNet, self).__init__() self.conv1 = nn.Conv2d(1, 64, 7, 2, 3) self.bn1 = nn.BatchNorm2d(64) self.pool = nn.MaxPool2d(3, 2, 1) self.layer1 = self._make_layer(64, 64, 2) self.layer2 = self._make_layer(64, 128, 2, stride=2) self.layer3 = self._make_layer(128, 256, 2, stride=2) self.avgpool = nn.AdaptiveAvgPool2d((1, 1)) self.fc = nn.Linear(256, num_classes) def _make_layer(self, in_channels, out_channels, num_blocks, stride=1): layers = [ResidualBlockWithCBAM(in_channels, out_channels, stride)] for _ in range(1, num_blocks): layers.append(ResidualBlockWithCBAM(out_channels, out_channels)) return nn.Sequential(*layers) def forward(self, x): x = F.relu(self.bn1(self.conv1(x))) x = self.pool(x) x = self.layer1(x) x = self.layer2(x) x = self.layer3(x) x = self.avgpool(x) x = x.view(x.size(0), -1) return self.fc(x) class TemporalBlock(nn.Module): def __init__(self, in_channels, out_channels, kernel_size, dilation): super(TemporalBlock, self).__init__() padding = (kernel_size - 1) * dilation // 2 self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size, padding=padding, dilation=dilation) self.bn1 = nn.BatchNorm1d(out_channels) self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size, padding=padding, dilation=dilation) self.bn2 = nn.BatchNorm1d(out_channels) self.downsample = nn.Conv1d(in_channels, out_channels, 1) if in_channels != out_channels else None def forward(self, x): out = F.relu(self.bn1(self.conv1(x))) out = self.bn2(self.conv2(out)) res = x if self.downsample is None else self.downsample(x) return F.relu(out + res) class TCN(nn.Module): def __init__(self, input_channels, num_channels, kernel_size=3): super(TCN, self).__init__() layers = [] num_levels = len(num_channels) for i in range(num_levels): dilation = 2 ** i in_ch = input_channels if i == 0 else num_channels[i-1] out_ch = num_channels[i] layers.append(TemporalBlock(in_ch, out_ch, kernel_size, dilation)) self.network = nn.Sequential(*layers) def forward(self, x): return self.network(x) class MixedAttention(nn.Module): def __init__(self, d_model, num_heads): super(MixedAttention, self).__init__() self.d_model = d_model self.num_heads = num_heads self.d_k = d_model // num_heads self.W_q = nn.Linear(d_model, d_model) self.W_k = nn.Linear(d_model, d_model) self.W_v = nn.Linear(d_model, d_model) self.W_o = nn.Linear(d_model, d_model) self.channel_attention = nn.Sequential( nn.Linear(d_model, d_model // 4), nn.ReLU(), nn.Linear(d_model // 4, d_model), nn.Sigmoid() ) def forward(self, x): batch_size, seq_len, _ = x.size() Q = self.W_q(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2) K = self.W_k(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2) V = self.W_v(x).view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2) scores = torch.matmul(Q, K.transpose(-2, -1)) / np.sqrt(self.d_k) attention = F.softmax(scores, dim=-1) context = torch.matmul(attention, V) context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model) channel_weights = self.channel_attention(context.mean(dim=1)).unsqueeze(1) context = context * channel_weights return self.W_o(context) class TCNTransformer(nn.Module): def __init__(self, input_channels, seq_length, num_classes, d_model=128, num_heads=4): super(TCNTransformer, self).__init__() self.tcn = TCN(input_channels, [32, 64, d_model]) self.patch_size = 16 self.num_patches = seq_length // self.patch_size self.patch_embedding = nn.Linear(self.patch_size * d_model, d_model) self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches, d_model)) self.attention_layers = nn.ModuleList([ MixedAttention(d_model, num_heads) for _ in range(3) ]) self.layer_norms = nn.ModuleList([nn.LayerNorm(d_model) for _ in range(3)]) self.classifier = nn.Linear(d_model, num_classes) def forward(self, x): tcn_out = self.tcn(x) batch_size, channels, seq_len = tcn_out.size() patches = tcn_out.view(batch_size, channels, self.num_patches, self.patch_size) patches = patches.permute(0, 2, 1, 3).contiguous() patches = patches.view(batch_size, self.num_patches, -1) x = self.patch_embedding(patches) + self.pos_embedding for attn, norm in zip(self.attention_layers, self.layer_norms): x = norm(x + attn(x)) x = x.mean(dim=1) return self.classifier(x) class TransformerPruner: def __init__(self, model, target_sparsity=0.5): self.model = model self.target_sparsity = target_sparsity self.importance_scores = {} def compute_importance_scores(self, data_loader, criterion): self.model.train() gradients = {name: [] for name, _ in self.model.named_parameters()} activations = {name: [] for name, _ in self.model.named_modules()} for batch in data_loader: x, y = batch self.model.zero_grad() output = self.model(x) loss = criterion(output, y) loss.backward() for name, param in self.model.named_parameters(): if param.grad is not None: gradients[name].append(param.grad.abs().mean().item()) for name in gradients: if gradients[name]: self.importance_scores[name] = np.mean(gradients[name]) def prune_attention_heads(self, layer, num_heads_to_prune): with torch.no_grad(): W_q = layer.W_q.weight.view(layer.num_heads, layer.d_k, -1) head_importance = torch.norm(W_q, dim=(1, 2)) _, indices_to_prune = torch.topk(head_importance, num_heads_to_prune, largest=False) mask = torch.ones(layer.num_heads, dtype=torch.bool) mask[indices_to_prune] = False new_num_heads = layer.num_heads - num_heads_to_prune new_d_model = new_num_heads * layer.d_k return mask, new_num_heads, new_d_model def prune_feedforward(self, linear_layer, num_neurons_to_prune): with torch.no_grad(): neuron_importance = torch.norm(linear_layer.weight, dim=1) _, indices_to_keep = torch.topk(neuron_importance, linear_layer.out_features - num_neurons_to_prune) new_weight = linear_layer.weight[indices_to_keep] new_bias = linear_layer.bias[indices_to_keep] if linear_layer.bias is not None else None new_layer = nn.Linear(linear_layer.in_features, len(indices_to_keep)) new_layer.weight.data = new_weight if new_bias is not None: new_layer.bias.data = new_bias return new_layer def iterative_pruning(self, data_loader, criterion, num_iterations=5): sparsity_per_iteration = self.target_sparsity / num_iterations for iteration in range(num_iterations): self.compute_importance_scores(data_loader, criterion) for name, module in self.model.named_modules(): if isinstance(module, MixedAttention): num_to_prune = int(module.num_heads * sparsity_per_iteration) if num_to_prune > 0: mask, new_heads, new_dim = self.prune_attention_heads(module, num_to_prune) return self.model class LightweightFaultDiagnoser: def __init__(self, input_channels=1, num_classes=4): self.svmd = SuccessiveVMD() self.model = TCNTransformer(input_channels, seq_length=1024, num_classes=num_classes) self.pruner = TransformerPruner(self.model) self.scaler = StandardScaler() def preprocess(self, signal): modes = self.svmd.decompose(signal) reconstructed = self.svmd.select_and_reconstruct() f, t, Zxx = stft(reconstructed, nperseg=64) return np.abs(Zxx) def train(self, train_signals, train_labels, epochs=50): processed = [self.preprocess(s) for s in train_signals] X = torch.FloatTensor(np.array(processed)).unsqueeze(1) y = torch.LongTensor(train_labels) dataset = torch.utils.data.TensorDataset(X, y) loader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True) optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001) criterion = nn.CrossEntropyLoss() for epoch in range(epochs): for batch_x, batch_y in loader: optimizer.zero_grad() output = self.model(batch_x) loss = criterion(output, batch_y) loss.backward() optimizer.step() def compress_for_deployment(self, calibration_loader): criterion = nn.CrossEntropyLoss() self.model = self.pruner.iterative_pruning(calibration_loader, criterion) return self.model def diagnose(self, signal): processed = self.preprocess(signal) x = torch.FloatTensor(processed).unsqueeze(0).unsqueeze(0) self.model.eval() with torch.no_grad(): output = self.model(x) pred = torch.argmax(output, dim=1).item() fault_types = ['normal', 'inner_race', 'outer_race', 'ball'] return fault_types[pred]具体问题,可以直接沟通
👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇👇