蒙特卡洛模拟与风险分析
python
class MonteCarloCashFlowSimulator:
    """Monte Carlo simulator for cash-flow forecasts.

    Every scenario starts from the deterministic forecaster's base forecast,
    moves randomly delayed receipts to a later date, adds sampled new
    contracts and recomputes the net cash flow.

    Three hooks are called but not defined in this class as shown:
    ``_sample_new_contracts``, ``_sample_delayed_payments`` and
    ``_apply_stress_scenario``. They have to be supplied elsewhere (for
    example by a subclass) before ``simulate`` or ``stress_test`` can run.
    """

    def __init__(self, deterministic_forecaster, probabilistic_forecaster):
        self.det_forecaster = deterministic_forecaster
        self.prob_forecaster = probabilistic_forecaster

    def simulate(self, n_simulations: int = 1000, confidence_level: float = 0.95):
        """Run the Monte Carlo simulation and summarise it per forecast row.

        Args:
            n_simulations: Number of scenarios to draw; must be at least 1.
            confidence_level: Confidence level used for the ``var_95`` and
                ``cvar_95`` columns; must lie strictly between 0 and 1. The
                column names keep their ``_95`` suffix whatever value is
                passed.

        Returns:
            A DataFrame with one row per row of the base forecast and the
            columns ``mean``, ``median``, ``std``, ``percentile_5``,
            ``percentile_25``, ``percentile_75``, ``percentile_95``,
            ``var_95`` and ``cvar_95``, all computed on simulated net cash.

        Raises:
            ValueError: If ``n_simulations`` or ``confidence_level`` is out
                of range.
        """
        if n_simulations < 1:
            raise ValueError("n_simulations must be at least 1")
        if not 0 < confidence_level < 1:
            raise ValueError("confidence_level must be strictly between 0 and 1")

        # 1. Base (deterministic) forecast shared by all scenarios.
        base_forecast = self.det_forecaster.forecast()

        # 2. One net-cash series per scenario. _simulate_scenario works on
        #    its own copy, so the base forecast can be passed directly.
        simulations = [
            self._simulate_scenario(base_forecast) for _ in range(n_simulations)
        ]

        # 3. Rows = forecast rows, columns = scenarios.
        simulations_df = pd.DataFrame(simulations).T

        results = {
            'mean': simulations_df.mean(axis=1),
            'median': simulations_df.median(axis=1),
            'std': simulations_df.std(axis=1),
            'percentile_5': simulations_df.quantile(0.05, axis=1),
            'percentile_25': simulations_df.quantile(0.25, axis=1),
            'percentile_75': simulations_df.quantile(0.75, axis=1),
            'percentile_95': simulations_df.quantile(0.95, axis=1),
            'var_95': simulations_df.quantile(1 - confidence_level, axis=1),
            'cvar_95': self._calculate_cvar(simulations_df, confidence_level),
        }
        return pd.DataFrame(results)

    def _calculate_cvar(self, simulations_df: pd.DataFrame,
                        confidence_level: float) -> pd.Series:
        """Return the conditional value at risk for every forecast row.

        For each row the VaR threshold is the ``1 - confidence_level``
        quantile across scenarios (the same value reported as ``var_95``);
        the CVaR is the mean of the scenario values at or below it.
        """
        values = simulations_df.to_numpy(dtype=float)
        threshold = simulations_df.quantile(1 - confidence_level, axis=1).to_numpy()
        # Keep only the tail at or below the per-row threshold; the rest
        # becomes NaN and is skipped by mean().
        tail = np.where(values <= threshold[:, None], values, np.nan)
        return pd.DataFrame(tail, index=simulations_df.index).mean(axis=1)

    def _simulate_scenario(self, base_forecast: pd.DataFrame) -> pd.Series:
        """Simulate one scenario and return its ``net_cash`` series.

        ``base_forecast`` must provide ``cash_in`` and ``cash_out`` columns.
        It is copied before any adjustment, so the caller's frame is left
        untouched.
        """
        # 1. Receipt delays.
        delayed_receipts = self._sample_delayed_receipts()
        # 2. Newly signed contracts.
        new_contracts = self._sample_new_contracts()
        # 3. Delayed project payments.
        # NOTE(review): the sampled delayed payments are never applied to the
        # forecast. The hook is still called so that any random draws or side
        # effects it has are kept; confirm the intended treatment and the
        # record schema before wiring it in.
        self._sample_delayed_payments()

        adjusted_forecast = base_forecast.copy()
        horizon = adjusted_forecast.index

        # Move delayed amounts from the original date to the new date. Either
        # side is skipped when its date is outside the forecast index.
        for delay in delayed_receipts:
            amount = delay['amount']
            if delay['original_date'] in horizon:
                adjusted_forecast.loc[delay['original_date'], 'cash_in'] -= amount
            if delay['new_date'] in horizon:
                adjusted_forecast.loc[delay['new_date'], 'cash_in'] += amount

        # Add the inflow of new contracts that land inside the index.
        for contract in new_contracts:
            if contract['date'] in horizon:
                adjusted_forecast.loc[contract['date'], 'cash_in'] += contract['amount']

        adjusted_forecast['net_cash'] = (
            adjusted_forecast['cash_in'] - adjusted_forecast['cash_out']
        )
        return adjusted_forecast['net_cash']

    def _sample_delayed_receipts(self, delay_probability: float = 0.2,
                                 mean_delay_days: float = 10.0):
        """Sample which expected receipts are delayed, and by how long.

        Iterates over ``self.det_forecaster.contracts`` and reads the
        ``due_date``, ``amount`` and ``collection_probability`` of each row.
        The delay length is drawn from an exponential distribution and
        truncated to whole days. Draws use NumPy's global random state.

        Args:
            delay_probability: Chance that a single receipt is delayed.
            mean_delay_days: Mean of the exponential delay distribution.

        Returns:
            A list of dicts with the keys ``original_date``, ``new_date`` and
            ``amount`` (contract amount times collection probability).
        """
        delays = []
        for _, row in self.det_forecaster.contracts.iterrows():
            if np.random.random() < delay_probability:
                delay_days = np.random.exponential(mean_delay_days)
                delays.append({
                    'original_date': row['due_date'],
                    'new_date': row['due_date'] + timedelta(days=int(delay_days)),
                    'amount': row['amount'] * row['collection_probability'],
                })
        return delays

    def stress_test(self, stress_scenarios: Dict, n_simulations: int = 500):
        """Run a reduced Monte Carlo simulation for each stress scenario.

        Args:
            stress_scenarios: Mapping of scenario name to the parameters that
                are handed to ``_apply_stress_scenario``.
            n_simulations: Scenarios drawn per stress case; the default is
                lower than in ``simulate`` to keep the run fast.

        Returns:
            A DataFrame with one row per scenario name and the columns
            ``worst_case``, ``probability_of_negative`` and
            ``expected_shortfall``.

        Raises:
            ValueError: If ``n_simulations`` is smaller than 1.
        """
        if n_simulations < 1:
            raise ValueError("n_simulations must be at least 1")

        results = {}
        for scenario_name, scenario_params in stress_scenarios.items():
            print(f"Running stress test: {scenario_name}")

            stressed_forecast = self._apply_stress_scenario(
                self.det_forecaster.forecast(), scenario_params
            )

            sim_df = pd.DataFrame([
                self._simulate_scenario(stressed_forecast)
                for _ in range(n_simulations)
            ]).T

            results[scenario_name] = {
                'worst_case': sim_df.min(axis=1).min(),
                'probability_of_negative': (sim_df < 0).mean().mean(),
                # NOTE(review): this is the average 5% quantile over the
                # forecast rows, not a tail mean; kept unchanged for
                # compatibility with existing consumers.
                'expected_shortfall': sim_df.quantile(0.05, axis=1).mean(),
            }
        return pd.DataFrame(results).T

# 4.5 模型评估与监控
python
class ModelPerformanceMonitor:
    """Record forecast-accuracy metrics over time and flag performance drift.

    ``history`` is a DataFrame with one row per ``track_performance`` call
    and the columns ``date``, ``model``, ``mae``, ``rmse``, ``mape`` and
    ``coverage_95``.
    """

    def __init__(self):
        self.history = pd.DataFrame(
            columns=['date', 'model', 'mae', 'rmse', 'mape', 'coverage_95']
        )

    def track_performance(self, date: datetime, model_name: str,
                          y_true: np.ndarray, y_pred: np.ndarray,
                          lower_bound: np.ndarray = None,
                          upper_bound: np.ndarray = None):
        """Compute the accuracy metrics of one evaluation and store them.

        Args:
            date: Evaluation date stored with the record.
            model_name: Name of the evaluated model.
            y_true: Observed values.
            y_pred: Predicted values.
            lower_bound: Lower edge of the prediction interval (optional).
            upper_bound: Upper edge of the prediction interval (optional).

        Returns:
            The metrics dict that was stored. ``coverage_95`` is only present
            when both interval bounds were given.
        """
        y_true = np.asarray(y_true)
        y_pred = np.asarray(y_pred)

        # Where the actual value is 0 the error is divided by 1 instead, so
        # MAPE stays finite.
        safe_denominator = np.where(y_true != 0, y_true, 1)
        metrics = {
            'date': date,
            'model': model_name,
            'mae': mean_absolute_error(y_true, y_pred),
            'rmse': np.sqrt(mean_squared_error(y_true, y_pred)),
            'mape': np.mean(np.abs((y_true - y_pred) / safe_denominator)) * 100,
        }

        # Share of actual values inside the prediction interval, in percent.
        if lower_bound is not None and upper_bound is not None:
            in_interval = (
                (y_true >= np.asarray(lower_bound))
                & (y_true <= np.asarray(upper_bound))
            )
            metrics['coverage_95'] = in_interval.mean() * 100

        self._append_record(metrics)
        return metrics

    def _append_record(self, metrics: dict) -> None:
        """Append one metrics record as a new row of ``history``.

        ``DataFrame.append`` no longer exists in pandas 2.x, so the row is
        added with ``pd.concat``. Keys missing from ``metrics`` become NaN.
        """
        row = pd.DataFrame([metrics]).reindex(columns=self.history.columns)
        if self.history.empty:
            # Taking the first row as-is lets pandas infer proper dtypes
            # rather than inheriting object columns from the empty frame.
            self.history = row
        else:
            self.history = pd.concat([self.history, row], ignore_index=True)

    def detect_drift(self, window_size: int = 30):
        """Compare the latest window of records with the window before it.

        A metric counts as drifted when the mean of the latest
        ``window_size`` records differs from the mean of the preceding
        ``window_size`` records by more than two standard deviations of that
        preceding window. Records of all models are evaluated together.

        Returns:
            ``None`` when fewer than ``2 * window_size`` records exist,
            otherwise a dict (possibly empty) that maps each drifted metric
            to its ``z_score`` and ``change_pct``.
        """
        if len(self.history) < window_size * 2:
            return None

        recent_perf = self.history.tail(window_size)
        historical_perf = self.history.iloc[-(window_size * 2):-window_size]

        drift_detected = {}
        for metric in ['mae', 'rmse', 'mape']:
            recent_mean = recent_perf[metric].mean()
            historical_mean = historical_perf[metric].mean()
            historical_std = historical_perf[metric].std()

            # A zero (or NaN) spread gives no usable scale, so the metric is
            # treated as not drifted.
            if historical_std > 0:
                z_score = abs(recent_mean - historical_mean) / historical_std
            else:
                z_score = 0

            if z_score > 2:
                drift_detected[metric] = {
                    'z_score': z_score,
                    'change_pct': (recent_mean - historical_mean) / historical_mean * 100,
                }
        return drift_detected

    def generate_performance_report(self):
        """Return a plain-text performance summary for every tracked model.

        The interval-coverage line is left out for models that have no
        recorded coverage value. A drift section is appended when
        ``detect_drift()`` reports at least one drifted metric.
        """
        if len(self.history) == 0:
            return "No performance data available."

        report = [
            "=" * 60,
            "CASH FLOW PREDICTION MODEL PERFORMANCE REPORT",
            "=" * 60,
        ]

        for model in self.history['model'].unique():
            model_data = self.history[self.history['model'] == model]
            first_day = model_data['date'].min().date()
            last_day = model_data['date'].max().date()

            report.append(f"\nModel: {model}")
            report.append(f" Period: {first_day} to {last_day}")
            report.append(f" Total predictions: {len(model_data)}")
            report.append(f" Average MAE: {model_data['mae'].mean():.2f}")
            report.append(f" Average RMSE: {model_data['rmse'].mean():.2f}")
            report.append(f" Average MAPE: {model_data['mape'].mean():.1f}%")

            if 'coverage_95' in model_data.columns:
                coverage = model_data['coverage_95'].dropna()
                if len(coverage) > 0:
                    report.append(f" 95% CI Coverage: {coverage.mean():.1f}%")

        drift = self.detect_drift()
        if drift:
            report.append("\n⚠️ MODEL DRIFT DETECTED ⚠️")
            for metric, info in drift.items():
                report.append(
                    f" {metric.upper()}: z-score={info['z_score']:.2f}, "
                    f"change={info['change_pct']:.1f}%"
                )
            report.append("\nRecommendation: Retrain model with recent data.")

        return "\n".join(report)

# 4.6 完整预测流水线示例
python
class CompleteCashFlowPipeline:
    """End-to-end cash-flow forecasting pipeline.

    Combines a deterministic forecast, a weekly ensemble (probabilistic)
    forecast, a Monte Carlo simulation with stress tests, and model
    performance monitoring.

    The data-access and maintenance hooks ``_load_contracts``,
    ``_load_payments``, ``_load_historical_data``, ``_load_actual_data``,
    ``_load_prediction``, ``_save_results`` and ``_retrain_models`` are called
    but not defined in this class as shown; they have to be provided
    elsewhere (for example by a subclass).
    """

    def __init__(self):
        self.det_forecaster = None
        self.prob_forecaster = None
        self.ensemble_forecaster = EnsembleCashFlowForecaster()
        self.monte_carlo = None
        self.monitor = ModelPerformanceMonitor()

    def run_daily_prediction(self, current_date: datetime):
        """Run the forecast for ``current_date`` over a 90-day horizon.

        The deterministic forecast is produced on every call. On Mondays the
        ensemble is retrained, a probabilistic forecast is generated and
        blended with the deterministic one, and the results are saved. If
        that Monday is also the first day of the month, a Monte Carlo
        simulation and the stress tests are run as well.

        NOTE(review): the original indentation of this method was lost, so
        the nesting of the monthly step inside the weekly step is a
        reconstruction — confirm against the intended schedule.

        Returns:
            The combined forecast on Mondays, otherwise the deterministic
            forecast.
        """
        print(f"Running cash flow prediction for {current_date.date()}")
        print("-" * 50)

        # 1. Data preparation.
        contracts_df = self._load_contracts()
        payments_df = self._load_payments()
        historical_df = self._load_historical_data()

        # 2. Deterministic forecast.
        horizon_end = current_date + timedelta(days=90)
        self.det_forecaster = DeterministicCashFlowForecaster(contracts_df, payments_df)
        det_forecast = self.det_forecaster.forecast(
            start_date=current_date,
            end_date=horizon_end
        )

        # 3. The probabilistic forecast only runs on Mondays.
        if current_date.weekday() != 0:
            return det_forecast

        print("Running probabilistic forecast...")
        self.ensemble_forecaster.train_ensemble(historical_df)
        future_dates = pd.date_range(start=current_date, end=horizon_end, freq='D')
        prob_forecast = self.ensemble_forecaster.forecast(future_dates, historical_df)

        # 4. Blend both forecasts.
        combined_forecast = self._combine_forecasts(det_forecast, prob_forecast)

        # 5. Monte Carlo simulation on the first day of the month. Both
        #    results default to None so that saving also works on Mondays
        #    that are not the first of the month (previously the names were
        #    unbound there).
        simulations = None
        stress_results = None
        if current_date.day == 1:
            print("Running Monte Carlo simulation...")
            self.monte_carlo = MonteCarloCashFlowSimulator(
                self.det_forecaster,
                self.ensemble_forecaster
            )
            simulations = self.monte_carlo.simulate(n_simulations=1000)

            stress_scenarios = {
                'recession': {'collection_rate': 0.7, 'new_sales': -0.3},
                'client_loss': {'key_client_loss': True, 'amount': 2000000},
                'rapid_growth': {'new_sales': 0.5, 'hiring_rate': 0.2}
            }
            stress_results = self.monte_carlo.stress_test(stress_scenarios)

        self._save_results(current_date, {
            'deterministic': det_forecast,
            'probabilistic': prob_forecast,
            'combined': combined_forecast,
            'simulations': simulations,
            'stress_tests': stress_results
        })
        return combined_forecast

    def _combine_forecasts(self, det_forecast: pd.DataFrame,
                           prob_forecast: pd.DataFrame) -> pd.DataFrame:
        """Blend the deterministic and the probabilistic forecast.

        Rows up to 30 days after the first forecast date keep the
        deterministic ``cash_in``. For later dates that also exist in
        ``prob_forecast``, ``cash_in`` becomes a weighted average of the
        deterministic ``cash_in`` and the probabilistic
        ``predicted_cash_flow``. The deterministic weight is
        ``max(0.5, 1 - days_past_cutoff / 60)``: it falls linearly from 1.0
        and is floored at 0.5.

        Args:
            det_forecast: Frame with ``cash_in`` and ``cash_out`` columns.
            prob_forecast: Frame with a ``predicted_cash_flow`` column.

        Returns:
            A copy of ``det_forecast`` with the blended ``cash_in`` and
            recomputed ``net_cash`` and ``cumulative_net_cash`` columns. The
            inputs are not modified.
        """
        combined = det_forecast.copy()

        # An empty forecast has no first date to anchor the cutoff on.
        if not det_forecast.empty:
            # Blended values are fractional; cast once so that writing them
            # into an integer column does not rely on implicit upcasting.
            combined['cash_in'] = combined['cash_in'].astype(float)
            short_term_cutoff = det_forecast.index[0] + timedelta(days=30)

            for date in det_forecast.index:
                if date > short_term_cutoff and date in prob_forecast.index:
                    days_ahead = (date - short_term_cutoff).days
                    weight_det = max(0.5, 1 - days_ahead / 60)
                    weight_prob = 1 - weight_det
                    combined.loc[date, 'cash_in'] = (
                        weight_det * det_forecast.loc[date, 'cash_in'] +
                        weight_prob * prob_forecast.loc[date, 'predicted_cash_flow']
                    )

        combined['net_cash'] = combined['cash_in'] - combined['cash_out']
        combined['cumulative_net_cash'] = combined['net_cash'].cumsum()
        return combined

    def evaluate_model(self, evaluation_date: datetime):
        """Score the prediction made 30 days ago against the actual data.

        Loads the actual data of the last 30 days and the prediction stored
        30 days before ``evaluation_date``, tracks the metrics on their
        common dates, prints the performance report and triggers retraining
        when drift is detected.

        Returns:
            The metrics dict, or ``None`` when no stored prediction exists or
            it shares no dates with the actual data.
        """
        prediction_date = evaluation_date - timedelta(days=30)
        actual_data = self._load_actual_data(prediction_date, evaluation_date)
        historical_pred = self._load_prediction(prediction_date)
        if historical_pred is None:
            return None

        # Only dates present in both frames can be compared.
        common_dates = actual_data.index.intersection(historical_pred.index)
        if len(common_dates) == 0:
            return None

        metrics = self.monitor.track_performance(
            date=evaluation_date,
            model_name='ensemble',
            y_true=actual_data.loc[common_dates, 'cash_flow'].values,
            y_pred=historical_pred.loc[common_dates, 'predicted_cash_flow'].values,
            lower_bound=historical_pred.loc[common_dates, 'lower_bound'].values,
            upper_bound=historical_pred.loc[common_dates, 'upper_bound'].values
        )

        report = self.monitor.generate_performance_report()
        print(report)

        drift = self.monitor.detect_drift()
        if drift:
            print("\nModel retraining triggered due to performance drift.")
            self._retrain_models()

        return metrics

# 总结
这套完整的资金管理平台方案涵盖了:
- 供应商选择的标准化流程(RFP模板)
- 清晰的项目实施路线图(甘特图)
- 可操作的合规管理工具(检查清单和指南)
- 先进的预测模型实现(数学模型和完整代码)
每个部分都针对软件外包和人才服务上市公司的特殊需求进行了定制,特别是:
- 项目制资金核算的复杂性
- 人力成本为主的现金流特点
- 上市公司严格的合规要求
- 多客户、多项目的资金管理需求
建议分阶段实施,先从确定性现金流预测和基础资金监控开始,逐步引入机器学习模型和高级分析功能。同时,要建立持续改进机制,定期评估模型性能并优化预测精度。