【競馬AI-6】過去5走を横持ちにして学習データを整える

競馬AI

競馬AIを作るときに欠かせないのが「過去のレース成績をどのように扱うか」という視点です。1走前だけを見るのでは足りず、直近の5走分をまとめて特徴量にしたい場面は多いでしょう。そこで今回は、全ての馬のレース結果に対して「過去5走を横持ちで持たせる処理」を紹介します。

処理概要

今回の処理では、まず全ての馬についてレースを最新から順に並べることから始めます。そこから直近の1走前〜5走前までの情報を取り出し、同じ行に横持ちで追加していきます。

結果として「ある馬のこのレース」という1行に、その馬の直近5走分の情報が付与されます。これを作っておけば、学習モデルに過去成績も含めて入力することが簡単になります。

プログラム全文

以下が実際のコードです。
db/init_work_tables.py に追記する)

def create_joined_race_table():
    # カラムとMySQL型を明示的に定義
    merge_columns_with_types = {
        "RACE_ID": "BIGINT(20)",
        "KAKUTEI_CHAKUJUN": "CHAR(2)",
        "TANSHO_ODDS_REAL": "DECIMAL(5,1)",
        "WAKUBAN": "CHAR(1)",
        "UMABAN": "CHAR(2)",
        "FUTAN_JURYO": "CHAR(3)",
        "BLINKER_SHIYO_KUBUN": "CHAR(2)",
        "KISHU_CODE": "CHAR(5)",
        "BATAIJU": "CHAR(3)",
        "ZOGEN_FUGO": "CHAR(1)",
        "ZOGEN_SA": "CHAR(3)",
        "SOHA_TIME": "VARCHAR(32)",
        "CHAKUSA_CODE1": "CHAR(3)",
        "CORNER1_JUNI": "CHAR(2)",
        "CORNER2_JUNI": "CHAR(2)",
        "CORNER3_JUNI": "CHAR(2)",
        "CORNER4_JUNI": "CHAR(2)",
        "TANSHO_NINKIJUN": "CHAR(2)",
        "KOHAN_4F": "CHAR(3)",
        "KOHAN_3F": "CHAR(3)",
        "TIME_SA": "CHAR(4)",
        "KYAKUSHITSU_HANTEI": "CHAR(1)"
    }

    merge_columns = list(merge_columns_with_types.keys())

    print("🚀 データ取得開始...")
    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT * FROM w_ranked_race_temp ORDER BY RACE_ID DESC")
            rows = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
            df = pd.DataFrame(rows, columns=columns)
            print(f"✅ {len(df)} 件のデータを取得しました。")

            print("🧮 race_row カラム生成中...")
            df["race_row"] = df.groupby("KETTO_TOROKU_BANGO").cumcount() + 1

            print("📊 ソート実行中...")
            df.sort_values(by=["KETTO_TOROKU_BANGO", "RACE_ID"], ascending=[True, False], inplace=True)

            print("🔁 過去5走データの付与を開始...")
            for i in range(1, 6):
                print(f" ➡️ {i}/5 回目の shift 実行中...", end="\r")
                shifted = df.groupby("KETTO_TOROKU_BANGO", group_keys=False)[merge_columns].shift(-i)
                shifted.columns = [f"PREV{i}_{col}" for col in merge_columns]
                df = pd.concat([df, shifted], axis=1)
            print("\n✅ 過去5走の結合完了。")

        with conn.cursor() as cursor:
            print("🧹 w_joined_race を削除中...")
            cursor.execute("DROP TABLE IF EXISTS w_joined_race")
            cursor.execute("CREATE TABLE w_joined_race LIKE w_ranked_race_temp")

            print("🛠 カラム追加(ALTER TABLE)中...")
            alter_statements = []
            for i in range(1, 6):
                for col, mysql_type in merge_columns_with_types.items():
                    col_name = f"PREV{i}_{col}"
                    alter_statements.append(f"ADD COLUMN {col_name} {mysql_type}")

            if alter_statements:
                alter_sql = f"ALTER TABLE w_joined_race {', '.join(alter_statements)}"
                cursor.execute(alter_sql)
            conn.commit()
            print("✅ カラム追加完了")

            insert_columns = df.columns.tolist()
            insert_sql = f"""
                INSERT INTO w_joined_race ({','.join(insert_columns)})
                VALUES ({','.join(['%s'] * len(insert_columns))})
            """

            print("🧹 NaN → None 変換中...")
            df = df.where(pd.notnull(df), None)

            records = df.to_records(index=False)
            total = len(records)
            batch_size = 1000
            total_batches = (total + batch_size - 1) // batch_size

            print(f"📤 データ挿入開始:{total} 件をバッチサイズ {batch_size} で挿入します。")
            for i in range(0, total, batch_size):
                batch_index = i // batch_size + 1
                batch = [tuple(r) for r in records[i:i + batch_size]]
                cursor.executemany(insert_sql, batch)
                conn.commit()
                print(f"📦 バッチ {batch_index} / {total_batches} を挿入中...", end="\r")
            print(f"\n✅ 全 {total} 行を w_joined_race に挿入完了。")

解説

この処理の肝は 「過去の走行データをシフトして横持ち化する」 部分です。

まず、race_row というカラムで「この馬にとって何走前なのか」を順番に番号付けします。その上で、pandasの groupby + shift を使って、1走前から5走前までをそれぞれの行に追加します。

例えばある馬が「2025年春天」を走った場合、その行には「2025年大阪杯」「2025年京都記念」といった過去の履歴が PREV1_〜 の形で横持ちされます。

これがあるだけで「過去5走を特徴量に入れる」という処理が一気に楽になります。

ポイント

  • SQLだけでやらない
    自己結合で過去5走を取ろうとするとSQLが非常に重くなります。pandasの shift を使う方がシンプルで安定します。
  • カラム定義を明示する
    PREV1〜PREV5のカラムはALTER TABLEで追加。型を揃えておくことで後工程のミスを減らせます
  • バッチ挿入で安定化
    1,000件ずつ分割して挿入。大量データを一気に処理せず、安定したジョブ実行が可能になります。

コードの呼び出し方

この関数は前回のコードと同じくinit_db.py から実行します。
複数の作業テーブルをまとめて作成する流れの中で create_ranked_race_temp() が呼ばれます。以下のようなコードになります。

from db.init_work_tables import (
    create_ranked_race_base,
    create_ranked_race_temp,
    create_joined_race_table
)

if __name__ == "__main__":
    print("🛠️ ranked_race_base を作成中...")
    create_ranked_race_base()

    print("🛠️ ranked_race_temp を作成中...")
    create_ranked_race_temp()

    print("🛠️ joined_race を作成中...")
    create_joined_race_table()

    print("✅ すべて完了しました。")

まとめ

過去5走の横持ち化は、競馬AIにとって必須の前処理です。これを用意しておけば、学習時に「過去成績を含める」という処理がとてもスムーズになります。

  • 馬ごとに最新順を並べて連番を付与
  • shiftで過去5走を横持ちに変換
  • バッチ処理で安全にDBへ投入

こうした工夫の積み重ねが、大規模データを扱う競馬AI開発の安定性と精度を高めるのです。

コメント