これまで学習済みのLightGBMモデルを使って、実際のレース結果を自動で予測する方法を紹介します。
本記事では、データベースに保存された特徴量を読み込み、予測確率から入賞確率や期待値を計算してDBに書き戻すまでをすべて自動化します。
人気馬ばかり上位になる課題への対策も考慮し、これからレース予測を本格的に運用したい方に最適な内容です。
処理の全体像
今回のプログラムは、これまで作成・学習してきたLightGBMモデルを読み込み、MySQL上のテーブルから最新の特徴量を取得して予測確率を出力する一連の流れを構築しています。
主な処理の流れは以下の通りです。
- モデルの読み込み
- 特徴量データの取得(p_training_data_lgb)
- 予測確率の算出
- メタ情報(馬名・レース会場など)の結合
- オッズデータとのマージ
- 入賞確率・期待値の計算
- 結果をp_predict_resultsテーブルに保存
最終的には、各レースごとに全出走馬の予測順位(PRED_RANK)と期待値(EXPECTED_VALUE)を一覧で出力します。
プログラム全文
import os
import numpy as np
import pandas as pd
import lightgbm as lgb
from db.connection import get_connection
MODEL_PATH  = os.path.join("model", "model.txt")
SRC_TABLE   = "p_training_data_lgb"
META_TABLE  = "p_training_data"     # BAMEI / KEIBAJO_CODE / RACE_BANGO / WAKUBAN / UMABAN / RACE_CODE
META_TABLE2 = "odds1_tansho"        # RACE_CODE / KAISAI_NEN / KAISAI_GAPPI / UMABAN / ODDS / NINKI
OUT_TABLE   = "p_predict_results"
KEIBAJO_MAP = {
    1: "札幌", 2: "函館", 3: "福島", 4: "新潟", 5: "東京",
    6: "中山", 7: "中京", 8: "京都", 9: "阪神", 10: "小倉"
}
def _to_int_code(x):
    """'01'→1, '10'→10, 1→1。変換できなければ None。"""
    if x is None or (isinstance(x, float) and np.isnan(x)): return None
    s = str(x).strip()
    if s == "": return None
    try:
        return int(s.lstrip("0") or "0")
    except Exception:
        return None
def _norm_ketto(s: pd.Series) -> pd.Series:
    """KETTO_TOROKU_BANGO の表記ゆれ吸収(文字列化→trim→先頭ゼロ除去)。空は '0' に。"""
    out = s.astype(str).str.strip().str.lstrip("0")
    return out.mask(out == "", "0")
def _odds_str_to_real(odds_val):
    """
    odds1_tansho.ODDS を SQL 式に準拠して小数へ変換:
      CAST(SUBSTRING(ODDS,1,LEN-1) AS UNSIGNED) + CAST(CONCAT('0.', RIGHT(ODDS,1)) AS DECIMAL(5,1))
    - '123' -> 12.3
    - '7'   -> 0.7
    - すでに '12.3' のような小数表記はそのまま float 変換
    - それ以外/欠損は np.nan
    """
    if odds_val is None or (isinstance(odds_val, float) and np.isnan(odds_val)):
        return np.nan
    s = str(odds_val).strip()
    if s == "":
        return np.nan
    # 小数点表記はそのまま採用
    try:
        if "." in s:
            return float(s)
    except Exception:
        pass
    # 数字のみは SQL の式どおりに変換
    if s.isdigit():
        if len(s) == 1:
            return float(int(s[-1])) / 10.0
        return float(int(s[:-1])) + float(int(s[-1])) / 10.0
    # それ以外は float に挑戦(失敗なら NaN)
    try:
        return float(s)
    except Exception:
        return np.nan
def predict_with_lgbm():
    # 1) モデル読み込み
    if not os.path.exists(MODEL_PATH):
        raise FileNotFoundError(f"モデルが見つかりません: {MODEL_PATH}")
    booster = lgb.Booster(model_file=MODEL_PATH)
    feat_names = booster.feature_name()
    if not feat_names:
        raise RuntimeError("モデルから特徴量名を取得できません。")
    # 2) 予測データ取得(特徴量)
    with get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(f"SELECT * FROM `{SRC_TABLE}`")
            rows = cur.fetchall()
            cols = [d[0] for d in cur.description]
            df = pd.DataFrame(rows, columns=cols)
    # オッズは予測に使わない
    df = df.drop(["TANSHO_ODDS_REAL","TANSHO_NINKIJUN","BATAIJU","ZOGEN_SA"], axis=1)
    # 必須キー
    for k in ["RACE_ID", "KETTO_TOROKU_BANGO"]:
        if k not in df.columns:
            raise KeyError(f"{SRC_TABLE} に必須列 {k} がありません。")
    # 3) 特徴量整形(列合わせ・数値化・NaN埋め)
    for c in (c for c in feat_names if c not in df.columns):
        df[c] = 0
    X = df[feat_names].apply(pd.to_numeric, errors="coerce").fillna(0.0).astype(np.float32)
    # 4) 予測(確率)
    num_iter = booster.best_iteration if getattr(booster, "best_iteration", None) else None
    pred = booster.predict(X, num_iteration=num_iter)
    # 5) 予測DF(キー+確率)
    out = df[["RACE_ID", "KETTO_TOROKU_BANGO"]].copy()
    out["PRED_PROB"] = pred
    # 6) RACE_IDごとに PRED_RANK
    out["PRED_RANK"] = out.groupby("RACE_ID")["PRED_PROB"].rank(ascending=False, method="first").astype(int)
    # 7) メタ(p_training_data)取得&マージ(BAMEI/場/枠/馬/レース番号/RACE_CODE)
    meta_cols = [
        "RACE_ID", "KETTO_TOROKU_BANGO", "BAMEI",
        "KEIBAJO_CODE", "RACE_BANGO", "WAKUBAN", "UMABAN", "RACE_CODE"
    ]
    meta_cols2 = ["RACE_CODE", "KAISAI_NEN", "KAISAI_GAPPI", "UMABAN", "ODDS", "NINKI"]
    with get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(f"SELECT {', '.join('`'+c+'`' for c in meta_cols)} FROM `{META_TABLE}`")
            mrows = cur.fetchall()
            mcols = [d[0] for d in cur.description]
            meta_df = pd.DataFrame(mrows, columns=mcols)
            cur.execute(f"SELECT {', '.join('`'+c+'`' for c in meta_cols2)} FROM `{META_TABLE2}`")
            mrows2 = cur.fetchall()
            mcols2 = [d[0] for d in cur.description]
            meta_df2 = pd.DataFrame(mrows2, columns=mcols2)
    # キー正規化して meta_df をマージ
    out["RACE_ID"] = pd.to_numeric(out["RACE_ID"], errors="coerce").astype("Int64")
    meta_df["RACE_ID"] = pd.to_numeric(meta_df["RACE_ID"], errors="coerce").astype("Int64")
    out["KETTO_TOROKU_BANGO_N"]     = _norm_ketto(out["KETTO_TOROKU_BANGO"])
    meta_df["KETTO_TOROKU_BANGO_N"] = _norm_ketto(meta_df["KETTO_TOROKU_BANGO"])
    meta_df = meta_df.drop_duplicates(subset=["RACE_ID", "KETTO_TOROKU_BANGO_N"], keep="first")
    out = out.merge(
        meta_df.rename(columns={"KETTO_TOROKU_BANGO_N": "KTB_N"})[
            ["RACE_ID", "KTB_N", "BAMEI", "KEIBAJO_CODE", "RACE_BANGO", "WAKUBAN", "UMABAN", "RACE_CODE"]
        ],
        left_on=["RACE_ID", "KETTO_TOROKU_BANGO_N"],
        right_on=["RACE_ID", "KTB_N"],
        how="left"
    ).drop(columns=["KTB_N"])
    # 場名
    out["KAISAI"] = out["KEIBAJO_CODE"].apply(lambda x: KEIBAJO_MAP.get(_to_int_code(x)))
    # 8) odds1_tansho をマージ(RACE_CODE×UMABAN で一意)
    if not meta_df2.empty:
        meta_df2 = meta_df2.copy()
        meta_df2["UMABAN"] = meta_df2["UMABAN"].astype(str).str.strip()
        out["UMABAN"]      = out["UMABAN"].astype(str).str.strip()
        meta_df2 = meta_df2.drop_duplicates(subset=["RACE_CODE", "UMABAN"], keep="first")
        out = out.merge(meta_df2[["RACE_CODE", "UMABAN", "ODDS", "NINKI"]], on=["RACE_CODE", "UMABAN"], how="left")
    else:
        out["ODDS"]  = np.nan
        out["NINKI"] = np.nan
    # 9) ODDS を SQL 式どおりに実数化して使用(TANSHO_ODDS_REAL は使わない)
    out["ODDS_REAL"] = out["ODDS"].apply(_odds_str_to_real)
    # 10) 入賞確率の正規化(RACE_IDごと合計=1.0)→ %化
    sums = out.groupby("RACE_ID")["PRED_PROB"].transform("sum")
    out["NORM_PROB"] = np.where(sums > 0, out["PRED_PROB"] / sums, 0.0) * 100.0  # %
    # 11) オッズ調整式 → 期待値(%)
    odds_real = pd.to_numeric(out["ODDS_REAL"], errors="coerce")
    adjusted_odds = np.where(
        odds_real > 20,
        10 + (odds_real - 20) / (1 + 0.1 * (odds_real - 20)),
        np.where(
            odds_real > 10,
            10 + (odds_real - 10) / 2,
            odds_real
        )
    )
    # EV% = 調整後オッズ × 正規化確率%
    out["EXPECTED_VALUE"] = adjusted_odds * out["NORM_PROB"]
    # 12) 表示体裁:%列は整数へ、人気は整数へ
    out["NINKI"]          = pd.to_numeric(out["NINKI"], errors="coerce").astype("Int64")
    out["NORM_PROB"]      = np.round(out["NORM_PROB"]).astype("Int64")
    out["EXPECTED_VALUE"] = np.round(out["EXPECTED_VALUE"]).astype("Int64")
    # 13) 出力列(RACE_ID↑, PRED_RANK↑ で並べ替え)
    save_cols = [
        "RACE_CODE","KETTO_TOROKU_BANGO","RACE_ID", "KAISAI", "RACE_BANGO", "WAKUBAN", "UMABAN", "BAMEI",
        "NINKI", "ODDS_REAL",              # 使いたい生の値
        "PRED_PROB", "EXPECTED_VALUE", "PRED_RANK", "NORM_PROB"
    ]
    out = out.reindex(columns=save_cols)
    out.sort_values(["RACE_ID", "PRED_RANK", "UMABAN"], ascending=True, inplace=True, kind="mergesort")
    # 14) DB保存(DROP→CREATE→INSERT)
    type_map = {
        "RACE_CODE":        "CHAR(16)",
        "KETTO_TOROKU_BANGO":   "CHAR(10)",
        "RACE_ID":        "BIGINT",
        "KAISAI":         "VARCHAR(16)",
        "RACE_BANGO":     "CHAR(2)",
        "WAKUBAN":        "CHAR(1)",
        "UMABAN":         "CHAR(2)",
        "BAMEI":          "VARCHAR(64)",
        "NINKI":          "INT",
        "ODDS_REAL":      "DECIMAL(3,1)",   # odds1_tansho の元文字列
        "PRED_PROB":      "DECIMAL(12,6)", # 生の予測確率(0〜1)
        "EXPECTED_VALUE": "INT",           # %
        "PRED_RANK":      "INT",
        "NORM_PROB":      "INT",           # %
    }
    # NaN/inf→None(PyMySQL対策)
    out = out.copy()
    out.replace({np.inf: None, -np.inf: None}, inplace=True)
    out = out.astype(object).where(pd.notnull(out), None)
    create_sql   = ", ".join(f"`{c}` {type_map[c]}" for c in save_cols)
    ins_cols     = ", ".join(f"`{c}`" for c in save_cols)
    placeholders = ", ".join(["%s"] * len(save_cols))
    insert_sql   = f"INSERT INTO `{OUT_TABLE}` ({ins_cols}) VALUES ({placeholders})"
    with get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(f"DROP TABLE IF EXISTS `{OUT_TABLE}`")
            cur.execute(f"CREATE TABLE `{OUT_TABLE}` ({create_sql})")
            cur.execute(f"ALTER TABLE `{OUT_TABLE}` ADD INDEX idx_race_rank (`RACE_ID`, `PRED_RANK`)")
        conn.commit()
        rows = [tuple(r) for r in out.itertuples(index=False, name=None)]
        with conn.cursor() as cur:
            cur.executemany(insert_sql, rows)
        conn.commit()
    print(f"✅ 予測完了: {len(out)} 行を `{OUT_TABLE}` に書き込みました。")
if __name__ == "__main__":
    predict_with_lgbm()
###人気馬ばかり上位に予測されてしまう
###俺プロにアップしたい
解説:モデルを使ったレース予測の流れ
① モデルの読み込みと特徴量取得
まず最初に、保存済みのLightGBMモデルをロードします。
ここでは、model/model.txtを指定しています。
モデルには特徴量名も保存されているため、学習時と同じ特徴量構成で推論を行うことが可能です。
続いて、予測対象のデータをデータベース(p_training_data_lgb)から取得します。
このテーブルには、すでに学習データと同じ形式で特徴量が格納されていることが前提です。
cur.execute(f"SELECT * FROM `{SRC_TABLE}`")
df = pd.DataFrame(rows, columns=cols)不要な列(オッズや馬体重など)は予測には使わないため削除します。
df = df.drop(["TANSHO_ODDS_REAL","TANSHO_NINKIJUN","BATAIJU","ZOGEN_SA"], axis=1)② 特徴量の整形と欠損補完
モデルに含まれる全ての特徴量がDataFrameに揃っていない場合は、欠損列をゼロで補完します。
その後、float32型に統一し、NaNもすべて0.0で埋めています。
こうすることで、LightGBM推論時にエラーを起こさない安全な入力形式を作れます。
X = df[feat_names].apply(pd.to_numeric, errors="coerce").fillna(0.0).astype(np.float32)③ モデル推論と確率出力
次にLightGBMで予測確率を出します。
ここで得られるpredは「その馬が上位に入る確率」を示す値(0〜1)です。
pred = booster.predict(X, num_iteration=num_iter)レースごとに確率の高い順に順位(PRED_RANK)を付けます。
これにより、どの馬が上位に来るとモデルが判断しているかがわかります。
④ メタ情報のマージ
続いて、馬名やレース会場などを持つp_training_dataテーブルを結合します。
キーはRACE_IDとKETTO_TOROKU_BANGOです。
このとき、血統登録番号(KETTO_TOROKU_BANGO)の表記ゆれを吸収するため、ゼロ埋め除去と空値補正を行っています。
def _norm_ketto(s: pd.Series) -> pd.Series:
    out = s.astype(str).str.strip().str.lstrip("0")
    return out.mask(out == "", "0")この工程により、同じ馬を確実に紐づけることができます。
さらに、KEIBAJO_CODEから開催場名(札幌・阪神など)を変換しています。
KEIBAJO_MAP = {1: "札幌", 2: "函館", ..., 10: "小倉"}
out["KAISAI"] = out["KEIBAJO_CODE"].apply(lambda x: KEIBAJO_MAP.get(_to_int_code(x)))⑤ オッズ情報との統合
odds1_tanshoテーブルから単勝オッズと人気順を取得し、RACE_CODEとUMABANでマージします。
この段階で、機械学習による予測確率と実際のオッズ情報が1つのテーブルに揃います。
SQLで文字列として扱われていたオッズを、実際の数値(例:’123’→12.3)に変換します。
out["ODDS_REAL"] = out["ODDS"].apply(_odds_str_to_real)⑥ 入賞確率と期待値の算出
ここが今回のスクリプトの核心部分です。
まず、レース内の全馬の確率合計を100%になるように正規化し、その馬が入賞する確率(%)を求めます。
out["NORM_PROB"] = np.where(sums > 0, out["PRED_PROB"] / sums, 0.0) * 100.0次に、オッズが高すぎる馬(例:50倍など)をそのまま掛けると極端な期待値が出てしまうため、補正オッズ(adjusted_odds)を導入しています。
この補正式により、人気薄の馬でも現実的な範囲で期待値が算出できます。
adjusted_odds = np.where(
    odds_real > 20,
    10 + (odds_real - 20) / (1 + 0.1 * (odds_real - 20)),
    np.where(
        odds_real > 10,
        10 + (odds_real - 10) / 2,
        odds_real
    )
)
out["EXPECTED_VALUE"] = adjusted_odds * out["NORM_PROB"]つまり、期待値(%)=調整後オッズ × 入賞確率(%)
この数値が大きいほど、「的中時の見返りが大きく、確率的にも可能性がある」と判断できます。
⑦ 出力整形と保存
最後に、整数化や列の並び替えを行い、結果をデータベースに保存します。
保存先はp_predict_resultsで、毎回テーブルを再作成して上書き保存する仕様です。
cur.execute(f"DROP TABLE IF EXISTS `{OUT_TABLE}`")
cur.execute(f"CREATE TABLE `{OUT_TABLE}` (...省略...)")
cur.executemany(insert_sql, rows)
conn.commit()モデル運用時のポイント
1. 人気馬ばかり上位に出る問題
LightGBMモデルをそのまま使うと、どうしても過去データから人気と着順の相関が強く反映されてしまいます。
これを防ぐには、学習データ側で「オッズ」や「人気順位」を除外しておくこと、または「穴馬モデル」と「人気馬モデル」を分けて学習するのが有効です。
本スクリプトでも、オッズ情報はあくまで結果表示用に使い、予測入力には含めていません。
2. 予測値の正規化の意義
LightGBMが出す確率は、レース内で比較して初めて意味を持ちます。
そのため、各レースで確率を正規化し、合計を100%に調整することで、「その馬が勝つ確率」を直感的に理解できる数値に変換しています。
3. 期待値計算の工夫
オッズが高い馬は、わずかな確率でも期待値が極端に大きくなりやすいため、非線形な補正式を導入。
実際の馬券投資で使う際にも、過剰な穴狙いを避け、安定した回収率を狙える仕組みになっています。
まとめ
本記事では、学習済みLightGBMモデルを使ってレース結果を自動予測する仕組みを解説しました。
データベースから特徴量を取得し、確率を算出、メタ情報と結合して入賞確率と期待値を可視化する一連のパイプラインです。
このスクリプトを実行すれば、毎日のレースデータを最新モデルで自動予測し、即座にベット候補を抽出する土台が完成します。
 
 
コメント