【競馬AI 14】その日のレースが荒れるかどうかを予測してみる

競馬AI

夏競馬は良く荒れるって聞きますが、本当に荒れていましたね!皆さんは夏競馬の成績はいかがでしたでしょうか?

去年の私は荒れたレースで上位人気を買っていたり、固いレースなのに穴馬を買っていたり、ちぐはぐな馬券を買っていた記憶があります笑

でも自分では荒れるか荒れないか判断できないので、AIの力で推測できないか?と思い今回のモデルを作成してみました。

まずはモデルの検証結果から見てみましょう!

モデルの精度

検証期間:2024/7/20~2024/9/1
対象レース:当日の荒れ指数TOP3(計42レース)

3連単平均配当¥154,706
3連単最高配当¥1,304,170
3連単最低配当¥2,450
万馬券回数36/42(85.7%)
10万馬券回数13/42(31.0%)
荒れ指数平均0.385

3連単の平均払戻金が138,905.3円ほどらしいので、平均よりは荒れるレースが抽出できているようです。

万馬券率や10万馬券率もだいたい77%と17%くらいらしいので、それよりは良い傾向が出ていますね!

データ作成

では早速荒れるレース予想のプログラムを作っていきます。

準備:必要なデータ

  1. エンコーディング済みの過去レースデータ
  2. 払い戻しデータ

それぞれ下記の記事を参考に作成してください。

プログラムコード

以下のコードを使用し、払い戻し金額が500,000円以上だったレースにチェックを付けていきます。

※毎度のことですが、ファイル名、ファイルパスなどはご自身の環境に合わせて修正してください

import pandas as pd

# 年度のリストを生成
years = range(2014, 2024)
# 各年度のCSVファイルを読み込み、一つのデータフレームに結合
df = []
for year in years:
    path = f"payback/{year}.csv"
    data = pd.read_csv(path, encoding="SHIFT-JIS", header=None)
    df.append(data)

betting_data = pd.concat(df, ignore_index=True)

# 条件に合うレースIDを格納するリスト
qualifying_race_ids = []
# データフレームを行ごとにループ処理
for index, row in betting_data.iterrows():
    # レース結果の文字列を評価してリストに変換
    results = eval(row[1])

    # 3連単の結果を取得(最後の要素)し、金額部分を抽出してカンマを削除
    sanrentan = results[-1][-1]  # 最後の要素の金額部分を取得
    amount = int(sanrentan.replace(',', ''))  # 金額を整数に変換

    # 金額が50万を超えているか判定
    if amount > 500000:
        qualifying_race_ids.append(row[0])  # 条件を満たすレースIDを追加

encoded_data = pd.read_csv('encoded/encoded_data.csv')

# race_idごとにデータを集約
def aggregate_race_data(group):
    race_data = {'race_id': group['race_id'].iloc[0]}
    for i, (_, row) in enumerate(group.iterrows(), 1):
        for col in group.columns:
            if col != 'race_id':
                race_data[f'{col}_{i}'] = row[col]
    return pd.DataFrame([race_data])

grouped_data = encoded_data.groupby('race_id').apply(aggregate_race_data).reset_index(drop=True)

# 'is_upset'カラムを追加
is_upset_df = pd.DataFrame({'race_id': qualifying_race_ids, 'is_upset': 1})
grouped_data = grouped_data.merge(is_upset_df, on='race_id', how='left').fillna({'is_upset': 0})

# 出力
grouped_data.to_csv('encoded/encoded_data_upset_grouped.csv', index=False)

print(grouped_data.head())

モデルの作成

以下のコードで、荒れるレースを予測するモデルを作成します。着順予想と異なるのは、着順予想では”着順”を目的変数として、3着以内に入る馬を予測していました。

今回は”is_upset”を目的変数として、荒れるかを予測します。ほぼ順位予想のコードと変わりません。

プログラムコード

import lightgbm as lgb
import pandas as pd
from sklearn.metrics import roc_auc_score
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from imblearn.over_sampling import SMOTE
import joblib

def split_date(df, test_size):
    sorted_id_list = df.sort_values('race_id').index.unique()
    train_id_list = sorted_id_list[:round(len(sorted_id_list) * (1-test_size))]
    test_id_list = sorted_id_list[round(len(sorted_id_list) * (1-test_size)):]
    train = df.loc[train_id_list]
    test = df.loc[test_id_list]
    return train, test

def generate_drop_list(base_cols, num_horses):
    drop_list = []
    for col in base_cols:
        drop_list.extend([f"{col}_{i}" for i in range(1, num_horses + 1)])
    return drop_list

# データの読み込みと前処理
data = pd.read_csv('encoded/encoded_data_20240707_upset_grouped.csv')

# ターゲットをis_upsetに変更
data['is_upset'] = data['is_upset'].astype(int)
train, test = split_date(data, 0.2)

# ベースカラム名と馬の数(最大の数)を指定
base_cols = ['騎手', 'レース名', '開催']
num_horses = max([int(col.split('_')[-1]) for col in data.columns if '_' in col and col.split('_')[-1].isdigit()])

# 動的にdrop_arrを作成
drop_arr = ['is_upset'] + generate_drop_list(base_cols, num_horses)

# 特徴量とターゲットの分割
X_train = train.drop(drop_arr, axis=1)
y_train = train['is_upset']
X_test = test.drop(drop_arr, axis=1)
y_test = test['is_upset']

# NaN値を補完する
imputer = SimpleImputer(strategy='mean')
X_train = imputer.fit_transform(X_train)
X_test = imputer.transform(X_test)

# クラス不均衡の修正
smote = SMOTE(random_state=100)
X_train_resampled, y_train_resampled = smote.fit_resample(X_train, y_train)

# LightGBMの一般的なパラメータ
params = {
    'objective': 'binary',
    'metric': 'binary_logloss',
    'verbosity': -1,
    'boosting_type': 'gbdt',
    'random_state': 100,
    'num_leaves': 31,
    'learning_rate': 0.05,
    'n_estimators': 100
}

# LightGBMモデルのトレーニング
lgb_clf = lgb.LGBMClassifier(**params)
lgb_clf.fit(X_train_resampled, y_train_resampled)

# 予測
y_pred_lgb = lgb_clf.predict_proba(X_test)[:,1]

# LightGBMモデルの保存
joblib.dump(lgb_clf, 'model/model_upset_lgb.pkl')

# AUCスコアの表示
print("LightGBM AUC Score:", roc_auc_score(y_test, y_pred_lgb))

荒れるかどうかを予測する

モデルの作成が出来たので、最後に荒れるかどうかを予測していきます。

準備:予測対象のレースを取得する

予測するためにはレースのデータが必要になります。

取得の方法はこちらの記事に記載されているので参考にしてください。

プログラムコード

上記で取得したレースデータにモデルを使って予測するコードになります。

※個人用に作ったので無駄なコードが消し切れていないかもしれませんが気にしないでください。
また、特徴量はご自身のモデルのものと完全に一致させないと動きません。
恐らく追切情報血統情報などは本ブログでは特徴量に追加していないはずなので、削除して使ってください。

import pandas as pd
import lightgbm as lgb
from sklearn.preprocessing import LabelEncoder
import os
import numpy as np
import pickle
import bisect
import joblib
from sklearn.impute import SimpleImputer
import warnings

# UserWarningを無視
warnings.filterwarnings("ignore", category=UserWarning)
yearStart = 2005

# フォルダのパス
dir_path = "race_data"
# フォルダ内のすべてのファイルとディレクトリを取得
all_items = os.listdir(dir_path)
# フォルダを除外して、ファイルのみのリストを作成
file_list = [item for item in all_items if os.path.isfile(os.path.join(dir_path, item))]
# 一件だけ予想したい場合
# file_list = ['race_data_kokura_11R_20240714.csv']

# is_upsetモデルの読み込み
is_upset_model_file = 'model/model_upset_lgb.pkl'
is_upset_clf = joblib.load(is_upset_model_file)
# 必要なカラム数を取得
required_num_features = len(is_upset_clf.booster_.feature_name())

def aggregate_race_data(group):
    race_data = {'race_id': group['race_id'].iloc[0]}
    for i, (_, row) in enumerate(group.iterrows(), 1):
        for col in group.columns:
            if col != 'race_id':
                race_data[f'{col}_{i}'] = row[col]
    return pd.DataFrame([race_data])

for path in file_list:
    # 予測を行う新しいデータの読み込み
    new_data = pd.read_csv('race_data/' + path)
    race_id = new_data["race_id"]
    date = new_data["日付"]
    place = new_data["場名"]
    
    uma = new_data["馬"]

    # 日付時刻型への変換を試み、無効な形式であればNaNにする
    new_data['日付'] = pd.to_datetime(new_data['日付'], errors='coerce')
    new_data['日付1'] = pd.to_datetime(new_data['日付1'], errors='coerce')
    new_data['日付2'] = pd.to_datetime(new_data['日付2'], errors='coerce')
    new_data['日付3'] = pd.to_datetime(new_data['日付3'], errors='coerce')
    new_data['日付4'] = pd.to_datetime(new_data['日付4'], errors='coerce')
    new_data['日付5'] = pd.to_datetime(new_data['日付5'], errors='coerce')
    # 日付カラムから年、月、日を抽出
    new_data['year'] = new_data['日付'].dt.year
    new_data['month'] = new_data['日付'].dt.month
    new_data['day'] = new_data['日付'].dt.day
    # (年-yearStart)*365 + 月*30 + 日 を計算し新たな '日付'カラムを作成
    new_data['日付'] = (new_data['year'] - yearStart) * 365 + new_data['month'] * 30 + new_data['day']

    new_data['year'] = new_data['日付1'].dt.year
    new_data['month'] = new_data['日付1'].dt.month
    new_data['day'] = new_data['日付1'].dt.day
    new_data['日付1'] = (new_data['year'] - yearStart) * 365 + new_data['month'] * 30 + new_data['day']

    new_data['year'] = new_data['日付2'].dt.year
    new_data['month'] = new_data['日付2'].dt.month
    new_data['day'] = new_data['日付2'].dt.day
    new_data['日付2'] = (new_data['year'] - yearStart) * 365 + new_data['month'] * 30 + new_data['day']

    new_data['year'] = new_data['日付3'].dt.year
    new_data['month'] = new_data['日付3'].dt.month
    new_data['day'] = new_data['日付3'].dt.day
    new_data['日付3'] = (new_data['year'] - yearStart) * 365 + new_data['month'] * 30 + new_data['day']

    new_data['year'] = new_data['日付4'].dt.year
    new_data['month'] = new_data['日付4'].dt.month
    new_data['day'] = new_data['日付4'].dt.day
    new_data['日付4'] = (new_data['year'] - yearStart) * 365 + new_data['month'] * 30 + new_data['day']

    new_data['year'] = new_data['日付5'].dt.year
    new_data['month'] = new_data['日付5'].dt.month
    new_data['day'] = new_data['日付5'].dt.day
    new_data['日付5'] = (new_data['year'] - yearStart) * 365 + new_data['month'] * 30 + new_data['day']

    new_data['最終追切_日付'] = pd.to_datetime(new_data['最終追切_日付'], errors='coerce')
    new_data['year'] = new_data['最終追切_日付'].dt.year
    new_data['month'] = new_data['最終追切_日付'].dt.month
    new_data['day'] = new_data['最終追切_日付'].dt.day
    new_data['最終追切_日付'] = (new_data['year'] - yearStart) * 365 + new_data['month'] * 30 + new_data['day']

    # 不要となった 'year', 'month', 'day' カラムを削除
    new_data.drop(['year', 'month', 'day'], axis=1, inplace=True)

    # カテゴリカル変数のエンコーディング
    categorical_features = ['馬', '場名', '騎手1', '騎手2', '騎手3', '騎手4', '騎手5', '父', '母', '母父', '父父', '最終追切_コース', '最終追切_脚色', '最終追切_評価']  # カテゴリカル変数の列名を指定してください
    for i, feature in enumerate(categorical_features):
        le = LabelEncoder()
        new_data[feature] = le.fit_transform(new_data[feature])

    new_data.fillna(0, inplace=True)

    new_data['オッズ'] = pd.to_numeric(new_data['オッズ'], errors='coerce').fillna(0)
    new_data['人気'] = pd.to_numeric(new_data['人気'], errors='coerce').fillna(0)

    #荒れ予想用
    upset_data = new_data.copy()

    # 着順列を除外 (この列が存在する場合)
    drop_arr = ['race_id', '着順', '上がり', '走破時間', '通過順', '騎手', 'レース名', '開催', '馬の平均着順', '馬の3着内率', '最終追切_評価']
    new_data = new_data.drop(drop_arr, axis=1)

    # 'race_id'の最後の2文字を取得
    new_data.insert(0, 'race_id', race_id)

    # is_upsetの予測
    # race_dataを1行にまとめる
    upset_data = upset_data.drop(['騎手', 'レース名', '開催'], axis=1)

    race_data_agg = upset_data.groupby('race_id').apply(aggregate_race_data).reset_index(drop=True)

    # 必要なカラム数を調整
    current_num_features = race_data_agg.shape[1]
    if current_num_features < required_num_features:
        missing_features = required_num_features - current_num_features
        missing_cols = pd.DataFrame(0, index=race_data_agg.index, columns=[f'missing_{i}' for i in range(missing_features)])
        race_data_agg = pd.concat([race_data_agg, missing_cols], axis=1)

    # is_upset予測
    y_upset_pred = is_upset_clf.predict_proba(race_data_agg)[:, 1]

    print(str(new_data['race_id'].iloc[0])+':'+str(y_upset_pred[0]))

# 最後に全ての予測が完了したことを通知
print("All predictions completed.")

まとめ

モデルの作成と予測はできましたでしょうか?もしうまく動かなければコメント欄でご連絡ください。わかる範囲で回答します。

私の方で使用した感想としては、”確かに荒れるレースは予測できているな”とは思いました。

ただこの値を鵜呑みに出来るほどの精度は出ていないので、馬券を組み立てる時の参考程度でしょうか。

荒れなさそうな指数の場合は穴馬の購入を控えたり、荒れそうな場合は1番人気の頭固定をやめたりなど。

このモデルを使って自分なりの馬券に役立てていただければ幸いです。

コメント

  1. kevin より:

    お世話になっております。
    モデルを使って予測するコードを実行したところ、

    Traceback (most recent call last):
    File “○○/predict_upset.py”, line 93, in
    with open(‘../config/label_encoders.pkl’, ‘rb’) as f:
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    FileNotFoundError: [Errno 2] No such file or directory: ‘../config/label_encoders.pkl’

    と、エラーが発生しました。
    このエラーメッセージによると、label_encoders.pklというファイルが指定されたパスに存在しないようです。このファイルがどのタイミングで生成されるのか、または生成するために実行する必要があるスクリプトや手順について教えていただけますでしょうか?

    もし、関連するドキュメントや追加の手順がある場合は、それらの情報もご提供いただけると幸いです。

    • agus agus より:

      label_encoders.pklを作成するコードは公開しておりませんでした。
      プログラムを修正したので再度確かめてみてください。

      • kevin より:

        お世話になっております。

        修正いただいたプログラムで正常に作動することを確認できました。迅速な対応に感謝いたします。

        ありがとうございました。