【競馬AI-5】馬ごと最新順に連番を振る理由と処理の流れ

競馬AI

競馬AIの学習データを作るとき、「この馬の最新レースはどれ? 何走前のレースを使う?」という問いは必ず出てきます。
もしレースごとに時系列を整理しておかないと、過去5走の成績を結合する処理が複雑化し、SQLやPythonの処理が一気に重くなってしまいます。
今回紹介する create_ranked_race_temp は、馬ごとに最新レース順で連番(race_row)をあらかじめ振っておくことで、その後の特徴量づくりをシンプルにしてくれる重要な準備工程です。
ここでは実際のプログラム全文を示しながら、処理の狙いとポイントをじっくり解説します。

処理概要

create_ranked_race_temp は、テーブル w_ranked_race_base をもとに新しいテーブル w_ranked_race_temp を作成します。
ここには元の全データに加えて、race_row という“馬ごと最新順の連番” が付与されます。

やっていることをひとことで言えば、

馬単位でレースを最新から順に並べ、1走目・2走目…という番号をSQLのウィンドウ関数で振り、結果をまるごと新しいテーブルに保存する

という処理です。

これを事前に材化しておくと、例えば「過去5走を横持ちにして結合」という複雑な工程を、シンプルな WHERE race_row <= 5 の条件で完結させられます。
大量データを扱うパイプラインでは、こうした
前処理のひと工夫が最終的な安定性と実行速度を左右します。


プログラム全文

以下が実際のコードです。
db/init_work_tables.py に追記する)

def create_ranked_race_temp():
    query = """
    CREATE TABLE IF NOT EXISTS w_ranked_race_temp AS
    SELECT *,
      ROW_NUMBER() OVER (
        PARTITION BY KETTO_TOROKU_BANGO
        ORDER BY RACE_ID DESC
      ) AS race_row
    FROM w_ranked_race_base;
    """

    with get_connection() as conn:
        with conn.cursor() as cursor:
            cursor.execute("DROP TABLE IF EXISTS w_ranked_race_temp")
            cursor.execute(query)
        conn.commit()

解説:一行ずつ読み解く

1. 新しいテーブルを作り直す

まず既存の w_ranked_race_tempDROPしてから再作成しています。

DROP TABLE IF EXISTS w_ranked_race_temp
CREATE TABLE IF NOT EXISTS w_ranked_race_temp AS ...

この流れを入れておくことで、何度実行しても常に最新のスナップショットを作り直すことができます。
競馬データはレース開催ごとに増え続けますから、テーブルを再構築できることは重要です。


2. ROW_NUMBER() で馬ごとに連番

ROW_NUMBER() はSQLのウィンドウ関数。
ここでは KETTO_TOROKU_BANGO(血統登録番号)ごとにグループ化し、RACE_IDを降順に並べてから 1,2,3…と連番を振ります。

ROW_NUMBER() OVER (
  PARTITION BY KETTO_TOROKU_BANGO
  ORDER BY RACE_ID DESC
) AS race_row
  • race_row = 1 → その馬の最新レース
  • race_row = 2 → 1走前
  • …以下同様

この情報さえあれば「最新レースだけほしい」「過去3走分ほしい」といった処理は WHERE race_row <= 3 で済みます。
過去走の横持ち結合のような本来複雑な処理をシンプルに変える鍵がここにあります。


3. get_connection() の役割

with get_connection() as conn: は共通のDB接続関数を使っています。
pymysql の DictCursor を利用している場合、row["RACE_ID"] のように列名でアクセスできるので、後工程でも扱いやすいデータ形式が保たれます。

また with 構文によって接続とカーソルは自動的にクローズされ、長時間実行でも接続リークが起きません。
これは大規模データを扱うバッチ処理では必須の安全策です。


さらに掘り下げ:なぜ事前に材化するのか

A. 後続処理をシンプルにする

この race_row がない場合、後で過去5走分を取得するには

  • 複雑な自己結合(自己JOIN)
  • サブクエリや相関サブクエリ
    などを組み合わせる必要があります。
    大量データを抱える中央競馬の全レースでそれをやると、SQLサーバーの負荷は一気に上がり、処理時間も読めなくなります。

B. 処理を安定化する

一度テーブルとして固定しておくことで、その時点の完全なスナップショットを得られます。
以降のステップで元データが更新されても影響を受けず、処理が途中で失敗しても再実行が容易です。

C. 再利用しやすい

w_ranked_race_temp は単なる一時テーブルに見えますが、他の分析や特徴量作成にも利用可能。
「最新走」「直近3走平均タイム」「過去5走平均人気」など多くの指標の土台になります。


実務での活用ポイント

  • SELECT * ではなく必要カラムに絞る最適化
    実運用では、今後使うカラムだけを明示的に選んでおくと、
    ディスク使用量とI/O負荷が抑えられます。
  • インデックスの検討
    後段で KETTO_TOROKU_BANGORACE_ID をよく使うなら、複合インデックスを張ると検索がより高速になります。
  • スケジュール実行
    レース後の定期処理としてcronやワークフロー管理ツール(Airflowなど)に組み込めば、常に最新状態を自動で保てます。
  • Python側での拡張
    race_row の番号を使って直近の走行成績をpandasで集約し、機械学習の特徴量にするなど応用範囲は広いです。

コードの呼び出し方

この関数は前回のコードと同じくinit_db.py から実行します。
複数の作業テーブルをまとめて作成する流れの中で create_ranked_race_temp() が呼ばれます。以下のようなコードになります。

from db.init_work_tables import (
    create_ranked_race_base,
    create_ranked_race_temp
)

if __name__ == "__main__":
    print("🛠️ ranked_race_base を作成中...")
    create_ranked_race_base()

    print("🛠️ ranked_race_temp を作成中...")
    create_ranked_race_temp()

    print("✅ すべて完了しました。")

まとめ

create_ranked_race_temp は、馬ごと最新順に連番(race_row)を振ったテーブルを事前に作成する準備工程です。
この一手間で、次のようなメリットがあります。

  • 過去5走の横持ち結合など後続処理を安定化・単純化できる
  • 元データ更新の影響を受けない安全なスナップショットを確保できる
  • 直近走データの集計や特徴量づくりの再利用が容易になる

競馬AIの学習パイプラインでは、こうした前処理テーブルをうまく設計することが、
精度を高めながら運用を安定させる鍵です。
create_ranked_race_temp はその典型例であり、
「シンプルな処理で未来の複雑さを減らす」というデータエンジニアリングの基本を体現しています。

コメント