出走データを学習用に整形するには、ただSQLで一括処理するだけでは不安定です。
ここでは、Pythonでデータを小分けに取得・加工して挿入する関数 create_ranked_race_base()
を紹介し、コード全文からプロジェクト構成までわかりやすく解説します。
出走情報を学習用に抽出する
競馬AIを作るには、まず学習用にきれいなテーブルを用意することが欠かせません。
今回紹介する create_ranked_race_base()
は、2015年以降の中央競馬の出走情報(umagoto_race_joho
)から必要なデータだけを取り出し、学習しやすい形に整形して別テーブル w_ranked_race_base
を作る処理です。
ポイントはSQLだけに処理を任せないこと。
データ量が数百万行規模になるため、一気に INSERT ... SELECT
するとDBが固まったりメモリを食いつぶしたりします。
そこで Pythonでデータを小分けに運び、必要な加工をその場で加えてから挿入する方法を取っています。
プログラム全文
以下が実際のコードです。
(db/init_work_tables.py
に配置する想定)
def create_ranked_race_base():
create_table_sql = """
CREATE TABLE IF NOT EXISTS w_ranked_race_base (
LIKE umagoto_race_joho
);
"""
alter_table_sql = """
ALTER TABLE w_ranked_race_base
ADD COLUMN RACE_ID BIGINT,
ADD COLUMN TANSHO_ODDS_REAL DECIMAL(5,1),
ADD INDEX idx_race_id (RACE_ID);
"""
select_sql_template = """
SELECT *,
CAST(CONCAT(KAISAI_NEN, LPAD(KEIBAJO_CODE,2,'0'), KAISAI_KAIJI, KAISAI_NICHIJI, LPAD(RACE_BANGO,2,'0')) AS UNSIGNED) AS RACE_ID,
CAST(SUBSTRING(TANSHO_ODDS, 1, CHAR_LENGTH(TANSHO_ODDS) - 1) AS UNSIGNED) +
CAST(CONCAT('0.', SUBSTRING(TANSHO_ODDS, -1)) AS DECIMAL(5,1)) AS TANSHO_ODDS_REAL
FROM umagoto_race_joho
WHERE KAISAI_NEN >= 2015
AND KEIBAJO_CODE IN ('01','02','03','04','05','06','07','08','09','10')
LIMIT {limit} OFFSET {offset};
"""
insert_sql = """
INSERT INTO w_ranked_race_base (
INSERT_TIMESTAMP, UPDATE_TIMESTAMP, RECORD_SHUBETSU_ID, DATA_KUBUN, DATA_SAKUSEI_NENGAPPI,
RACE_CODE, KAISAI_NEN, KAISAI_GAPPI, KEIBAJO_CODE, KAISAI_KAIJI, KAISAI_NICHIJI,
RACE_BANGO, WAKUBAN, UMABAN, KETTO_TOROKU_BANGO, BAMEI, UMAKIGO_CODE, SEIBETSU_CODE,
HINSHU_CODE, MOSHOKU_CODE, BAREI, TOZAI_SHOZOKU_CODE, CHOKYOSHI_CODE, CHOKYOSHIMEI_RYAKUSHO,
BANUSHI_CODE, BANUSHIMEI_HOJINKAKU_NASHI, FUKUSHOKU_HYOJI, FUTAN_JURYO, HENKOMAE_FUTAN_JURYO,
BLINKER_SHIYO_KUBUN, KISHU_CODE, HENKOMAE_KISHU_CODE, KISHUMEI_RYAKUSHO,
HENKOMAE_KISHUMEI_RYAKUSHO, KISHU_MINARAI_CODE, HENKOMAE_KISHU_MINARAI_CODE, BATAIJU,
ZOGEN_FUGO, ZOGEN_SA, IJO_KUBUN_CODE, NYUSEN_JUNI, KAKUTEI_CHAKUJUN, DOCHAKU_KUBUN,
DOCHAKU_TOSU, SOHA_TIME, CHAKUSA_CODE1, CHAKUSA_CODE2, CHAKUSA_CODE3, CORNER1_JUNI,
CORNER2_JUNI, CORNER3_JUNI, CORNER4_JUNI, TANSHO_ODDS, TANSHO_NINKIJUN, KAKUTOKU_HONSHOKIN,
KAKUTOKU_FUKASHOKIN, KOHAN_4F, KOHAN_3F, AITE1_KETTO_TOROKU_BANGO, AITE1_BAMEI,
AITE2_KETTO_TOROKU_BANGO, AITE2_BAMEI, AITE3_KETTO_TOROKU_BANGO, AITE3_BAMEI, TIME_SA,
RECORD_KOSHIN_KUBUN, MINING_KUBUN, MINING_YOSO_SOHA_TIME, MINING_YOSO_GOSA_PLUS,
MINING_YOSO_GOSA_MINUS, MINING_YOSO_JUNI, KYAKUSHITSU_HANTEI, RACE_ID, TANSHO_ODDS_REAL
)
VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
);
"""
with get_connection() as conn:
with conn.cursor() as cursor:
print("🧹 w_ranked_race_base を削除中...")
cursor.execute("DROP TABLE IF EXISTS w_ranked_race_base")
print("🧱 テーブル構造作成中...")
cursor.execute(create_table_sql)
cursor.execute(alter_table_sql)
print("📥 分割挿入開始...")
limit = 10000
offset = 0
while True:
select_sql = select_sql_template.format(limit=limit, offset=offset)
cursor.execute(select_sql)
rows = cursor.fetchall()
if not rows:
break
insert_data = [
(
row["INSERT_TIMESTAMP"], row["UPDATE_TIMESTAMP"], row["RECORD_SHUBETSU_ID"], row["DATA_KUBUN"],
row["DATA_SAKUSEI_NENGAPPI"], row["RACE_CODE"], row["KAISAI_NEN"], row["KAISAI_GAPPI"],
row["KEIBAJO_CODE"], row["KAISAI_KAIJI"], row["KAISAI_NICHIJI"], row["RACE_BANGO"],
row["WAKUBAN"], row["UMABAN"], row["KETTO_TOROKU_BANGO"], row["BAMEI"], row["UMAKIGO_CODE"],
row["SEIBETSU_CODE"], row["HINSHU_CODE"], row["MOSHOKU_CODE"], row["BAREI"], row["TOZAI_SHOZOKU_CODE"],
row["CHOKYOSHI_CODE"], row["CHOKYOSHIMEI_RYAKUSHO"], row["BANUSHI_CODE"],
row["BANUSHIMEI_HOJINKAKU_NASHI"], row["FUKUSHOKU_HYOJI"], row["FUTAN_JURYO"],
row["HENKOMAE_FUTAN_JURYO"], row["BLINKER_SHIYO_KUBUN"], row["KISHU_CODE"],
row["HENKOMAE_KISHU_CODE"], row["KISHUMEI_RYAKUSHO"], row["HENKOMAE_KISHUMEI_RYAKUSHO"],
row["KISHU_MINARAI_CODE"], row["HENKOMAE_KISHU_MINARAI_CODE"], row["BATAIJU"],
row["ZOGEN_FUGO"], row["ZOGEN_SA"], row["IJO_KUBUN_CODE"], row["NYUSEN_JUNI"],
row["KAKUTEI_CHAKUJUN"], row["DOCHAKU_KUBUN"], row["DOCHAKU_TOSU"], row["SOHA_TIME"],
row["CHAKUSA_CODE1"], row["CHAKUSA_CODE2"], row["CHAKUSA_CODE3"], row["CORNER1_JUNI"],
row["CORNER2_JUNI"], row["CORNER3_JUNI"], row["CORNER4_JUNI"], row["TANSHO_ODDS"],
row["TANSHO_NINKIJUN"], row["KAKUTOKU_HONSHOKIN"], row["KAKUTOKU_FUKASHOKIN"],
row["KOHAN_4F"], row["KOHAN_3F"], row["AITE1_KETTO_TOROKU_BANGO"], row["AITE1_BAMEI"],
row["AITE2_KETTO_TOROKU_BANGO"], row["AITE2_BAMEI"], row["AITE3_KETTO_TOROKU_BANGO"],
row["AITE3_BAMEI"], row["TIME_SA"], row["RECORD_KOSHIN_KUBUN"], row["MINING_KUBUN"],
row["MINING_YOSO_SOHA_TIME"], row["MINING_YOSO_GOSA_PLUS"], row["MINING_YOSO_GOSA_MINUS"],
row["MINING_YOSO_JUNI"], row["KYAKUSHITSU_HANTEI"],
row["RACE_ID"], row["TANSHO_ODDS_REAL"]
)
for row in rows
]
cursor.executemany(insert_sql, insert_data)
conn.commit()
print(f"✅ {len(rows)} 行挿入 (OFFSET {offset})")
offset += limit
コード解説
テーブル作成と追加列
CREATE TABLE ... LIKE
で元テーブルumagoto_race_joho
と同じ構造をそのままコピー。- さらに
ALTER TABLE
で学習用に必要な列を追加します。- RACE_ID:開催年+競馬場コード+回次+日次+レース番号をまとめた一意のID。
- TANSHO_ODDS_REAL:文字列だった単勝オッズを小数点付きの数値に正規化。
データ取得
LIMIT
とOFFSET
で 1万件ずつ 取り出すのがポイント。CAST
やSUBSTRING
で、レースIDやオッズの型変換はここで完結。
これ以上の集約や結合はせず、SQL側を軽く保っています。
データ挿入
executemany()
で一括挿入し、毎バッチcommit()
。- 進捗を
print()
で出力することで長時間処理でも状況が見えます。
ポイント紹介(設計の肝)
- SQL側に重い処理を背負わせない
巨大なINSERT ... SELECT
は一見スマートでも、TEMP領域やロックで落ちやすい。 - Python側で小分けに運ぶ
失敗時のリトライや途中確認がしやすく、長時間ジョブに向く。 - バッチサイズは調整可能
1万件は目安。環境によって3000〜20000件程度でチューニングすると良いです。 - インデックスのタイミング
まとめて入れる場合は後付けが高速。今回は読み取り前提で先付けしています。
コードの呼び出し方
この関数はプロジェクトの初期化スクリプト init_db.py
から実行します。
複数の作業テーブルをまとめて作成する流れの中で create_ranked_race_base()
が呼ばれます。
※次の記事からも別の関数を作成しますが、init_db.pyに呼び出しを追記していきます。
from db.init_work_tables import (
create_ranked_race_base
)
if __name__ == "__main__":
print("🛠️ ranked_race_base を作成中...")
create_ranked_race_base()
print("✅ すべて完了しました。")
DB接続は db/connection.py
で管理。
例えば以下のように get_connection()
を定義します。
import pymysql
from pymysql.cursors import DictCursor
def get_connection():
return pymysql.connect(
host="localhost",
user="user",
password="passwd",
database="mykeiba",
charset="utf8mb4",
cursorclass=DictCursor,
autocommit=False
)
ターミナルからの実行方法
プロジェクトのルートディレクトリで、以下のようにモジュール指定で実行します。
python -m services.init_db
この -m
オプションは「パッケージとしてモジュールを実行する」ための指定です。services
がパッケージ(__init__.py
があるディレクトリ)として認識され、
その中の init_db.py
がメインスクリプトとして起動されます。
- 実行中は
🛠️ ranked_race_base を作成中...
の進捗メッセージが順に表示されます。 - すべての処理が終わると
✅ すべて完了しました。
が表示されます。
※通常の python init_db.py
ではなく python -m services.init_db
としているのは、
プロジェクトをパッケージ単位で管理し、相対インポートを安全に使うためです。
プロジェクト全体のフォルダ構成
シンプルにまとめると次のようなイメージです。
project-root/
├─ db/
│ ├─ connection.py # get_connection()を定義
│ └─ init_work_tables.py # create_ranked_race_base など複数関数を管理
├─ ml/
│ └─ estimation.py # モデル作成用のコード
├─ services/
│ └─ init_db.py # ← 本記事の主役
├─ model/
│ └─ ... 学習済みモデルや特徴量設計ファイル
└─ logs/
└─ ... 実行ログやエラーログ
db
:DB接続やテーブル作成系の関数をまとめる場所services
:データ加工や特徴量作成などのロジックinit_db.py
:初期化の実行ポイント。cronやシェルから呼び出すのもここ。
まとめ
- 2015年以降の中央競馬出走データを学習向けに整形するテーブルが
w_ranked_race_base
。 - SQLの一括処理では重くなるため、Pythonで小分けに取得・挿入することで安定&高速化。
- 接続ファイル、呼び出しスクリプト、フォルダ構成まで揃えておけば、今後の特徴量設計やモデル学習もスムーズ。
この形をベースに、次は「過去5走の結合」や「特徴量の自動生成」など、さらにAIに寄せた加工へ進めていけます。
大規模データを扱うときの基本パターンとして、ぜひプロジェクトに取り入れてみてください。
コメント