【競馬AI 11】日付を指定するだけ!予測対象レース情報を自動取得する仕組み

競馬AI

AIで競馬予測を行うとき、「今日のレース情報だけを抽出する」必要があります。
この記事では、日付を指定するだけで、その日のレースデータを自動取得できるコードを紹介します。
すでに学習データの整形や結合を終えた方なら、これまで作成したコードを少し修正するだけで、1日だけのレース情報を抽出することができます。

まだ学習データの作成が完了していない方はこの記事から順番に実施してください。

修正点

学習データ作成コードをコピペする

init_work_tables.pyなどに学習データを作成するコードがあると思います。それをすべてコピペして新規ファイルを作成してください。

それに日付指定して多少修正するだけで完了します。

私はファイル名はpredict_work_tables.pyにしました。

データをインサートするテーブル名を変更する

インサートする先のテーブルはすべて「w_」という接頭辞がついていると思います。これを任意の文字に変更します。そうしないとせっかく作成した学習用データが上書きされてしまいます。

私は「p_」と変更しました。

create_ranked_race_baseの引数に年月日を追加する

最初の関数である「create_ranked_race_base」に年月日を渡すように修正します。

クエリにその年月日でレース情報を取得するようにすれば、その日だけのデータが作成されます。

以下は修正点です。該当行を探して修正してください。

def create_ranked_race_base(year: str, date: str):
…
WHERE KAISAI_NEN = {year} AND KAISAI_GAPPI = {date}
…
year = year	
date = date	
while True:	while True:
select_sql = select_sql_template.format(limit=limit, offset=offset, year=year, date=date)

create_joined_race_tableの修正

この関数は過去の直近5走を持ってきて結合する処理でしたね。ここは少し多めの修正が入っているので、コードをそのまま載せます。

主な違いは「クエリを軽くするために5走のデータを2022年以降から取得するようにした」ことです。それ以外の修正は、何故かエラーが出たのでそれを潰していった結果ですw

def create_joined_race_table():
    # 付与する「過去5走」列とMySQL型(ご提示の定義をそのまま使用)
    merge_columns_with_types = {
        "RACE_ID": "BIGINT(20)",
        "KAKUTEI_CHAKUJUN": "CHAR(2)",
        "TANSHO_ODDS_REAL": "DECIMAL(5,1)",
        "WAKUBAN": "CHAR(1)",
        "UMABAN": "CHAR(2)",
        "FUTAN_JURYO": "CHAR(3)",
        "BLINKER_SHIYO_KUBUN": "CHAR(2)",
        "KISHU_CODE": "CHAR(5)",
        "BATAIJU": "CHAR(3)",
        "ZOGEN_FUGO": "CHAR(1)",
        "ZOGEN_SA": "CHAR(3)",
        "SOHA_TIME": "VARCHAR(32)",
        "CHAKUSA_CODE1": "CHAR(3)",
        "CORNER1_JUNI": "CHAR(2)",
        "CORNER2_JUNI": "CHAR(2)",
        "CORNER3_JUNI": "CHAR(2)",
        "CORNER4_JUNI": "CHAR(2)",
        "TANSHO_NINKIJUN": "CHAR(2)",
        "KOHAN_4F": "CHAR(3)",
        "KOHAN_3F": "CHAR(3)",
        "TIME_SA": "CHAR(4)",
        "KYAKUSHITSU_HANTEI": "CHAR(1)"
    }
    merge_columns = list(merge_columns_with_types.keys())

    print("🚀 データ取得開始...")
    with get_connection() as conn:
        with conn.cursor() as cursor:
            # ---- 履歴側(w_):近3年、必要カラムのみ ----
            sel_cols = ["RACE_ID", "KETTO_TOROKU_BANGO"] + [c for c in merge_columns if c not in ("RACE_ID",)]
            col_sql = ", ".join(f"`{c}`" for c in sel_cols)
            cursor.execute(
                f"""
                SELECT {col_sql}
                FROM w_ranked_race_temp
                WHERE KAISAI_NEN >= %s
                ORDER BY RACE_ID DESC
                """,
                (2022,)
            )
            w_rows = cursor.fetchall()
            w_df = pd.DataFrame(w_rows)  # DictCursor 前提

            # ---- ベース側(p_):対象日レースを全カラム ----
            cursor.execute("SELECT * FROM p_ranked_race_temp ORDER BY RACE_ID DESC")
            p_rows = cursor.fetchall()
            p_df = pd.DataFrame(p_rows)

    print(f"✅ w側: {len(w_df)}件 / p側: {len(p_df)}件 を取得")

    # 型整形(比較用)
    for _df in (w_df, p_df):
        if "RACE_ID" in _df.columns:
            _df["RACE_ID"] = pd.to_numeric(_df["RACE_ID"], errors="coerce")

    # 履歴を馬ごとに降順で保持
    w_df.sort_values(["KETTO_TOROKU_BANGO", "RACE_ID"], ascending=[True, False], inplace=True)
    hist_by_horse = {k: g.reset_index(drop=True) for k, g in w_df.groupby("KETTO_TOROKU_BANGO", sort=False)}

    # ===== p側行ごとに過去5走を連結 =====
    print("🔁 過去5走データの付与を開始...")
    records = []
    for idx, base in p_df.iterrows():
        horse = base.get("KETTO_TOROKU_BANGO")
        rid   = base.get("RACE_ID")

        # ★ NaNを作らない:初期値は None
        prev_block = {f"PREV{i}_{col}": None for i in range(1, 6) for col in merge_columns}

        if pd.notna(horse) and pd.notna(rid):
            hist = hist_by_horse.get(horse)
            if hist is not None and not hist.empty:
                prev = hist[hist["RACE_ID"] < rid]
                if not prev.empty:
                    top5 = prev.head(5)
                    for i, (_, r) in enumerate(top5.iterrows(), start=1):
                        for col in merge_columns:
                            val = r[col] if col in r.index else None
                            # pandasのNA/NaNは None に落とす(ここでは念のため)
                            if pd.isna(val):
                                val = None
                            prev_block[f"PREV{i}_{col}"] = val

        out = base.to_dict()
        out.update(prev_block)
        records.append(out)
        if (idx + 1) % 1000 == 0:
            print(f" ➡️ {idx + 1}/{len(p_df)} 行処理", end="\r")

    result_df = pd.DataFrame(records)
    print("\n✅ 過去5走の連結完了。")

    # ===== DBへ保存(p_joined_raceを作り直し)=====
    with get_connection() as conn:
        with conn.cursor() as cursor:
            print("🧹 p_joined_race を削除中...")
            cursor.execute("DROP TABLE IF EXISTS p_joined_race")
            cursor.execute("CREATE TABLE p_joined_race LIKE p_ranked_race_temp")

            print("🛠 PREVカラム追加(ALTER TABLE)中...")
            alter_parts = []
            for i in range(1, 6):
                for col, typ in merge_columns_with_types.items():
                    alter_parts.append(f"ADD COLUMN `PREV{i}_{col}` {typ}")
            if alter_parts:
                cursor.execute(f"ALTER TABLE p_joined_race {', '.join(alter_parts)}")
            conn.commit()
            print("✅ カラム追加完了")

            # テーブル列順を取得して合わせる
            cursor.execute("SHOW COLUMNS FROM p_joined_race")
            target_cols = [row["Field"] for row in cursor.fetchall()]

            # --- ★ NaN/NaT/inf を完全除去して None へ、かつ object 化 ---
            result_df = result_df.reindex(columns=target_cols)
            result_df = result_df.astype(object) \
                                 .where(pd.notnull(result_df), None) \
                                 .replace({np.inf: None, -np.inf: None})

            # 挿入
            placeholders = ", ".join(["%s"] * len(target_cols))
            insert_sql = f"INSERT INTO p_joined_race ({', '.join(f'`{c}`' for c in target_cols)}) VALUES ({placeholders})"

            recs = [tuple(row) for row in result_df.itertuples(index=False, name=None)]
            total = len(recs)
            batch = 1000
            total_batches = (total + batch - 1) // batch

            print(f"📤 データ挿入開始:{total} 件をバッチ {batch} 件で投入します。")
            for i in range(0, total, batch):
                cursor.executemany(insert_sql, recs[i:i+batch])
                conn.commit()
                print(f"📦 バッチ {i//batch + 1}/{total_batches} 挿入中...", end="\r")
            print(f"\n🎉 全 {total} 行を p_joined_race に挿入完了。")

enrich_w_joined_race_bulkの修正

次のこの関数の修正です。もしかしたら修正しなくても問題ないかもしれませんが、私の環境ではなぜかエラーが出たので修正しました。

修正後のほうが安全なのかもしれませんが、学習データ作成用のコードも動いているのでそのままにしています。

こちらもそのまま載せるので差分をご自身で確認してみてください。

def enrich_w_joined_race_bulk():

    wood_columns_with_types = {
        "TRACEN_KUBUN": "CHAR(1)",
        "CHOKYO_NENGAPPI": "CHAR(8)",
        "CHOKYO_JIKOKU": "CHAR(4)",
        "COURSE": "CHAR(1)",
        "BABAMAWARI": "CHAR(1)",
        "YOBI": "CHAR(1)",
        "TIME_GOKEI_10FURLONG": "CHAR(4)",
        "LAPTIME_10FURLONG": "CHAR(3)",
        "TIME_GOKEI_9FURLONG": "CHAR(4)",
        "LAPTIME_9FURLONG": "CHAR(3)",
        "TIME_GOKEI_8FURLONG": "CHAR(4)",
        "LAPTIME_8FURLONG": "CHAR(3)",
        "TIME_GOKEI_7FURLONG": "CHAR(4)",
        "LAPTIME_7FURLONG": "CHAR(3)",
        "TIME_GOKEI_6FURLONG": "CHAR(4)",
        "LAPTIME_6FURLONG": "CHAR(3)",
        "TIME_GOKEI_5FURLONG": "CHAR(4)",
        "LAPTIME_5FURLONG": "CHAR(3)",
        "TIME_GOKEI_4FURLONG": "CHAR(4)",
        "LAPTIME_4FURLONG": "CHAR(3)",
        "TIME_GOKEI_3FURLONG": "CHAR(4)",
        "LAPTIME_3FURLONG": "CHAR(3)",
        "TIME_GOKEI_2FURLONG": "CHAR(4)",
        "LAPTIME_2FURLONG": "CHAR(3)",
        "LAPTIME_1FURLONG": "CHAR(3)"
    }

    hanro_columns_with_types = {
        "TRACEN_KUBUN": "CHAR(1)",
        "CHOKYO_NENGAPPI": "CHAR(8)",
        "CHOKYO_JIKOKU": "CHAR(4)",
        "TIME_GOKEI_4FURLONG": "CHAR(4)",
        "LAP_TIME_4FURLONG": "CHAR(3)",
        "TIME_GOKEI_3FURLONG": "CHAR(4)",
        "LAP_TIME_3FURLONG": "CHAR(3)",
        "TIME_GOKEI_2FURLONG": "CHAR(4)",
        "LAP_TIME_2FURLONG": "CHAR(3)",
        "LAP_TIME_1FURLONG": "CHAR(3)"
    }

    with get_connection() as conn:
        cursor = conn.cursor()
        print("📋 データ取得中...")
        cursor.execute("SELECT * FROM p_joined_race")
        df = pd.DataFrame(cursor.fetchall(), columns=[desc[0] for desc in cursor.description])
        print(f"✅ p_joined_race: {len(df)}件")

        cursor.execute("SELECT * FROM woodchip_chokyo WHERE CHOKYO_NENGAPPI >= '20250801'")
        df_wood = pd.DataFrame(cursor.fetchall(), columns=[desc[0] for desc in cursor.description])

        cursor.execute("SELECT * FROM hanro_chokyo WHERE CHOKYO_NENGAPPI >= '20250801'")
        df_hanro = pd.DataFrame(cursor.fetchall(), columns=[desc[0] for desc in cursor.description])
        print(f"✅ woodchip_chokyo: {len(df_wood)}件, hanro_chokyo: {len(df_hanro)}件")

    # 日付変換
    df["RACE_DATE"] = pd.to_datetime(df["KAISAI_NEN"].astype(str) + df["KAISAI_GAPPI"].astype(str), format="%Y%m%d", errors="coerce")
    df_wood["CHOKYO_NENGAPPI_DT"] = pd.to_datetime(df_wood["CHOKYO_NENGAPPI"], format="%Y%m%d", errors="coerce")
    df_hanro["CHOKYO_NENGAPPI_DT"] = pd.to_datetime(df_hanro["CHOKYO_NENGAPPI"], format="%Y%m%d", errors="coerce")

    # 木チップ追い切りデータを結合
    print("🔗 woodchip_chokyoを結合中...")
    df_wood = df_wood.sort_values("CHOKYO_NENGAPPI_DT")
    df = df.sort_values("RACE_DATE")
    
    df = pd.merge_asof(
        df,
        df_wood[["KETTO_TOROKU_BANGO", "CHOKYO_NENGAPPI_DT"] + list(wood_columns_with_types.keys())],
        by="KETTO_TOROKU_BANGO",
        left_on="RACE_DATE",
        right_on="CHOKYO_NENGAPPI_DT",
        direction="backward"
    )

    # 坂路追い切りデータを結合(suffixがつく)
    print("🔗 hanro_chokyoを結合中...")
    df_hanro = df_hanro.sort_values("CHOKYO_NENGAPPI_DT")
    df = pd.merge_asof(
        df,
        df_hanro[["KETTO_TOROKU_BANGO", "CHOKYO_NENGAPPI_DT"] + list(hanro_columns_with_types.keys())],
        by="KETTO_TOROKU_BANGO",
        left_on="RACE_DATE",
        right_on="CHOKYO_NENGAPPI_DT",
        direction="backward",
        suffixes=('', '_HANRO')
    )

    # カラム名リネーム(WOOD_)
    for col in wood_columns_with_types:
        if col in df.columns:
            df.rename(columns={col: f"WOOD_{col}"}, inplace=True)

    # カラム名リネーム(_HANRO → HANRO_)
    for col in hanro_columns_with_types:
        if f"{col}_HANRO" in df.columns:
            df.rename(columns={f"{col}_HANRO": f"HANRO_{col}"}, inplace=True)
        elif col in df.columns:
            df.rename(columns={col: f"HANRO_{col}"}, inplace=True)

    # テーブル作成
    with get_connection() as conn:
        cursor = conn.cursor()
        print("🆕 新テーブル作成中...")
        cursor.execute("DROP TABLE IF EXISTS p_joined_race_chokyo")
        cursor.execute("CREATE TABLE p_joined_race_chokyo LIKE p_joined_race")

        for col, col_type in wood_columns_with_types.items():
            try:
                cursor.execute(f"ALTER TABLE p_joined_race_chokyo ADD COLUMN `WOOD_{col}` {col_type}")
            except Exception as e:
                print(f"⚠️ カラム追加失敗: WOOD_{col} → {e}")

        for col, col_type in hanro_columns_with_types.items():
            try:
                cursor.execute(f"ALTER TABLE p_joined_race_chokyo ADD COLUMN `HANRO_{col}` {col_type}")
            except Exception as e:
                print(f"⚠️ カラム追加失敗: HANRO_{col} → {e}")
        conn.commit()

        # 不要列削除
        df.drop(columns=["RACE_DATE", "CHOKYO_NENGAPPI_DT", "CHOKYO_NENGAPPI_DT_HANRO"], inplace=True, errors="ignore")

        # INSERT対象の列だけを抽出
        all_columns = df.columns.tolist()
        base_cols = [col for col in all_columns if not col.startswith("WOOD_") and not col.startswith("HANRO_")]
        wood_cols = [f"WOOD_{col}" for col in wood_columns_with_types if f"WOOD_{col}" in all_columns]
        hanro_cols = [f"HANRO_{col}" for col in hanro_columns_with_types if f"HANRO_{col}" in all_columns]
        insert_cols = base_cols + wood_cols + hanro_cols

        df = df[insert_cols].copy()
        # 1) ±inf を先に None へ
        df.replace({np.inf: None, -np.inf: None}, inplace=True)
        # 2) datetime64列は Python datetime へ(NaTは後でNone化)
        for c in df.columns:
            if pd.api.types.is_datetime64_any_dtype(df[c]):
                df[c] = df[c].dt.to_pydatetime()
        # 3) object化してから NaN/NaT を None へ
        df = df.astype(object).where(pd.notnull(df), None)
        # 4) Pythonの素の型で渡す(optionalだが堅牢)
        data = [tuple(row) for row in df.itertuples(index=False, name=None)]

        print("💾 一括INSERT中...")
        insert_sql = f"INSERT INTO p_joined_race_chokyo ({','.join(insert_cols)}) VALUES ({','.join(['%s'] * len(insert_cols))})"
        data = df.values.tolist()

        for i in tqdm(range(0, len(data), 1000)):
            with conn.cursor() as cursor:
                cursor.executemany(insert_sql, data[i:i + 1000])
            conn.commit()

    print("✅ 全件完了")

実行用ファイルを作成

これで、データ取得用のコードは完成したので、あとは実行用ファイルを作成するだけです。

init_db.pyをコピペして新しいファイルを作成してちょっと修正するだけです。こちらもコード載せておきますね。

from db.predict_work_tables import create_ranked_race_base, create_ranked_race_temp, create_joined_race_table, enrich_w_joined_race_bulk,extend_w_joined_race_with_kyosoba,transform_w_training_data_for_lgb

if __name__ == "__main__":
    print("🛠️ ranked_race_base を作成中...")
    create_ranked_race_base("2025","1026")

    print("🛠️ ranked_race_temp を作成中...")
    create_ranked_race_temp()

    print("🛠️ joined_race を作成中...")
    create_joined_race_table()

    print("🛠️ joined_race に調教データ追加中...")
    enrich_w_joined_race_bulk()

    print("🛠️ p_training_data に競争馬情報の追加中...")
    extend_w_joined_race_with_kyosoba()

    print("🛠️ p_training_data_lgb に学習用データ追加中...")
    transform_w_training_data_for_lgb()

    print("✅ すべて完了しました。")

あとは実行するだけで、データベースに予想したい日のレース情報が作成されます。

まとめ

この記事では、「予測する日のレース情報を自動取得する」コードを紹介しました。

これまで作成したコードを多少修正するだけで、簡単にデータの抽出が完了します。

次の記事では作成したモデルを使って予測するコードを紹介していきます。

コメント