【競馬AI-4】学習用データを抽出する

競馬AI

出走データを学習用に整形するには、ただSQLで一括処理するだけでは不安定です。
ここでは、Pythonでデータを小分けに取得・加工して挿入する関数 create_ranked_race_base() を紹介し、コード全文からプロジェクト構成までわかりやすく解説します。

出走情報を学習用に抽出する

競馬AIを作るには、まず学習用にきれいなテーブルを用意することが欠かせません。
今回紹介する create_ranked_race_base() は、2015年以降の中央競馬の出走情報(umagoto_race_joho)から必要なデータだけを取り出し、学習しやすい形に整形して別テーブル w_ranked_race_base を作る処理です。

ポイントはSQLだけに処理を任せないこと
データ量が数百万行規模になるため、一気に INSERT ... SELECT するとDBが固まったりメモリを食いつぶしたりします。
そこで Pythonでデータを小分けに運び、必要な加工をその場で加えてから挿入する方法を取っています。


プログラム全文

以下が実際のコードです。
db/init_work_tables.py に配置する想定)

def create_ranked_race_base():
    create_table_sql = """
    CREATE TABLE IF NOT EXISTS w_ranked_race_base (
        LIKE umagoto_race_joho
    );
    """
    alter_table_sql = """
    ALTER TABLE w_ranked_race_base
      ADD COLUMN RACE_ID BIGINT,
      ADD COLUMN TANSHO_ODDS_REAL DECIMAL(5,1),
      ADD INDEX idx_race_id (RACE_ID);
    """

    select_sql_template = """
    SELECT *,
      CAST(CONCAT(KAISAI_NEN, LPAD(KEIBAJO_CODE,2,'0'), KAISAI_KAIJI, KAISAI_NICHIJI, LPAD(RACE_BANGO,2,'0')) AS UNSIGNED) AS RACE_ID,
      CAST(SUBSTRING(TANSHO_ODDS, 1, CHAR_LENGTH(TANSHO_ODDS) - 1) AS UNSIGNED) + 
        CAST(CONCAT('0.', SUBSTRING(TANSHO_ODDS, -1)) AS DECIMAL(5,1)) AS TANSHO_ODDS_REAL
    FROM umagoto_race_joho
    WHERE KAISAI_NEN >= 2015
      AND KEIBAJO_CODE IN ('01','02','03','04','05','06','07','08','09','10')
    LIMIT {limit} OFFSET {offset};
    """

    insert_sql = """
    INSERT INTO w_ranked_race_base (
        INSERT_TIMESTAMP, UPDATE_TIMESTAMP, RECORD_SHUBETSU_ID, DATA_KUBUN, DATA_SAKUSEI_NENGAPPI,
        RACE_CODE, KAISAI_NEN, KAISAI_GAPPI, KEIBAJO_CODE, KAISAI_KAIJI, KAISAI_NICHIJI,
        RACE_BANGO, WAKUBAN, UMABAN, KETTO_TOROKU_BANGO, BAMEI, UMAKIGO_CODE, SEIBETSU_CODE,
        HINSHU_CODE, MOSHOKU_CODE, BAREI, TOZAI_SHOZOKU_CODE, CHOKYOSHI_CODE, CHOKYOSHIMEI_RYAKUSHO,
        BANUSHI_CODE, BANUSHIMEI_HOJINKAKU_NASHI, FUKUSHOKU_HYOJI, FUTAN_JURYO, HENKOMAE_FUTAN_JURYO,
        BLINKER_SHIYO_KUBUN, KISHU_CODE, HENKOMAE_KISHU_CODE, KISHUMEI_RYAKUSHO,
        HENKOMAE_KISHUMEI_RYAKUSHO, KISHU_MINARAI_CODE, HENKOMAE_KISHU_MINARAI_CODE, BATAIJU,
        ZOGEN_FUGO, ZOGEN_SA, IJO_KUBUN_CODE, NYUSEN_JUNI, KAKUTEI_CHAKUJUN, DOCHAKU_KUBUN,
        DOCHAKU_TOSU, SOHA_TIME, CHAKUSA_CODE1, CHAKUSA_CODE2, CHAKUSA_CODE3, CORNER1_JUNI,
        CORNER2_JUNI, CORNER3_JUNI, CORNER4_JUNI, TANSHO_ODDS, TANSHO_NINKIJUN, KAKUTOKU_HONSHOKIN,
        KAKUTOKU_FUKASHOKIN, KOHAN_4F, KOHAN_3F, AITE1_KETTO_TOROKU_BANGO, AITE1_BAMEI,
        AITE2_KETTO_TOROKU_BANGO, AITE2_BAMEI, AITE3_KETTO_TOROKU_BANGO, AITE3_BAMEI, TIME_SA,
        RECORD_KOSHIN_KUBUN, MINING_KUBUN, MINING_YOSO_SOHA_TIME, MINING_YOSO_GOSA_PLUS,
        MINING_YOSO_GOSA_MINUS, MINING_YOSO_JUNI, KYAKUSHITSU_HANTEI, RACE_ID, TANSHO_ODDS_REAL
    )
    VALUES (
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
    );
    """

    with get_connection() as conn:
        with conn.cursor() as cursor:
            print("🧹 w_ranked_race_base を削除中...")
            cursor.execute("DROP TABLE IF EXISTS w_ranked_race_base")
            print("🧱 テーブル構造作成中...")
            cursor.execute(create_table_sql)
            cursor.execute(alter_table_sql)

            print("📥 分割挿入開始...")
            limit = 10000
            offset = 0

            while True:
                select_sql = select_sql_template.format(limit=limit, offset=offset)
                cursor.execute(select_sql)
                rows = cursor.fetchall()

                if not rows:
                    break

                insert_data = [
                    (
                        row["INSERT_TIMESTAMP"], row["UPDATE_TIMESTAMP"], row["RECORD_SHUBETSU_ID"], row["DATA_KUBUN"],
                        row["DATA_SAKUSEI_NENGAPPI"], row["RACE_CODE"], row["KAISAI_NEN"], row["KAISAI_GAPPI"],
                        row["KEIBAJO_CODE"], row["KAISAI_KAIJI"], row["KAISAI_NICHIJI"], row["RACE_BANGO"],
                        row["WAKUBAN"], row["UMABAN"], row["KETTO_TOROKU_BANGO"], row["BAMEI"], row["UMAKIGO_CODE"],
                        row["SEIBETSU_CODE"], row["HINSHU_CODE"], row["MOSHOKU_CODE"], row["BAREI"], row["TOZAI_SHOZOKU_CODE"],
                        row["CHOKYOSHI_CODE"], row["CHOKYOSHIMEI_RYAKUSHO"], row["BANUSHI_CODE"],
                        row["BANUSHIMEI_HOJINKAKU_NASHI"], row["FUKUSHOKU_HYOJI"], row["FUTAN_JURYO"],
                        row["HENKOMAE_FUTAN_JURYO"], row["BLINKER_SHIYO_KUBUN"], row["KISHU_CODE"],
                        row["HENKOMAE_KISHU_CODE"], row["KISHUMEI_RYAKUSHO"], row["HENKOMAE_KISHUMEI_RYAKUSHO"],
                        row["KISHU_MINARAI_CODE"], row["HENKOMAE_KISHU_MINARAI_CODE"], row["BATAIJU"],
                        row["ZOGEN_FUGO"], row["ZOGEN_SA"], row["IJO_KUBUN_CODE"], row["NYUSEN_JUNI"],
                        row["KAKUTEI_CHAKUJUN"], row["DOCHAKU_KUBUN"], row["DOCHAKU_TOSU"], row["SOHA_TIME"],
                        row["CHAKUSA_CODE1"], row["CHAKUSA_CODE2"], row["CHAKUSA_CODE3"], row["CORNER1_JUNI"],
                        row["CORNER2_JUNI"], row["CORNER3_JUNI"], row["CORNER4_JUNI"], row["TANSHO_ODDS"],
                        row["TANSHO_NINKIJUN"], row["KAKUTOKU_HONSHOKIN"], row["KAKUTOKU_FUKASHOKIN"],
                        row["KOHAN_4F"], row["KOHAN_3F"], row["AITE1_KETTO_TOROKU_BANGO"], row["AITE1_BAMEI"],
                        row["AITE2_KETTO_TOROKU_BANGO"], row["AITE2_BAMEI"], row["AITE3_KETTO_TOROKU_BANGO"],
                        row["AITE3_BAMEI"], row["TIME_SA"], row["RECORD_KOSHIN_KUBUN"], row["MINING_KUBUN"],
                        row["MINING_YOSO_SOHA_TIME"], row["MINING_YOSO_GOSA_PLUS"], row["MINING_YOSO_GOSA_MINUS"],
                        row["MINING_YOSO_JUNI"], row["KYAKUSHITSU_HANTEI"],
                        row["RACE_ID"], row["TANSHO_ODDS_REAL"]
                    )
                    for row in rows
                ]


                cursor.executemany(insert_sql, insert_data)
                conn.commit()

                print(f"✅ {len(rows)} 行挿入 (OFFSET {offset})")
                offset += limit

コード解説

テーブル作成と追加列

  • CREATE TABLE ... LIKE で元テーブル umagoto_race_joho と同じ構造をそのままコピー。
  • さらに ALTER TABLE で学習用に必要な列を追加します。
    • RACE_ID:開催年+競馬場コード+回次+日次+レース番号をまとめた一意のID。
    • TANSHO_ODDS_REAL:文字列だった単勝オッズを小数点付きの数値に正規化。

データ取得

  • LIMITOFFSET1万件ずつ 取り出すのがポイント。
  • CASTSUBSTRING で、レースIDやオッズの型変換はここで完結。
    これ以上の集約や結合はせず、SQL側を軽く保っています。

データ挿入

  • executemany() で一括挿入し、毎バッチ commit()
  • 進捗を print() で出力することで長時間処理でも状況が見えます。

ポイント紹介(設計の肝)

  • SQL側に重い処理を背負わせない
    巨大な INSERT ... SELECT は一見スマートでも、TEMP領域やロックで落ちやすい。
  • Python側で小分けに運ぶ
    失敗時のリトライや途中確認がしやすく、長時間ジョブに向く。
  • バッチサイズは調整可能
    1万件は目安。環境によって3000〜20000件程度でチューニングすると良いです。
  • インデックスのタイミング
    まとめて入れる場合は後付けが高速。今回は読み取り前提で先付けしています。

コードの呼び出し方

この関数はプロジェクトの初期化スクリプト init_db.py から実行します。
複数の作業テーブルをまとめて作成する流れの中で create_ranked_race_base() が呼ばれます。
※次の記事からも別の関数を作成しますが、init_db.pyに呼び出しを追記していきます。

from db.init_work_tables import (
    create_ranked_race_base
)

if __name__ == "__main__":
    print("🛠️ ranked_race_base を作成中...")
    create_ranked_race_base()

    print("✅ すべて完了しました。")

DB接続は db/connection.py で管理。
例えば以下のように get_connection() を定義します。

import pymysql
from pymysql.cursors import DictCursor

def get_connection():
    return pymysql.connect(
        host="localhost",
        user="user",
        password="passwd",
        database="mykeiba",
        charset="utf8mb4",
        cursorclass=DictCursor,
        autocommit=False
    )

ターミナルからの実行方法

プロジェクトのルートディレクトリで、以下のようにモジュール指定で実行します。

python -m services.init_db

この -m オプションは「パッケージとしてモジュールを実行する」ための指定です。
services がパッケージ(__init__.py があるディレクトリ)として認識され、
その中の init_db.py がメインスクリプトとして起動されます。

  • 実行中は 🛠️ ranked_race_base を作成中... の進捗メッセージが順に表示されます。
  • すべての処理が終わると ✅ すべて完了しました。 が表示されます。

※通常の python init_db.py ではなく python -m services.init_db としているのは、
プロジェクトをパッケージ単位で管理し、相対インポートを安全に使うためです。


プロジェクト全体のフォルダ構成

シンプルにまとめると次のようなイメージです。

project-root/
├─ db/
│  ├─ connection.py            # get_connection()を定義
│  └─ init_work_tables.py      # create_ranked_race_base など複数関数を管理
├─ ml/
│  └─ estimation.py            # モデル作成用のコード
├─ services/
│  └─ init_db.py               # ← 本記事の主役
├─ model/
│  └─ ... 学習済みモデルや特徴量設計ファイル
└─ logs/
   └─ ... 実行ログやエラーログ
  • db:DB接続やテーブル作成系の関数をまとめる場所
  • services:データ加工や特徴量作成などのロジック
  • init_db.py:初期化の実行ポイント。cronやシェルから呼び出すのもここ。

まとめ

  • 2015年以降の中央競馬出走データを学習向けに整形するテーブルw_ranked_race_base
  • SQLの一括処理では重くなるため、Pythonで小分けに取得・挿入することで安定&高速化。
  • 接続ファイル、呼び出しスクリプト、フォルダ構成まで揃えておけば、今後の特徴量設計やモデル学習もスムーズ。

この形をベースに、次は「過去5走の結合」や「特徴量の自動生成」など、さらにAIに寄せた加工へ進めていけます。
大規模データを扱うときの基本パターンとして、ぜひプロジェクトに取り入れてみてください。

コメント