【競馬AI-10】LightGBMで学習!レースデータから予測モデルを作る手順

競馬AI

ここまででデータの整形が一通り完了しました。不要なカラムの削除、数値化、過去5走や調教データの結合といった前処理を経て、ようやく機械学習に使える学習データが揃いました。今回からはいよいよLightGBMを使って予測モデルを作成します。データをどう分けて、どんな特徴量を使い、どのように学習・評価するのかをコードを追いながら解説していきます。

概要

今回のコードは、学習データを読み込み → 学習とテストに分割 → LightGBMでモデルを学習 → 評価 → 保存という流れになっています。

ポイントをまとめると次のとおりです。

  1. データの読み込み
    MySQLから学習用テーブルを取り込み、PandasのDataFrameに変換します。
  2. データの分割
    RACE_IDで並べて、古いレースを学習用、新しいレースをテスト用に分けます。
  3. ラベルの作成
    「3着以内」を1、それ以外を0として二値分類のラベルを作っています。
  4. 特徴量の選定
    学習に不要な列(ID、オッズ、ラベルなど)を除外し、残りを数値に変換します。数値化できない列は正規表現で数字を抜き出して変換しています。
  5. モデルの学習と評価
    LightGBMで学習し、AUC(精度指標)を計算。学習データとテストデータそれぞれでスコアを確認します。
  6. モデルの保存
    学習済みモデルをファイルに保存し、今後の予測に使えるようにします。

プログラム全文

以下が実際のコードです。
ml/estimation.py に配置する)

# ml/estimation.py 
from db.connection import get_connection
import pandas as pd
import numpy as np
import os
import lightgbm as lgb
from sklearn import metrics

# =========================
# 設定
# =========================
TABLE_NAME = "w_training_data_lgb"
MODEL_DIR = "model"
MODEL_FILE = os.path.join(MODEL_DIR, "model.txt")
TOP_N = 3            # ラベル: KAKUTEI_CHAKUJUN <= TOP_N を Positive
RANDOM_STATE = 100   # 分割・学習の乱数固定

# 学習“から”除外する列(ID/ラベル/リーク源など)
EXCLUDE_COL_EXACT = set([
    "RACE_ID",           # IDは特徴量から除外
    "KAKUTEI_CHAKUJUN",  # 目的変数の元(リーク防止)
    "TANSHO_ODDS_REAL",
    "TANSHO_NINKIJUN",
    "BATAIJU",
    "ZOGEN_SA",
    "FINISH_NUM",        # [LABEL ONLY] 追加: 数値化した着順(リーク防止)
    "LABEL_TOPN",        # [LABEL ONLY] 追加: ラベル列(リーク防止)
])

# 部分一致で除外(パターン)※大文字化して判定
EXCLUDE_COL_PATTERNS = (
    "ODDS",                 # オッズは使わない
    "KETTO_TOROKU_BANGO",   # 馬ID系
    "_RACE_ID",             # 過去レースIDの派生
    "RACE_CODE",            # 文字ID・識別子系
)

# =========================
# ユーティリティ
# =========================
def load_table_as_df(table: str) -> pd.DataFrame:
    sql = f"SELECT * FROM `{table}`"
    with get_connection() as conn:
        with conn.cursor() as cur:     # 通常カーソル(タプルが返る想定)
            cur.execute(sql)
            rows = cur.fetchall()
            cols = [d[0] for d in cur.description]
    df = pd.DataFrame(rows, columns=cols)
    print(df.head(3), df.shape)
    return df

def split_by_race_id(df: pd.DataFrame, test_size: float = 0.2) -> tuple[pd.DataFrame, pd.DataFrame]:
    sorted_id_list = df.sort_values('RACE_ID').index.unique()
    train_id_list = sorted_id_list[:round(len(sorted_id_list) * (1-test_size))]
    test_id_list = sorted_id_list[round(len(sorted_id_list) * (1-test_size)):]
    train = df.loc[train_id_list]
    test = df.loc[test_id_list]
    return train, test

def _coerce_finish_raw(s: pd.Series) -> pd.Series:
    """
    着順の粗数値化:
    - まず to_numeric
    - 0以下や90以上(99など)は NaN とみなす
    """
    x = pd.to_numeric(s, errors="coerce")
    x = x.where((x > 0) & (x < 90))
    return x

# [LABEL ONLY] 追加: 着順の堅牢パース(全角や「1位」など文字混在に対応)
_ZEN2HALF = str.maketrans('0123456789', '0123456789')
def _to_finish_num_strict(s: pd.Series) -> pd.Series:
    t = s.astype(str).str.translate(_ZEN2HALF)       # 全角→半角
    t = t.str.extract(r'(\d+)')[0]                   # 先頭の連続数字を抽出
    x = pd.to_numeric(t, errors="coerce")
    return x.where((x > 0) & (x < 90))               # 0,90以上は欠損扱い

def choose_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    学習特徴量の抽出:
      - EXCLUDE(完全一致/部分一致)で列を除外(部分一致は大文字化で判定)
      - 残り列を numeric 化: 1) to_numeric、 2) 数字抽出→numeric
      - 非NaNが1つ以上ある列のみ採用
      - 欠損はそのまま、float32 化
    """
    drop = set(EXCLUDE_COL_EXACT)

    pats_upper = [p.upper() for p in EXCLUDE_COL_PATTERNS]
    for c in df.columns:
        uc = str(c).upper()
        if any(pat in uc for pat in pats_upper):
            drop.add(c)

    candidates = [c for c in df.columns if c not in drop]
    X = pd.DataFrame(index=df.index)

    for c in candidates:
        s0 = df[c]

        # 1) to_numeric
        s_num = pd.to_numeric(s0, errors="coerce")

        # 2) ダメなら数字抽出
        if s_num.notna().sum() == 0:
            s_digits = s0.astype(str).str.extract(r"(-?\d+\.?\d*)")[0]
            s_num2 = pd.to_numeric(s_digits, errors="coerce")
            if s_num2.notna().sum() > 0:
                s_num = s_num2

        # 採用判定
        if s_num.notna().sum() > 0:
            X[c] = s_num.astype(np.float32)

    return X

def ensure_dir(path: str):
    os.makedirs(path, exist_ok=True)

def safe_auc(y_true: pd.Series, y_prob: np.ndarray) -> float | None:
    """クラスが片側だけの時にエラーしないAUC"""
    try:
        return metrics.roc_auc_score(y_true, y_prob)
    except ValueError:
        return None

def summarize_exclusions(df: pd.DataFrame):
    """除外された列のサマリーを返す(デバッグ用)"""
    drop = set(EXCLUDE_COL_EXACT)
    pats_upper = [p.upper() for p in EXCLUDE_COL_PATTERNS]
    pat_hit = []

    for c in df.columns:
        uc = str(c).upper()
        hit = [pat for pat in pats_upper if pat in uc]
        if hit:
            drop.add(c)
            pat_hit.append((c, hit))

    exact_only = sorted(list(EXCLUDE_COL_EXACT.intersection(df.columns)))
    pat_only   = sorted([c for c, _ in pat_hit])
    return exact_only, pat_only

# =========================
# Main
# =========================
def main():
    print("📥 Loading table:", TABLE_NAME)
    df = load_table_as_df(TABLE_NAME)
    print(f"✅ Loaded: {len(df):,} rows, {len(df.columns)} cols")

    # [LABEL ONLY] 差し替え —— ここだけロジック変更(上書きしないで別カラムを作成)
    df['KAKUTEI_CHAKUJUN'] = pd.to_numeric(df['KAKUTEI_CHAKUJUN'], errors='coerce').between(1, 3).astype(int)
    # 学習/評価分割(RACE_IDベース + フォールバック)※未変更
    train_df, test_df = split_by_race_id(df)
    print(f"🧪 Split: train_rows={len(train_df):,}, test_rows={len(test_df):,}")

    y_train = train_df['KAKUTEI_CHAKUJUN']
    y_test = test_df['KAKUTEI_CHAKUJUN']

    print(f"🎯 Pos ratio: train={y_train.mean():.4f}  (pos={y_train.sum():,}/{len(y_train):,}), "
          f"test={y_test.mean():.4f}  (pos={y_test.sum():,}/{len(y_test):,})")
    if y_train.nunique() < 2 or y_test.nunique() < 2:
        print("⚠️ 片側クラスのみの分割になっています。分割方法やTOP_Nの再検討を推奨します。")

    # 除外列サマリー(診断)※未変更
    ex_exact, ex_pat = summarize_exclusions(train_df)
    if ex_exact or ex_pat:
        print("🧾 Excluded columns (exact):", ex_exact)
        print("🧾 Excluded columns (pattern):", ex_pat[:10], ("...(+more)" if len(ex_pat) > 10 else ""))

    # 特徴量(数値のみ抽出。ID/オッズ/ラベルなどは除外)※未変更
    X_train = choose_features(train_df)
    X_test  = choose_features(test_df)

    # 列整合 ※未変更
    common_cols = X_train.columns.intersection(X_test.columns)
    X_train = X_train[common_cols]
    X_test  = X_test[common_cols]
    print(f"🧩 Features: {len(common_cols)} cols")

    if X_train.shape[1] == 0:
        try:
            print(train_df["LABEL_TOPN"].value_counts(dropna=False).head(20))
        except Exception:
            pass
        raise RuntimeError("特徴量が0列です。EXCLUDE設定が広すぎるか、数値化前処理を見直してください。")

    # LightGBM ※未変更
    params = dict(
        objective="binary",
        metric="binary_logloss",
        boosting_type="gbdt",
        random_state=RANDOM_STATE,
        verbosity=-1,
        class_weight="balanced",
        n_estimators=1000,
        num_leaves=31,
        feature_fraction=0.8,
        bagging_fraction=0.8,
        bagging_freq=1,
        min_child_samples=20,
    )
    clf = lgb.LGBMClassifier(**params)

    print("🚀 Training LightGBM...")
    clf.fit(
        X_train, y_train,
        eval_set=[(X_test, y_test)],
        eval_metric="auc",
        callbacks=[
            lgb.early_stopping(stopping_rounds=100, verbose=False),
            # lgb.log_evaluation(period=50),  # ログが欲しければ有効化
        ],
    )

    # 予測 & 評価 ※未変更
    y_pred_train = clf.predict_proba(X_train)[:, 1]
    y_pred_test  = clf.predict_proba(X_test)[:, 1]

    auc_tr = safe_auc(y_train, y_pred_train)
    auc_te = safe_auc(y_test, y_pred_test)
    if auc_tr is not None and auc_te is not None:
        print(f"📈 AUC train={auc_tr:.4f}  test={auc_te:.4f}")
    else:
        print("⚠️ AUC を計算できません(片側クラスのみの可能性)。")

    # 重要度 ※未変更
    importances = clf.feature_importances_
    feat_names  = X_train.columns.to_list()
    order = np.argsort(importances)[::-1]
    topk = min(20, len(order))
    if importances.sum() == 0:
        print("⚠️ 重要度が全て0です。片側クラス・定数特徴・早期終了直後などの可能性を確認してください。")
        print(f"   learned_iterations={getattr(clf, 'best_iteration_', None)}  y_train_unique={y_train.nunique()}")

    # ROC AUC(曲線)※未変更
    auc_curve = safe_auc(y_test, y_pred_test)
    if auc_curve is not None:
        fpr, tpr, thr = metrics.roc_curve(y_test, y_pred_test, pos_label=1)
        print("Test AUC (curve):", metrics.auc(fpr, tpr))
    else:
        print("ROC曲線はスキップ(正例/負例が片側のみ)。")

    # モデル保存 ※未変更
    if hasattr(clf, "booster_") and clf.booster_ is not None:
        ensure_dir(MODEL_DIR)
        clf.booster_.save_model(MODEL_FILE)
        print(f"💾 Saved model: {MODEL_FILE}")
    else:
        print("⚠️ モデルが未学習のため保存をスキップしました。")

解説

データの読み込み

最初に、学習用のテーブルを読み込みます。
SQLで SELECT * を実行し、列名を取得した上でPandasに変換。DataFrameとして扱えるようにすることで、その後の前処理や学習にスムーズに移行できます。

df = load_table_as_df(TABLE_NAME)

この時点で行数や列数を確認しておくと、データの規模感を把握できます。


データの分割

モデルを評価するためには、学習用とテスト用に分ける必要があります。ここではRACE_IDで時系列順に並べ、古い方を学習、新しい方をテストに割り当てています。

train_df, test_df = split_by_race_id(df)

こうすることで「未来のレースを予測する」状況に近づけています。


ラベルの作成

予測したいのは「3着以内に入るかどうか」です。そのため KAKUTEI_CHAKUJUN をもとに、3着以内なら1、それ以外なら0としたラベル列を作成しています。

df['KAKUTEI_CHAKUJUN'] = pd.to_numeric(df['KAKUTEI_CHAKUJUN'], errors='coerce').between(1, 3).astype(int)

特徴量の選定

学習に不要な列を除外したあと、残った列を数値化して特徴量とします。

  • 除外対象:ID、オッズ、ラベル由来の列など
  • 採用対象:数値に変換可能な列

数値にできる列はそのまま to_numeric で変換しますが、一見数値でない列でも正規表現で数字を抜き出し変換する工夫を入れています。これで「数字と文字が混ざった列」も学習に活かせます。


モデルの学習と評価

ここからが本番。LightGBMのパラメータを設定して学習を開始します。

  • 目的:二値分類(binary)
  • 評価指標:binary_logloss、AUC
  • サンプリング:baggingやfeature_fractionで過学習を防止
clf = lgb.LGBMClassifier(**params)
clf.fit(X_train, y_train, eval_set=[(X_test, y_test)], eval_metric="auc", callbacks=[lgb.early_stopping(stopping_rounds=100)])

学習が終わったら、AUC(精度指標)を算出します。AUCは1に近いほど精度が高いことを示します。


モデルの保存

最後に学習済みモデルをファイルとして保存します。これで次回からは再学習せずに予測だけを行えるようになります。

clf.booster_.save_model(MODEL_FILE)

ポイント

  • 学習データの整備はここまでで完結
    不要カラムを削除し、数値化し、過去5走や調教データを結合してきました。今回のコードでモデル作成まで到達し、ひとまず「学習データからモデルを作る」流れが一通り完成しました。
  • 未来を予測する形に近づける工夫
    RACE_IDで分割することで、実際の運用に近い条件で評価できるようにしています。
  • 数値化の工夫が精度に直結
    正規表現で数字を抽出するなど、少しの前処理の工夫がLightGBMの精度向上につながる部分です。
  • 再利用できる仕組み
    モデルを保存することで、予測タスクにすぐ利用できる基盤が整います。

ターミナルからの実行方法

プロジェクトのルートディレクトリで、以下のようにモジュール指定で実行します。

python -m ml.estimation

まとめ

これで、学習データの作成からLightGBMを用いたモデル学習までの流れが一通り完了しました。ここまで来れば、次は実際に新しいレースデータを入力して予測を試すことができます。

予測結果をどう評価するか、どのように馬券戦略に活かすかは今後のテーマですが、土台となる「データ → モデル」の流れは今回で完成です。

コメント