fork download
  1.  
Success #stdin #stdout 0.13s 14036KB
stdin
import numpy as np
import pandas as pd
import pandas_ta as ta
import ccxt
import gym
from gym import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.callbacks import BaseCallback
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.feature_selection import SelectKBest, f_regression
from sklearn.ensemble import RandomForestRegressor
import matplotlib.pyplot as plt
import quantstats as qs
import time
import os
import warnings
import requests
import json
from datetime import datetime, timedelta
import random

# Silence all library warnings for the whole process
warnings.filterwarnings('ignore')

# ====================== API configuration ======================
# Credentials are read from the environment so that secrets never live in the
# source file. Both default to an empty string when the variables are unset.
# NOTE(review): the key pair that used to be hard-coded here has been exposed
# and should be revoked/rotated on the exchange.
API_KEY = os.environ.get("BINANCE_API_KEY", "")
SECRET_KEY = os.environ.get("BINANCE_SECRET_KEY", "")

# Simplified on-chain data API configuration (base URL per asset)
ONCHAIN_API = {
    "PEPE": "https://a...content-available-to-author-only...n.io/api",
    "SOL": "https://a...content-available-to-author-only...n.io"
}

# ====================== 简化版因子工程 ======================
def calculate_features(df, symbol):
    """Compute technical indicators, a next-bar target and placeholder on-chain factors.

    Args:
        df: Price frame; the ``high``, ``low``, ``close`` and ``volume``
            columns are read. The caller's frame is not modified.
        symbol: Trading pair name. If it contains ``"PEPE"`` or ``"SOL"``
            the matching (randomly generated) on-chain columns are added.

    Returns:
        A new DataFrame with the added feature columns and a ``target``
        column (next bar's close-to-close return). Rows containing NaN or
        +/-inf in any column are removed, which includes the indicator
        warm-up rows and the final row (its target is unknown).
    """
    # Work on a copy so the caller's frame does not silently gain columns.
    df = df.copy()

    # Basic price feature
    df['returns'] = df['close'].pct_change()

    # Momentum indicators (MACD is computed once and both columns reused)
    df['rsi'] = ta.rsi(df['close'], length=14)
    macd = ta.macd(df['close'], fast=12, slow=26)
    df['macd'] = macd['MACD_12_26_9']
    df['macd_signal'] = macd['MACDs_12_26_9']

    # Volatility indicators (Bollinger bands are computed once)
    df['atr'] = ta.atr(df['high'], df['low'], df['close'], length=14)
    bands = ta.bbands(df['close'], length=20)
    df['bb_upper'] = bands['BBU_20_2.0']
    df['bb_lower'] = bands['BBL_20_2.0']

    # Volume indicator
    df['obv'] = ta.obv(df['close'], df['volume'])

    # Derived features
    df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['close']
    df['macd_diff'] = df['macd'] - df['macd_signal']
    df['volume_change'] = df['volume'].pct_change()

    # Target variable: return from this bar's close to the next bar's close
    df['target'] = df['close'].shift(-1) / df['close'] - 1

    # On-chain data (simplified): random placeholders, not real measurements
    if "PEPE" in symbol:
        df['pepe_burn'] = np.random.poisson(10000000, len(df))
        df['pepe_large_tx'] = np.random.poisson(5, len(df))
    elif "SOL" in symbol:
        df['sol_tps'] = np.random.normal(3000, 500, len(df))
        df['sol_staked'] = np.random.uniform(70, 80, len(df))

    # pct_change yields +/-inf when the previous value is 0 (e.g. a bar with
    # zero volume); dropna alone would keep those rows, so map them to NaN.
    return df.replace([np.inf, -np.inf], np.nan).dropna()

# ====================== 特征选择 ======================
def select_features(features, target, k=8):
    """Keep the ``k`` feature columns that score highest under ``f_regression``.

    Args:
        features: DataFrame of candidate feature columns.
        target: Regression target aligned with ``features``.
        k: Number of columns to keep. It is capped at the number of
            available columns so a small frame does not make the selector
            reject the request.

    Returns:
        A DataFrame holding only the selected columns of ``features``.
    """
    # Never ask for more columns than exist.
    k = min(k, features.shape[1])

    selector = SelectKBest(score_func=f_regression, k=k)
    selector.fit(features, target)
    selected_features = features.columns[selector.get_support()]

    print("Selected Features:")
    print(selected_features.tolist())

    return features[selected_features]

# ====================== 强化学习交易环境 ======================
class CryptoMarketEnv(gym.Env):
    """Single-asset crypto trading environment using the classic gym API.

    Observation (8 float32 values): the ``rsi``, ``macd``, ``atr``,
    ``bb_width`` and ``macd_diff`` columns of the current row, followed by
    cash / initial balance, position value / initial balance, and the number
    of units held.

    Action (2 values): ``action[0]`` in [-1, 1] is the trade signal (above
    0.2 buys, below -0.2 sells, anything else holds); ``action[1]`` in
    [0, 1] is the fraction of cash (buy) or of holdings (sell) to trade.

    Reward is the change in portfolio value over the step. An episode ends
    when the last row of ``df`` is reached or when the drawdown from the
    running peak exceeds ``max_drawdown``.
    """
    metadata = {'render.modes': ['human']}

    def __init__(self, df, initial_balance=10000, transaction_cost=0.001, max_drawdown=0.2):
        """Store the market data and trading parameters, then reset.

        Args:
            df: Frame with ``close`` plus the five indicator columns read by
                ``_next_observation``.
            initial_balance: Starting cash.
            transaction_cost: Proportional fee charged on each trade's notional.
            max_drawdown: Drawdown fraction that terminates the episode.
        """
        super(CryptoMarketEnv, self).__init__()

        self.df = df
        self.current_step = 0
        self.initial_balance = initial_balance
        self.transaction_cost = transaction_cost
        self.max_drawdown = max_drawdown

        # State space: technical indicators + position information
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf,
            shape=(8,),  # simplified state space
            dtype=np.float32
        )

        # Action space: [trade signal, position fraction]. Bounds are built
        # as float32 so they match the declared dtype without a cast.
        self.action_space = spaces.Box(
            low=np.array([-1.0, 0.0], dtype=np.float32),
            high=np.array([1.0, 1.0], dtype=np.float32),
            dtype=np.float32
        )

        self.reset()

    def reset(self):
        """Restore the starting portfolio and return the first observation."""
        self.balance = self.initial_balance
        self.crypto_held = 0
        self.current_value = self.initial_balance
        self.previous_value = self.initial_balance
        self.peak_value = self.initial_balance
        self.current_step = 0
        self.trades = []
        self.portfolio_history = [self.initial_balance]
        self.drawdown = 0.0
        self.trade_count = 0

        return self._next_observation()

    def _next_observation(self):
        """Build the float32 observation for ``self.current_step``."""
        # Technical indicators of the current row
        features = self.df.iloc[self.current_step][['rsi', 'macd', 'atr', 'bb_width', 'macd_diff']].values
        state = features.astype(np.float32)

        # Append position information
        state = np.append(state, [
            self.balance / self.initial_balance,
            self.crypto_held * self._get_current_price() / self.initial_balance,
            self.crypto_held
        ])

        # np.append promotes to float64; cast back so the observation matches
        # the dtype declared in observation_space.
        return state.astype(np.float32)

    def _get_current_price(self):
        """Return the ``close`` price of the current row."""
        return self.df.iloc[self.current_step]['close']

    def step(self, action):
        """Apply ``action``, advance one row and return ``(obs, reward, done, info)``."""
        current_price = self._get_current_price()
        action_type = action[0]  # [-1, 1] buy/sell signal
        # Keep the fraction inside [0, 1] even if the caller passes an
        # unclipped action, so a trade can never exceed cash or holdings.
        position_size = float(np.clip(action[1], 0.0, 1.0))

        if action_type > 0.2:  # buy
            # The budget covers both the notional and the fee, so spending
            # the whole balance cannot push cash below zero.
            budget = self.balance * position_size
            trade_amount = budget / (current_price * (1 + self.transaction_cost))
            if trade_amount > 0:
                self.balance -= budget
                self.crypto_held += trade_amount
                self.trade_count += 1

        elif action_type < -0.2:  # sell
            trade_amount = self.crypto_held * position_size
            if trade_amount > 0:
                cost = trade_amount * current_price * self.transaction_cost
                self.balance += (trade_amount * current_price - cost)
                self.crypto_held -= trade_amount
                self.trade_count += 1

        # Advance to the next row; the episode ends on the last row
        self.current_step += 1
        done = self.current_step >= len(self.df) - 1

        # Mark the portfolio to the new price
        self.previous_value = self.current_value
        current_price = self._get_current_price()
        self.current_value = self.balance + self.crypto_held * current_price

        # Update the running peak and the drawdown from it
        if self.current_value > self.peak_value:
            self.peak_value = self.current_value
        self.drawdown = (self.peak_value - self.current_value) / self.peak_value

        # Circuit breaker
        if self.drawdown > self.max_drawdown:
            done = True
            print(f"熔断触发! 回撤超过{self.max_drawdown*100:.0f}%")

        self.portfolio_history.append(self.current_value)

        # Reward: change in portfolio value over this step
        reward = self.current_value - self.previous_value

        next_state = self._next_observation()

        return next_state, reward, done, {}

    def render(self, mode='human'):
        """Print a one-line summary of the current portfolio state."""
        profit_pct = (self.current_value - self.initial_balance) / self.initial_balance * 100
        print(f"Step: {self.current_step}/{len(self.df)} | "
              f"Value: ${self.current_value:,.2f} | "
              f"Profit: {profit_pct:.2f}% | "
              f"Drawdown: {self.drawdown*100:.2f}% | "
              f"Trades: {self.trade_count}")

# ====================== 主交易系统 ======================
class CryptoDRLTradingSystem:
    def __init__(self, symbols=['PEPE/USDT', 'SOL/USDT'], initial_balance=10000):
        self.symbols = symbols
        self.initial_balance = initial_balance
        self.exchange = self._setup_exchange()
        self.models = {}
        self.scalers = {}
        self.portfolio = {
            'values': [initial_balance],
            'returns': [],
            'current_value': initial_balance,
            'daily_return': 0.0
        }
        self.balance = initial_balance
        self.crypto_held = 0
        
    def _setup_exchange(self):
        """Create the Binance spot client and probe the connection.

        The client is returned even when loading the markets fails; in that
        case a notice is printed and later data requests are expected to
        fall back to simulated data.
        """
        client_config = {
            'apiKey': API_KEY,
            'secret': SECRET_KEY,
            'rateLimit': 100,
            'enableRateLimit': True,
            'options': {
                'defaultType': 'spot',  # spot trading
            }
        }
        client = ccxt.binance(client_config)

        # Connectivity check: best effort, never fatal
        try:
            market_map = client.load_markets()
        except Exception as err:
            print(f"交易所连接失败: {str(err)}")
            print("使用模拟模式运行...")
        else:
            print("交易所连接成功! 可用交易对数量:", len(market_map))

        return client
    
    def fetch_historical_data(self, symbol, timeframe='1d', limit=100):
        """获取历史数据"""
        print(f"获取历史数据: {symbol}...")
        
        try:
            # 尝试从交易所获取数据
            ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
            df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            df.set_index('timestamp', inplace=True)
            return df
        except Exception as e:
            print(f"获取数据失败: {str(e)}, 使用模拟数据")
            # 生成模拟数据
            dates = pd.date_range(end=pd.Timestamp.now(), periods=limit, freq='D')
            prices = np.cumprod(1 + np.random.normal(0.005, 0.03, limit))
            volumes = np.random.randint(10000, 50000, limit)
            
            df = pd.DataFrame({
                'timestamp': dates,
                'open': prices * 0.99,
                'high': prices * 1.01,
                'low': prices * 0.98,
                'close': prices,
                'volume': volumes
            })
            
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df.set_index('timestamp', inplace=True)
            return df
    
    def prepare_dataset(self):
        """Build the per-symbol training data.

        For each symbol: fetch 200 bars, compute features, pick the five
        best-scoring feature columns, standardise those columns in place in
        the full frame and remember the fitted scaler in ``self.scalers``.

        Returns:
            dict mapping symbol to ``{'features', 'target', 'original',
            'scaler'}``, where ``features`` holds the selected columns
            before scaling and ``original`` is the full frame with the
            selected columns scaled.
        """
        non_feature_columns = ['open', 'high', 'low', 'close', 'volume', 'target']
        dataset = {}

        for symbol in self.symbols:
            # Raw candles -> feature frame
            frame = calculate_features(
                self.fetch_historical_data(symbol, limit=200), symbol
            )

            # Feature selection against the next-bar return
            target = frame['target']
            candidates = frame.drop(columns=non_feature_columns)
            chosen = select_features(candidates, target, k=5)

            # Standardise only the chosen columns, writing them back
            scaler = StandardScaler()
            frame[chosen.columns] = scaler.fit_transform(chosen)

            self.scalers[symbol] = scaler
            dataset[symbol] = {
                'features': chosen,
                'target': target,
                'original': frame,
                'scaler': scaler
            }

        return dataset
    
    def train_drl_agent(self, data, symbol, timesteps=20000):
        """Train a PPO agent on ``symbol``'s prepared frame, save and return it.

        Args:
            data: Mapping produced by ``prepare_dataset``.
            symbol: Trading pair whose ``'original'`` frame drives the environment.
            timesteps: Total environment steps to train for.

        The model is written under ``models/`` and cached in ``self.models``.
        """
        print(f"训练DRL代理: {symbol}...")

        # Wrap a single trading environment for stable-baselines3
        market_env = CryptoMarketEnv(data[symbol]['original'], self.initial_balance)
        vec_env = DummyVecEnv([lambda: market_env])

        # PPO with two hidden layers of 64 units
        agent = PPO(
            "MlpPolicy",
            vec_env,
            verbose=1,
            tensorboard_log=f"./tensorboard/{symbol}/",
            policy_kwargs=dict(net_arch=[64, 64]),
            learning_rate=3e-4,
            gamma=0.99,
            ent_coef=0.01
        )
        agent.learn(total_timesteps=timesteps)

        # Persist the model; '/' in the pair name is not valid in a file name
        os.makedirs("models", exist_ok=True)
        agent.save(f"models/drl_model_{symbol.replace('/', '_')}")
        self.models[symbol] = agent

        return agent
    
    def backtest(self, data, symbol):
        """Run the trained agent over ``symbol``'s data and report the result.

        Trains a model first (10000 steps) if none is cached for ``symbol``.
        Overwrites ``self.portfolio`` with the backtest history, prints a
        summary, plots the result and returns the per-step returns as a
        pandas Series.
        """
        if symbol not in self.models:
            print(f"{symbol}未找到模型,正在训练...")
            self.train_drl_agent(data, symbol, timesteps=10000)

        agent = self.models[symbol]
        env = CryptoMarketEnv(data[symbol]['original'], self.initial_balance)

        # Roll the episode forward, rendering every 20th step and the last one
        observation = env.reset()
        step_index = 0
        while True:
            action, _ = agent.predict(observation)
            observation, _, finished, _ = env.step(action)
            if finished or step_index % 20 == 0:
                env.render()
            step_index += 1
            if finished:
                break

        # Per-step returns of the portfolio value
        history = env.portfolio_history
        returns = pd.Series(history).pct_change().dropna()
        final_value = history[-1]

        # Record the run in the shared portfolio state
        self.portfolio['values'] = history
        self.portfolio['returns'] = returns.tolist()
        self.portfolio['current_value'] = final_value
        self.portfolio['daily_return'] = returns.iloc[-1] if len(returns) > 0 else 0

        # Performance report
        rule = "=" * 60
        print("\n" + rule)
        print(f"{symbol}回测结果")
        print(rule)
        print(f"初始资金: ${self.initial_balance:,.2f}")
        print(f"最终资产: ${final_value:,.2f}")
        print(f"总收益: {(final_value - self.initial_balance) / self.initial_balance * 100:.2f}%")
        print(f"最大回撤: {self.calculate_max_drawdown(history) * 100:.2f}%")
        print(f"交易次数: {env.trade_count}")

        # Charts
        self.plot_results(symbol, data[symbol]['original'], history)

        return returns
    
    def calculate_max_drawdown(self, values):
        """计算最大回撤"""
        peak = values[0]
        max_drawdown = 0
        for value in values:
            if value > peak:
                peak = value
            drawdown = (peak - value) / peak
            if drawdown > max_drawdown:
                max_drawdown = drawdown
        return max_drawdown
    
    def plot_results(self, symbol, price_data, portfolio_history):
        """Plot price and portfolio value, save the figure as PNG and show it.

        Args:
            symbol: Trading pair, used in the title and the output file name.
            price_data: Frame with a ``close`` column; its index is the x axis.
            portfolio_history: Portfolio value per step. It may be shorter
                than ``price_data`` when the episode stopped early, so both
                series are cut to their common length before plotting.
        """
        fig = plt.figure(figsize=(14, 10))

        # Price chart
        plt.subplot(2, 1, 1)
        plt.plot(price_data.index, price_data['close'], label='价格', color='blue')
        plt.title(f"{symbol} 价格走势", fontsize=14)
        plt.ylabel('价格 (USDT)', fontsize=12)
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.legend()

        # Portfolio chart: x and y must have the same length or plot() raises
        shared_len = min(len(price_data), len(portfolio_history))
        plt.subplot(2, 1, 2)
        plt.plot(price_data.index[:shared_len], portfolio_history[:shared_len],
                 label='资产总值', color='green')
        plt.title("资产变化", fontsize=14)
        plt.xlabel('日期', fontsize=12)
        plt.ylabel('资产 (USDT)', fontsize=12)
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.legend()

        plt.tight_layout()
        plt.savefig(f'backtest_{symbol.replace("/", "_")}.png')
        plt.show()
        # Release the figure so repeated backtests do not accumulate figures
        plt.close(fig)
    
    def live_trading(self, symbol, lookback_days=120):
        """Run the daily (simulated) trading loop for ``symbol`` until interrupted.

        Each iteration fetches recent daily candles, rebuilds the features,
        scales them with the scaler fitted during training, asks the model
        for an action and applies it through ``execute_trade`` and
        ``update_portfolio``. The loop only ends on ``KeyboardInterrupt``;
        any other exception is printed and retried after 60 seconds.

        Args:
            symbol: Trading pair to trade. A model is trained first when
                none is cached for it.
            lookback_days: Days of history requested per iteration. It must
                exceed the indicator warm-up: MACD(12, 26, 9) alone needs
                roughly 34 bars before it produces a value, so a 30-day
                window leaves no usable row after ``dropna``.
        """
        if symbol not in self.models:
            print(f"{symbol}未找到模型,正在训练...")
            data = self.prepare_dataset()
            self.train_drl_agent(data, symbol)

        model = self.models[symbol]
        scaler = self.scalers[symbol]

        print(f"启动{symbol}实盘交易...")

        while True:
            try:
                # Fetch the latest data. The window start comes from the
                # epoch clock: calling .timestamp() on a naive utcnow()
                # value would interpret it as local time and shift it.
                now = datetime.utcnow()
                since_ms = int((time.time() - lookback_days * 86400) * 1000)
                ohlcv = self.exchange.fetch_ohlcv(symbol, '1d', since=since_ms)
                df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
                df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
                df.set_index('timestamp', inplace=True)

                # Compute features; the newest bar is dropped there because
                # its next-bar target is still unknown.
                df = calculate_features(df, symbol)
                if df.empty:
                    raise ValueError("特征计算后无可用数据, 请增大 lookback_days")

                # Scale the same columns the scaler was fitted on
                features = df.drop(columns=['open', 'high', 'low', 'close', 'volume', 'target'])
                selected_features = features[scaler.feature_names_in_]
                scaled_features = scaler.transform(selected_features)
                df[selected_features.columns] = scaled_features

                latest = df.iloc[-1]
                current_state = self._create_current_state(latest)

                # The state is a batch of one, so the prediction is batched
                # too; flatten it to the plain [signal, size] pair.
                action, _ = model.predict(current_state)
                action = np.asarray(action).reshape(-1)

                # Execute the trade and update the books
                self.execute_trade(symbol, action, latest['close'])
                self.update_portfolio(symbol, action, latest['close'])

                # Real-time monitoring
                self.real_time_monitoring()

                # Trade once per day
                next_run = now + timedelta(days=1)
                sleep_time = (next_run - datetime.utcnow()).total_seconds()
                print(f"下一次交易在: {next_run} (等待{sleep_time/3600:.1f}小时)")
                time.sleep(max(sleep_time, 60))  # wait at least 60 seconds

            except KeyboardInterrupt:
                print("交易已停止")
                break
            except Exception as e:
                print(f"发生错误: {str(e)}")
                time.sleep(60)
    
    def _create_current_state(self, data_row):
        """创建当前状态向量"""
        # 获取特征值
        features = data_row[['rsi', 'macd', 'atr', 'bb_width', 'macd_diff']].values
        state = features.astype(np.float32)
        
        # 添加持仓信息
        state = np.append(state, [
            self.balance / self.initial_balance,
            self.crypto_held * data_row['close'] / self.initial_balance,
            self.crypto_held
        ])
        
        return state.reshape(1, -1)
    
    def execute_trade(self, symbol, action, price):
        """执行交易"""
        action_type = "买入" if action[0] > 0.2 else "卖出" if action[0] < -0.2 else "持有"
        size = action[1]
        
        print(f"\n执行交易: {action_type} {symbol} {size*100:.1f}% 仓位 @ ${price:.6f}")
        
        # 模拟交易逻辑
        if action_type == "买入":
            # 计算购买量
            amount = (self.balance * size) / price
            self.crypto_held += amount
            self.balance -= amount * price
            print(f"模拟买入: {amount:.2f} {symbol} @ ${price:.6f}")
                
        elif action_type == "卖出":
            # 计算出售量
            amount = self.crypto_held * size
            self.crypto_held -= amount
            self.balance += amount * price
            print(f"模拟卖出: {amount:.2f} {symbol} @ ${price:.6f}")
    
    def update_portfolio(self, symbol, action, price):
        """更新投资组合价值"""
        # 更新当前价值
        self.portfolio['current_value'] = self.balance + self.crypto_held * price
        
        # 计算日收益率
        if len(self.portfolio['values']) > 0:
            last_value = self.portfolio['values'][-1]
            daily_return = (self.portfolio['current_value'] - last_value) / last_value
            self.portfolio['daily_return'] = daily_return
            self.portfolio['returns'].append(daily_return)
        
        # 添加当前价值
        self.portfolio['values'].append(self.portfolio['current_value'])
        
        print(f"投资组合更新: ${self.portfolio['current_value']:,.2f} | "
              f"日收益率: {self.portfolio['daily_return']*100:.2f}%")
    
    def real_time_monitoring(self):
        """实时监控仪表盘"""
        print(f"\n===== 实时监控仪表盘 =====")
        print(f"当前资产: ${self.portfolio['current_value']:,.2f}")
        print(f"日收益率: {self.portfolio['daily_return']*100:.2f}%")
        
        # 计算最大回撤
        if len(self.portfolio['values']) > 10:
            max_drawdown = self.calculate_max_drawdown(self.portfolio['values'])
            print(f"最大回撤: {max_drawdown*100:.2f}%")
        
        print("=" * 30)
    
    def run_full_system(self, backtest_only=True):
        """Run the pipeline: prepare data, train every symbol, backtest every symbol.

        Args:
            backtest_only: When False, the live-trading loop is started for
                each symbol after the backtests finish.
        """
        rule = "=" * 60
        print(rule)
        print("加密货币深度强化学习交易系统")
        print(f"交易对: {', '.join(self.symbols)}")
        print(f"初始资金: ${self.initial_balance:,.2f}")
        print(rule)

        # Step 1: data preparation
        print("\n[1/3] 准备数据...")
        data = self.prepare_dataset()

        # Step 2: train one agent per symbol
        print("\n[2/3] 训练强化学习模型...")
        for symbol in self.symbols:
            self.train_drl_agent(data, symbol, timesteps=15000)

        # Step 3: backtest each trained agent
        print("\n[3/3] 运行回测...")
        for symbol in self.symbols:
            self.backtest(data, symbol)

        if backtest_only:
            print("\n回测完成! 查看图表了解策略表现。")
            return

        print("\n系统准备就绪! 启动实盘交易。")
        for symbol in self.symbols:
            self.live_trading(symbol)

# Script entry point: interactive menu for backtest or live trading
if __name__ == "__main__":
    # Make sure the output directories exist
    for directory in ("models", "tensorboard"):
        os.makedirs(directory, exist_ok=True)

    banner = "=" * 60
    print(banner)
    print("PEPE/SOL币深度强化学习交易系统")
    print(banner)
    print("1. 运行回测")
    print("2. 启动实盘交易")

    choice = input("请选择操作 (1/2): ")

    # The system is created after the prompt; construction connects to the exchange
    trading_system = CryptoDRLTradingSystem(
        symbols=['PEPE/USDT', 'SOL/USDT'],
        initial_balance=10000
    )

    if choice == "1":
        # Backtest only
        trading_system.run_full_system(backtest_only=True)
    elif choice == "2":
        # Live trading requires an explicit confirmation
        print("警告: 实盘交易涉及真实资金风险!")
        confirm = input("确定要启动实盘交易吗? (y/n): ")
        if confirm.lower() != 'y':
            print("取消实盘交易")
        else:
            trading_system.run_full_system(backtest_only=False)
    else:
        print("无效选择")
stdout
Standard output is empty