競馬AIを構築するとき、出走結果や調教情報だけでは不十分なことがあります。馬ごとに持っている総合成績や脚質の傾向なども特徴量として組み込んだ方が、学習データとしての完成度が高まります。今回紹介する処理は、既存のレースデータに馬のマスタ情報を追加する流れを、SQLではなくPythonで実現したものです。大量データでも安定して動かせるよう工夫されています。
処理概要
この処理では、レース結果のテーブル(w_joined_race_chokyo)に対して、馬ごとの特徴データ(kyosoba_master2)を結合しています。
追加するカラムは以下のとおりです。
- 総合成績: SOGO_1CHAKU, SOGO_2CHAKU, SOGO_3CHAKU, SOGO_4CHAKU, SOGO_5CHAKU, SOGO_CHAKUGAI
- 脚質傾向: KYAKUSHITSU_KEIKO_NIGE, KYAKUSHITSU_KEIKO_SENKO, KYAKUSHITSU_KEIKO_SASHI, KYAKUSHITSU_KEIKO_OIKOMI
本来であれば SQL JOIN を使うところですが、DB 側に負荷がかかりやすいため、あえて Python 側で結合処理を実行しています。これにより、数百万件規模のデータでもタイムアウトせずに処理が進むようになります。
プログラム全文
以下が実際のコードです。
(db/init_work_tables.py
に追記する)
def extend_w_joined_race_with_kyosoba(
src_wc_table: str = "w_joined_race_chokyo",
src_km_table: str = "kyosoba_master2",
dst_table: str = "w_training_data",
batch_size: int = 10000,
insert_batch_size: int = 1000
):
"""
w_joined_race_chokyo (wc) に kyosoba_master2 (km) の列を Python 側で結合し、
新テーブルへ一括挿入する。DB 側の重い JOIN を避ける。
追加列:
- SOGO_1CHAKU, SOGO_2CHAKU, SOGO_3CHAKU, SOGO_4CHAKU, SOGO_5CHAKU, SOGO_CHAKUGAI
- KYAKUSHITSU_KEIKO_NIGE, KYAKUSHITSU_KEIKO_SENKO, KYAKUSHITSU_KEIKO_SASHI, KYAKUSHITSU_KEIKO_OIKOMI
"""
extra_cols = [
"SOGO_1CHAKU","SOGO_2CHAKU","SOGO_3CHAKU","SOGO_4CHAKU","SOGO_5CHAKU","SOGO_CHAKUGAI",
"KYAKUSHITSU_KEIKO_NIGE","KYAKUSHITSU_KEIKO_SENKO","KYAKUSHITSU_KEIKO_SASHI","KYAKUSHITSU_KEIKO_OIKOMI"
]
# 1) km を一括ロード → dict 化(KETTO_TOROKU_BANGO で参照)
print("📥 kyosoba_master2 をロード中...")
with get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(f"""
SELECT KETTO_TOROKU_BANGO, {', '.join(extra_cols)}
FROM {src_km_table}
""")
km_rows = cursor.fetchall()
km_columns = [desc[0] for desc in cursor.description]
print(f"✅ kyosoba_master2: {len(km_rows)} 件")
km_map = {}
for r in km_rows:
key = r["KETTO_TOROKU_BANGO"]
km_map[key] = {col: r[col] for col in km_columns if col != "KETTO_TOROKU_BANGO"}
# 2) 出力テーブル作成(既存と同様の手順:LIKE + ALTER)
print(f"🧱 新テーブル `{dst_table}` を作成中...")
# 型は INFORMATION_SCHEMA を読まず、よくある INT/CHAR で固定(必要に応じて調整)
extra_col_types = {
"SOGO_1CHAKU":"INT NULL","SOGO_2CHAKU":"INT NULL","SOGO_3CHAKU":"INT NULL",
"SOGO_4CHAKU":"INT NULL","SOGO_5CHAKU":"INT NULL","SOGO_CHAKUGAI":"INT NULL",
"KYAKUSHITSU_KEIKO_NIGE":"INT NULL","KYAKUSHITSU_KEIKO_SENKO":"INT NULL",
"KYAKUSHITSU_KEIKO_SASHI":"INT NULL","KYAKUSHITSU_KEIKO_OIKOMI":"INT NULL",
}
with get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(f"DROP TABLE IF EXISTS {dst_table}")
cursor.execute(f"CREATE TABLE {dst_table} LIKE {src_wc_table}")
alters = [f"ADD COLUMN `{c}` {extra_col_types[c]}" for c in extra_cols]
cursor.execute(f"ALTER TABLE {dst_table} {', '.join(alters)}")
# 参照・並び替えに使うキーへインデックス(存在すればスキップ)
try:
cursor.execute(f"CREATE INDEX idx_{dst_table.lower()}_ketto ON {dst_table}(KETTO_TOROKU_BANGO)")
except Exception:
pass
conn.commit()
print("✅ 新テーブル作成完了")
# 3) 元テーブルの総件数・カラム順を取得(既存のやり方に合わせる)
with get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(f"SELECT COUNT(*) AS cnt FROM {src_wc_table}")
total = cursor.fetchone()["cnt"]
with get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(f"SELECT * FROM {src_wc_table} LIMIT 1")
wc_columns = [desc[0] for desc in cursor.description]
print(f"📊 {src_wc_table} 全件数: {total}")
insert_columns = wc_columns + extra_cols
insert_sql = f"""
INSERT INTO {dst_table} ({', '.join(insert_columns)})
VALUES ({', '.join(['%s'] * len(insert_columns))})
"""
# 4) バッチで読み → Python で結合 → 小分け挿入(既存フローと同じ思想)
total_batches = ceil(total / batch_size)
processed = 0
for b in range(total_batches):
offset = b * batch_size
print(f"🚚 バッチ {b+1}/{total_batches} 取得中... (OFFSET {offset})")
with get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(f"""
SELECT * FROM {src_wc_table}
ORDER BY RACE_ID DESC
LIMIT %s OFFSET %s
""", (batch_size, offset))
chunk_rows = cursor.fetchall()
if not chunk_rows:
break
# Python で LEFT JOIN 相当を行う
data_to_insert = []
for row in chunk_rows:
ketto = row["KETTO_TOROKU_BANGO"]
extras = km_map.get(ketto, {})
wc_part = [row[col] for col in wc_columns]
extra_part = [extras.get(col, None) for col in extra_cols]
data_to_insert.append(tuple(wc_part + extra_part))
print(f"📦 挿入中... {len(data_to_insert)} 件")
with get_connection() as conn:
with conn.cursor() as cursor:
for i in range(0, len(data_to_insert), insert_batch_size):
cursor.executemany(insert_sql, data_to_insert[i:i+insert_batch_size])
conn.commit()
processed += len(data_to_insert)
print(f"✅ 進捗: {processed}/{total} 件完了")
print("🎉 extend_w_joined_race_with_kyosoba 完了")
解説
処理の流れは大きく分けて4段階です。
- 馬のマスタデータをロード
kyosoba_master2
から全件を取得し、血統登録番号ごとの辞書(km_map) に変換。これで後から素早く参照できます。 - 新しいテーブルを作成
元のw_joined_race_chokyo
と同じ構造で新しいテーブルを作り、総合成績や脚質傾向のカラムを追加します。 - 元データのカラム順を保持
学習データとして使いやすいよう、元のカラム + 追加カラムの順番で整えます。 - PythonでJOINして挿入
データを1万件ずつ読み込み、PythonでLEFT JOIN相当の処理を実行。その後、1000件単位で分割して挿入し、DBの負荷を分散させています。
ポイント
- DB側JOINを避ける設計
数百万件規模をJOINすると処理が止まりやすいため、Pythonで分割処理。 - 辞書化による高速参照
km_mapを作っておくことで、血統番号から馬の情報を即座に取得可能。 - インデックス付与で安定化
KETTO_TOROKU_BANGOにインデックスを付けることで、参照系処理も高速化。 - バッチ処理で安定性確保
1万件の取得と1000件単位の挿入で、時間とメモリのバランスを最適化。
コードの呼び出し方
この関数は前回のコードと同じくinit_db.py
から実行します。
複数の作業テーブルをまとめて作成する流れの中で extend_w_joined_race_with_kyosoba()
が呼ばれます。以下のようなコードになります。
from db.init_work_tables import (
create_ranked_race_base,
create_ranked_race_temp,
create_joined_race_table,
enrich_w_joined_race_bulk,
extend_w_joined_race_with_kyosoba
)
if __name__ == "__main__":
print("🛠️ ranked_race_base を作成中...")
create_ranked_race_base()
print("🛠️ ranked_race_temp を作成中...")
create_ranked_race_temp()
print("🛠️ joined_race を作成中...")
create_joined_race_table()
print("🛠️ joined_race に調教データ追加中...")
enrich_w_joined_race_bulk()
print("🛠️ w_training_data に成績データ追加中...")
extend_w_joined_race_with_kyosoba()
print("✅ すべて完了しました。")
まとめ
今回の処理は、学習用データに馬ごとのマスタ情報を加えるための拡張ステップです。
- 総合成績や脚質傾向を追加して特徴量を強化
- JOINをPythonで処理して大規模データでも安定
- バッチ処理とインデックスで効率化
これにより、「馬自身の特徴」も学習に活かせるデータセットが完成します。レース結果や調教情報と組み合わせることで、さらに予測精度を高められるでしょう。
コメント