添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

Kaggle Trading at the Close PB金牌方案!

竞赛背景

证券交易所是一个快节奏、高风险的环境,每一秒都至关重要。随着交易日接近尾声,交易的强度会不断增加,最后十分钟会至关重要。这些时刻通常以高波动和快速价格波动为特征,对塑造当天的全球经济起着关键作用。

纳斯达克证券交易所的每个交易日都以纳斯达克收盘交叉盘竞价结束。这个过程为交易所上市的证券建立了官方的收盘价格。这些证券的收盘价格对投资者、分析师和其他市场参与者来说是评估个别证券和整个市场表现的关键指标。

在这个复杂的金融领域中,运营着全球领先电子market maker Optiver。受到技术创新的推动,Optiver交易各种金融工具,如衍生品、现金股票、ETF、债券和外币,在全球主要交易所上为数千种这些工具提供有竞争力的双边价格。

在纳斯达克交易所交易的最后十分钟,像Optiver这样的market maker将传统订单簿数据与竞价簿数据合并。整合来自两个来源的信息的能力对于为所有市场参与者提供最佳价格至关重要。

在这个比赛中,所有选手需要利用股票的订单簿和收盘竞价数据开发模型,预测数百家纳斯达克上市股票的收盘价格走势。来自竞价的信息可以用来调整价格,评估供需动态,并识别交易机会。

方案解读

1.因子解读

作者的因子构建的比较详细,具体的细节大家可以自己总结,此处大致汇总构建的逻辑框架。构建的粒度:

  • 最细粒度,也就是基于单个样本的cols之间的组合;
  • 基于二阶的组合,
  • date_id  + seconds_in_bucket
  • stock_id + date_id
  • stock_id + seconds_in_bucket
  • 基于三组的粒度:
  • stock_id + date_id + group
  • 其中构建的逻辑中使用到的算子,或者统计的操作:

  • min,max等基础统计信息;
  • shift;
  • pct_change;
  • diff;
  • cumsum;
  • rolling.agg;
  • weights = [
        0.0040.0010.0020.0060.0040.0040.0020.0060.0060.0020.0020.008,
        0.0060.0020.0080.0060.0020.0060.0040.0020.0040.0010.0060.004,
        0.0020.0020.0040.0020.0040.0040.0010.0010.0020.0020.0060.004,
        0.0040.0040.0060.0020.0020.04 , 0.0020.0020.0040.04 , 0.0020.001,
        0.0060.0040.0040.0060.0010.0040.0040.0020.0060.0040.0060.004,
        0.0060.0040.0020.0010.0020.0040.0020.0080.0040.0040.0020.004,
        0.0060.0020.0040.0040.0020.0040.0040.0040.0010.0020.0020.008,
        0.02 , 0.0040.0060.0020.02 , 0.0020.0020.0060.0040.0020.0010.02,
        0.0060.0010.0020.0040.0010.0020.0060.0060.0040.0060.0010.002,
        0.0040.0060.0060.0010.04 , 0.0060.0020.0040.0020.0020.0060.002,
        0.0020.0040.0060.0060.0020.0020.0080.0060.0040.0020.0060.002,
        0.0040.0060.0020.0040.0010.0040.0020.0040.0080.0060.0080.002,
        0.0040.0020.0010.0040.0040.0040.0060.0080.0040.0010.0010.002,
        0.0060.0040.0010.0020.0060.0040.0060.0080.0020.0020.0040.002,
        0.04 , 0.0020.0020.0040.0020.0020.0060.02 , 0.0040.0020.0060.02,
        0.0010.0020.0060.0040.0060.0040.0040.0040.0040.0020.0040.04,
        0.0020.0080.0020.0040.0010.0040.0060.004,
    ]

    weights = {int(k):v for k,v in enumerate(weights)}
    from numba import njit, prange

    # 📊 Function to compute triplet imbalance in parallel using Numba
    @njit(parallel=True)
    def compute_triplet_imbalance(df_values, comb_indices):
        num_rows = df_values.shape[0]
        num_combinations = len(comb_indices)
        imbalance_features = np.empty((num_rows, num_combinations))

        # 🔁 Loop through all combinations of triplets
        for i in prange(num_combinations):
            a, b, c = comb_indices[i]
            
            # 🔁 Loop through rows of the DataFrame
            for j in range(num_rows):
                max_val = max(df_values[j, a], df_values[j, b], df_values[j, c])
                min_val = min(df_values[j, a], df_values[j, b], df_values[j, c])
                mid_val = df_values[j, a] + df_values[j, b] + df_values[j, c] - min_val - max_val
                
                # 🚫 Prevent division by zero
                if mid_val == min_val:
                    imbalance_features[j, i] = np.nan
                else:
                    imbalance_features[j, i] = (max_val - mid_val) / (mid_val - min_val)

        return imbalance_features

    # 📊 Function to compute triplet imbalance in parallel using Numba
    @njit(parallel=True)
    def compute_triplet_imbalance(df_values, comb_indices):
        num_rows = df_values.shape[0]
        num_combinations = len(comb_indices)
        imbalance_features = np.empty((num_rows, num_combinations))

        # 🔁 Loop through all combinations of triplets
        for i in prange(num_combinations):
            a, b, c = comb_indices[i]
            
            # 🔁 Loop through rows of the DataFrame
            for j in range(num_rows):

                if df_values[j, a] < df_values[j, b]:
                    min_val = df_values[j, a]
                    max_val = df_values[j, b]
                else:
                    max_val = df_values[j, a]
                    min_val = df_values[j, b]

                if min_val < df_values[j, c]:
                    if df_values[j, c] < max_val:
                        mid_val = df_values[j, c]
                    else:
                        mid_val = max_val
                        max_val = df_values[j, c]
                else:
                    mid_val = min_val
                    min_val = df_values[j, c]
                
                # 🚫 Prevent division by zero
                if max_val == min_val:
                    imbalance_features[j, i] = np.nan
                elif mid_val == min_val:
                    imbalance_features[j, i] = np.nan
                else:
                    imbalance_features[j, i] = (max_val - mid_val) / (mid_val - min_val)
        
        return imbalance_features

    # 📈 Function to calculate triplet imbalance for given price data and a DataFrame
    def calculate_triplet_imbalance_numba(price, df):
        # Convert DataFrame to numpy array for Numba compatibility
        df_values = df[price].values
        comb_indices = [(price.index(a), price.index(b), price.index(c)) for a, b, c in combinations(price, 3)]

        # Calculate the triplet imbalance using the Numba-optimized function
        features_array = compute_triplet_imbalance(df_values, comb_indices)

        # Create a DataFrame from the results
        columns = [f"{a}_{b}_{c}_imb2" for a, b, c in combinations(price, 3)]
        features = pd.DataFrame(features_array, columns=columns)

        return features
    def imbalance_features(df):

        stock_groups = df.groupby(["date_id""seconds_in_bucket"])
        # Index WAP
        df["wwap"]   = df.stock_id.map(weights) * df.wap
        df["iwap"]   = stock_groups["wwap"].transform(lambda x: x.sum())
        del df["wwap"]

        # Define lists of price and size-related column names
        prices = ["reference_price""far_price""near_price""ask_price""bid_price""wap"]
        sizes  = ["matched_size""bid_size""ask_size""imbalance_size"]

        # V1 features
        # Calculate various features using Pandas eval function
        df["volume"]              = df.eval("ask_size + bid_size")
        df["mid_price"]           = df.eval("(ask_price + bid_price) / 2")
        df["liquidity_imbalance"] = df.eval("(bid_size-ask_size)/(bid_size+ask_size)")
        df["matched_imbalance"]   = df.eval("(imbalance_size-matched_size)/(matched_size+imbalance_size)")
        df["all_size"]            = df.eval("matched_size + imbalance_size"# add
        df["imbalance_size_for_buy_sell"] = df.eval("imbalance_size * imbalance_buy_sell_flag")  # add
         
        cols = ['wap''imbalance_size_for_buy_sell'"bid_size""ask_size"]
        for q in [0.250.50.75]:  # Try more/different q
            df[[f'{col}_quantile_{q}' for col in cols]] = stock_groups[cols].transform(lambda x: x.quantile(q)).astype(np.float32)
        
        # Create features for pairwise price imbalances
        for c in combinations(prices, 2):
            df[f"{c[0]}_{c[1]}_imb"] = df.eval(f"({c[0]} - {c[1]})/({c[0]} + {c[1]})").astype(np.float32)

        for c in combinations(sizes, 2):
            df[f"{c[0]}/{c[1]}"] = df.eval(f"({c[0]})/({c[1]})").astype(np.float32)

        # Calculate triplet imbalance features using the Numba-optimized function
        for c in [['ask_price''bid_price''wap''reference_price'], sizes]:
            triplet_feature             = calculate_triplet_imbalance_numba(c, df)
            df[triplet_feature.columns] = triplet_feature.values.astype(np.float32)
            
        # V2 features
        # Calculate additional features
        stock_groups             = df.groupby(['stock_id''date_id'])
        df["imbalance_momentum"] = stock_groups['imbalance_size'].diff(periods=1) / df['matched_size']
        df["price_spread"]       = df["ask_price"] - df["bid_price"]
        df["spread_intensity"]   = stock_groups['price_spread'].diff()
        df['price_pressure']     = df['imbalance_size'] * (df['ask_price'] - df['bid_price'])
        df['market_urgency']     = df['price_spread'] * df['liquidity_imbalance']
        df['depth_pressure']     = (df['ask_size'] - df['bid_size']) * (df['far_price'] - df['near_price'])
        df['wap_advantage']      = df.wap - df.iwap  # add

        # Calculate various statistical aggregation features
        df_prices = df[prices]
        df_sizes  = df[sizes]
        for func in ["mean""std""skew""kurt"]:
            df[f"all_prices_{func}"] = df_prices.agg(func, axis=1)
            df[f"all_sizes_{func}"]  = df_sizes.agg(func, axis=1)
            
        # V3 features
        # Calculate shifted and return features for specific columns
        cols = ['matched_size''imbalance_size''reference_price''imbalance_buy_sell_flag'"wap""iwap"]
        stock_groups_cols = stock_groups[cols]
        for window in [123610]:
            df[[f"{col}_shift_{window}" for col in cols]] = stock_groups_cols.shift(window)

        cols = ['matched_size''imbalance_size''reference_price'"iwap"#wap
        stock_groups_cols = stock_groups[cols]
        for window in [123610]:
            df[[f"{col}_ret_{window}" for col in cols]] = stock_groups_cols.pct_change(window).astype(np.float32)

        # Calculate diff features for specific columns
        cols = ['ask_price''bid_price''ask_size''bid_size''wap''near_price''far_price''imbalance_size_for_buy_sell']
        stock_groups_cols = stock_groups[cols]
        for window in [123610]:
            df[[f"{col}_diff_{window}" for col in cols]] = stock_groups_cols.diff(window).astype(np.float32)

        # V4 features
        # Construct `time_since_last_imbalance_change`
        # 当`imbalance_buy_sell_flag`改变时,'flag_change'列会为该行赋值为1
    #     df['flag_change'] = stock_groups['imbalance_buy_sell_flag'].diff().ne(0).astype(int)
    #     # 使用cumsum创建一个组标识符,每当flag改变时,组标识符增加
    #     df['group'] = stock_groups['flag_change'].cumsum()
    #     # 对每个组内的'seconds_in_bucket'计算时间差,以得到自上次flag改变以来的时间
    #     df['time_since_last_imbalance_change'] = df.groupby(['stock_id', 'date_id', 'group'])['seconds_in_bucket'].transform(lambda x: x - x.min())
    #     # `flag_change`为1的地方设为0
    #     df.loc[df['flag_change'] == 1, 'time_since_last_imbalance_change'] = 0
    #     df.drop(columns=['flag_change', 'group'], inplace=True)
        
        df['flag_change'] = stock_groups['imbalance_buy_sell_flag'].diff().ne(0).astype(int)
        # 使用cumsum创建一个组标识符,每当flag改变时,组标识符增加
        df['group'] = df.groupby(['stock_id''date_id'])['flag_change'].cumsum()
        # 对每个组内的'seconds_in_bucket'计算时间差,以得到自上次flag改变以来的时间
        group_min = df.groupby(['stock_id''date_id''group'])['seconds_in_bucket'].transform('min')
        df['time_since_last_imbalance_change'] = df['seconds_in_bucket'] - group_min
        # `flag_change`为1的地方设为0
        df['time_since_last_imbalance_change'] *= (1 - df['flag_change'])
        df.drop(columns=['flag_change''group'], inplace=True)
        
        cols = ['imbalance_size_for_buy_sell']
        stock_groups_cols = stock_groups[cols]
        for window in [510]:
            mean_col = stock_groups_cols.transform(lambda x: x.rolling(window=window).mean())
            std_col = stock_groups_cols.transform(lambda x: x.rolling(window=window).std())
            df[[f'z_score_{col}_{window}' for col in cols]] = (df[cols] - mean_col) / std_col
        
        # Replace infinite values with 0
        return df.replace([np.inf, -np.inf], 0)

    # 📅 Function to generate time and stock-related features
    def other_features(df):
        df["seconds"] = df["seconds_in_bucket"] % 60  # Seconds
        df["minute"]  = df["seconds_in_bucket"] // 60  # Minutes

        # Map global features to the DataFrame
        for key, value in global_stock_id_feats.items():
            df[f"global_{key}"] = df["stock_id"].map(value.to_dict())
            
        for key, value in global_seconds_feats.items():
            df[f"global_seconds_{key}"] = df["seconds_in_bucket"].map(value.to_dict())

        return df

    def last_days_features(df: pd.DataFrame, feat_last=None, target_last=None):
        size = None
        
        if feat_last is not None and len(feat_last) > 0:
            cols = [col for col in df.columns if col in set(feat_last.columns)]
            if target_last is not None:
                cols.append("target")
                feat_last["target"] = target_last
                df["target"]        = 0
            paddings     = []
            second_start = df.seconds_in_bucket.max()
            padding_src  = df[df.seconds_in_bucket == second_start]
            size         = len(df)
            size_pad     = len(padding_src) * 6
            for second in range(second_start + 10, second_start + 7010):
                padding = padding_src.copy()
                padding["seconds_in_bucket"] = second
                paddings.append(padding)
            df = pd.concat([feat_last[cols], df] + paddings)

        # Add Last days features
        TODO: Try more features
        cols = ['near_price''far_price''depth_pressure']
        if 'target' in df.columns:
            cols.append('target')
        stock_groups      = df.groupby(['stock_id''seconds_in_bucket'])
        stock_groups_cols = stock_groups[cols]
        for window in [1]:
            df[[f"{col}_last_{window}day" for col in cols]] = stock_groups_cols.shift(window)
        if cols[-1] == "target":
            cols.pop()
        
        cols = [f"{col}_last_{window}day" for col in cols]
        stock_groups      = df.groupby(['stock_id''date_id'])
        stock_groups_cols = stock_groups[cols]
        for window in [1236]:
            df[[f"{col}_future_{window}" for col in cols]] = stock_groups_cols.shift(-window)
            
        if size:
            return df[-(size + size_pad):-size_pad]
        return df

    # 🚀 Function to generate all features by combining imbalance and other features
    def generate_all_features(df, feat_last=None, target_last=None):
        # Select relevant columns for feature generation
        cols = [c for c in df.columns if c not in {"row_id""time_id""currently_scored"}]
        df   = df[cols]
        
        # Generate imbalance features
        df = imbalance_features(df)
        
        # Generate last days features
        df = last_days_features(df, feat_last, target_last)
        
        # Generate time and stock-related features
        df = other_features(df)

        gc.collect()  # Perform garbage collection to free up memory
        
        # Select and return the generated features
        feature_name = [i for i in df.columns if i not in {"row_id""target""time_id"}]
        
        return df[feature_name]

    2. 模型训练

  • 基础模型:使用了Catboost+Lightgbm;
  • 在线重新训练的策略,这样可以充分使用最新的训练数据以获取更好的训练结果;这边最好Fix住最大使用内存,如果超过则进行截断,将早期的数据丢弃,不然可能会超内存的问题。
  • https://www.kaggle.com/code/wuhongrui/public-5-3227-lgbm-catboost-with-online-learning/notebook
  • https://www.kaggle.com/competitions/optiver-trading-at-the-close/discussion/462650
  • Midjourney V6提示词的结构:形式 + 主题 + 场景 + 构图 + 光照 + 附加信息
  • Sam Altman:我希望有人告诉过我这些
  • ​1222.AI日报:2023年是世界开始认真对待AI的一年
  • “这是疯狂的一年!” Sam Altman 交出 2023 年终总结,公开他曾渴望听见的 17 条建议
  • 一切为了开发者|AI、操作系统、云原生、汽车、教育、物联网、安全等前沿技术探索
  • AAAI 2024 | 测试时领域适应的鲁棒性得以保证,TRIBE在多真实场景下达到SOTA
  • arXiv大升级,论文网页版本直接看
  • 大模型竟然能玩手机了,还能用软件修图:「AppAgent」会成为2024年的新趋势吗?
  • 击败扩散模型,清华朱军团队基于薛定谔桥的新语音合成系统来了
  • Midjourney V6迎来大升级:网友惊呼生成效果太逼真
  • 14秒就能重建视频,还能变换角色,Meta让视频合成提速44倍
  • 航海报名超12000人!今日拆解视频号带货、公众号爆文
  • 红海类目卷不过?小红书这两类低价饰品也能月入 10 万
  • 使用 Taro 开发鸿蒙原生应用——探秘适配鸿蒙 ArkTS 的工作原理
  • 不同编程语言的键盘
  • 理解Mysql索引原理及特性
  • 如何在同步的Rust方法中调用异步代码 | Tokio使用中的几点教训
  • 统信UOS桌面版装机量超600万套,市占率第一
  • 预约报名|Apollo开放平台9.0自动驾驶技术公开课来啦
  •