ここまで積み重ねてきた前処理の最後のステップです。不要なカラムを削除し、文字やコードを数値に変換、さらに型を整えて学習に耐えられる形に仕上げる。これを行うことで、LightGBMにそのまま投入できる学習データが完成します。この記事をもって学習データ作成の工程はひと区切り。安心してモデルトレーニングへ進める状態になります。
処理概要
今回の処理では、以下のような前処理をまとめて実行しています。
- 不要カラムの削除
 ログや文字列の説明用カラム、過去の調教師名など、学習に寄与しない列をdrop_colsとしてまとめ、除外します。
 → データがすっきりし、学習に集中できる。
- 着差コードの数値化
 例えば"1__" → 1.0、"_34" → 0.75といった具合に、文字列で表されていた着差を数値に変換。
 → モデルが差の大きさをきちんと理解できる。
- 増減(ZOGEN)の統一処理ZOGEN_FUGO(±)とZOGEN_SAを掛け合わせて符号付きの数値に変換。
 → 「前走より+10kg」「-4kg」といった意味が数値で表現される。
- 型付け(DECIMAL / INT / KEEP)
 オッズや着差は DECIMAL(15,2) に、回数や順位などは INT に変換。ID系の列は保持。
 → 型を揃えることでDB側でも安定して扱える。
- 欠損値処理NaNやInfをすべて None に統一してDBへ挿入。
 → LightGBMが標準的に扱える「欠損」として整理される。
こうして作られるのが、最終的な w_training_data_lgb テーブル。これが学習データの完成形です。
プログラム全文
以下が実際のコードです。
(db/init_work_tables.py に追記する)
drop_cols = [
    "INSERT_TIMESTAMP","UPDATE_TIMESTAMP","RECORD_SHUBETSU_ID","DATA_KUBUN","DATA_SAKUSEI_NENGAPPI",
    "BAMEI","CHOKYOSHIMEI_RYAKUSHO","BANUSHIMEI_HOJINKAKU_NASHI","FUKUSHOKU_HYOJI",
    "HENKOMAE_FUTAN_JURYO","HENKOMAE_KISHU_CODE","KISHUMEI_RYAKUSHO","HENKOMAE_KISHUMEI_RYAKUSHO",
    "HENKOMAE_KISHU_MINARAI_CODE","NYUSEN_JUNI","DOCHAKU_KUBUN","DOCHAKU_TOSU","SOHA_TIME",
    "CHAKUSA_CODE1","CHAKUSA_CODE2","CHAKUSA_CODE3","CORNER1_JUNI","CORNER2_JUNI","CORNER3_JUNI","CORNER4_JUNI",
    "TANSHO_ODDS","KAKUTOKU_HONSHOKIN","KAKUTOKU_FUKASHOKIN","KOHAN_4F","KOHAN_3F",
    "AITE1_KETTO_TOROKU_BANGO","AITE1_BAMEI","AITE2_KETTO_TOROKU_BANGO","AITE2_BAMEI","AITE3_KETTO_TOROKU_BANGO","AITE3_BAMEI",
    "TIME_SA","RECORD_KOSHIN_KUBUN","MINING_KUBUN","MINING_YOSO_SOHA_TIME","MINING_YOSO_GOSA_PLUS","MINING_YOSO_GOSA_MINUS",
    "MINING_YOSO_JUNI","KYAKUSHITSU_HANTEI","WOOD_YOBI",
]
chakusa_map = {
    "_12":0.5,"_34":0.75,"1__":1.0,"112":1.5,"114":1.25,"134":1.75,"2__":2.0,"212":2.5,"3__":3.0,"312":3.5,
    "4__":4.0,"5__":5.0,"6__":6.0,"7__":7.0,"8__":8.0,"9__":9.0,"A__":0.3,"D__":0.0,"H__":0.1,"K__":0.2,"T__":15.0,"Z__":10.0,
    "_14":0.25,"214":2.25,"734":7.75,
}
chakusa_cols = [f"PREV{i}_CHAKUSA_CODE1" for i in range(1, 6)]
def transform_w_training_data_for_lgb(
    src_table: str = "w_training_data",
    dst_table: str = "w_training_data_lgb",
    # ※ drop_cols / chakusa_map / chakusa_cols は引数に渡さず、モジュール変数を使う
    keep_cols: list | None = None,                 # そのまま保持したい列(ID等)
    keep_type_map: dict | None = None,             # keep の型指定
    order_by: str | None = "RACE_ID",
    batch_size: int = 20000,
    insert_batch_size: int = 1000,
    decimal_def: str = "DECIMAL(15,2) NULL",       # オッズ・着差の型
    int_def: str = "INT NULL",                     # それ以外の型
):
    # --- モジュール変数を使う ---
    use_drop_cols    = set(globals().get("drop_cols", []))
    use_chakusa_map  = dict(globals().get("chakusa_map", {}))
    use_chakusa_cols = list(globals().get("chakusa_cols", [f"PREV{i}_CHAKUSA_CODE1" for i in range(1, 6)]))
    # keep のデフォルト(mutableデフォルトは使わない)
    default_keep_cols = {"RACE_ID", "KETTO_TOROKU_BANGO"}
    keep_cols = set(keep_cols) if keep_cols is not None else default_keep_cols
    base_keep_type = {
        "RACE_ID": "BIGINT",
        "KETTO_TOROKU_BANGO": "BIGINT",
        "RACE_CODE": "BIGINT",
        "PREV1_RACE_ID": "BIGINT",
        "PREV2_RACE_ID": "BIGINT",
        "PREV3_RACE_ID": "BIGINT",
        "PREV4_RACE_ID": "BIGINT",
        "PREV5_RACE_ID": "BIGINT",
    }
    keep_type_map = {**base_keep_type, **(keep_type_map or {})}
    # keep_cols が未指定なら既定 + 型マップの全キーを取り込む
    default_keep_cols = {"RACE_ID", "KETTO_TOROKU_BANGO"}
    keep_cols = set(keep_cols) if keep_cols is not None else set()
    keep_cols |= default_keep_cols
    keep_cols |= set(keep_type_map.keys())
    # 元テーブルのカラム
    with get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(f"SELECT * FROM {src_table} LIMIT 1")
            src_columns = [d[0] for d in cur.description]
    # --- DECIMAL 対象(オッズ & 着差コード)---
    # ※ PREV1_TANSHO_ODDS_REAL を含めたい場合は下のリストに追加してください
    decimal_priority_cols = [
        "TANSHO_ODDS_REAL",
        "PREV2_TANSHO_ODDS_REAL",
        "PREV3_TANSHO_ODDS_REAL",
        "PREV4_TANSHO_ODDS_REAL",
        "PREV5_TANSHO_ODDS_REAL",
    ] + use_chakusa_cols
    decimal_present = [c for c in decimal_priority_cols if c in src_columns]
    # ZOGEN のペア検出(現行+PREV系を自動)
    zogen_pairs = []
    if "ZOGEN_FUGO" in src_columns and "ZOGEN_SA" in src_columns:
        zogen_pairs.append(("ZOGEN_FUGO", "ZOGEN_SA"))
    for col in src_columns:
        if col.endswith("_ZOGEN_FUGO"):
            prefix = col[: -len("_ZOGEN_FUGO")]
            sa_col = f"{prefix}_ZOGEN_SA"
            if sa_col in src_columns:
                zogen_pairs.append((col, sa_col))
    fugo_cols = {f for f, _ in zogen_pairs}  # 出力から除外
    # 出力列(drop と FUGO を除外)
    selected_cols = [c for c in src_columns if c not in use_drop_cols and c not in fugo_cols]
    # タイプ別のターゲット集合
    decimal_target_cols = {c for c in selected_cols if c in decimal_present and c not in keep_cols}
    int_target_cols     = {c for c in selected_cols if c not in keep_cols and c not in decimal_target_cols}
    # --- 新テーブルのスキーマ作成 ---
    col_defs = []
    for c in selected_cols:
        if c in keep_cols:
            col_type = keep_type_map.get(c, "VARCHAR(64)")
        elif c in decimal_target_cols:
            col_type = decimal_def
        else:
            col_type = int_def
        col_defs.append(f"`{c}` {col_type}")
    with get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(f"DROP TABLE IF EXISTS {dst_table}")
            cur.execute(f"CREATE TABLE {dst_table} ({', '.join(col_defs)})")
            for k in keep_cols:
                try:
                    cur.execute(f"CREATE INDEX idx_{dst_table}_{k.lower()} ON {dst_table}(`{k}`)")
                except:
                    pass
        conn.commit()
    # 総件数
    with get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(f"SELECT COUNT(*) AS cnt FROM {src_table}")
            total = cur.fetchone()["cnt"]
    print(f"📊 {src_table} → {dst_table}: {total} 行を変換")
    insert_sql = f"""
        INSERT INTO {dst_table} ({', '.join([f'`{c}`' for c in selected_cols])})
        VALUES ({', '.join(['%s'] * len(selected_cols))})
    """
    # --- チャンク処理 ---
    for offset in tqdm(range(0, total, batch_size)):
        # 取得
        with get_connection() as conn:
            with conn.cursor() as cur:
                if order_by and order_by in src_columns:
                    cur.execute(
                        f"SELECT * FROM {src_table} ORDER BY `{order_by}` ASC LIMIT %s OFFSET %s",
                        (batch_size, offset),
                    )
                else:
                    cur.execute(f"SELECT * FROM {src_table} LIMIT %s OFFSET %s", (batch_size, offset))
                rows = cur.fetchall()
        if not rows:
            break
        df = pd.DataFrame(rows, columns=src_columns)
        # --- ZOGEN の符号反映(+ → そのまま, - → マイナス, 空 → 0) ---
        for fugo_col, sa_col in zogen_pairs:
            sign = (
                df[fugo_col].astype("object").fillna("").apply(lambda x: str(x).strip())
                .map({"+": 1, "-": -1, "+": 1, "-": -1})
                .fillna(0).astype("float64")
            )
            sa = pd.to_numeric(df[sa_col], errors="coerce").fillna(0.0)
            out_sa = sa * sign
            out_sa[sign == 0] = 0.0  # 空なら0固定
            df[sa_col] = out_sa
        # --- 着差コードの CASE マップを複数列に適用 ---
        for c in use_chakusa_cols:
            if c in df.columns:
                df[c] = df[c].map(lambda v: use_chakusa_map.get(str(v).strip(), None) if v is not None else None)
        # --- 型変換 ---
        # DECIMAL 対象
        for c in decimal_target_cols:
            if c in df.columns:
                df[c] = pd.to_numeric(df[c], errors="coerce")
        # INT 対象(小数が来ても四捨五入して整数化。必要なら .floor() 等に変更)
        for c in int_target_cols:
            if c in df.columns:
                df[c] = pd.to_numeric(df[c], errors="coerce").round().astype("Int64")
        # --- NaN/Inf/<NA> を必ず None に置換してから INSERT ---
        out = df[selected_cols].copy().astype(object)
        out.replace({np.nan: None, np.inf: None, -np.inf: None, pd.NA: None}, inplace=True)
        values = out.values.tolist()
        # INSERT
        with get_connection() as conn:
            with conn.cursor() as cur:
                for i in range(0, len(values), insert_batch_size):
                    cur.executemany(insert_sql, values[i : i + insert_batch_size])
            conn.commit()
    print("✅ 変換・書き込み完了")解説
不要カラムを一気に落とす
まず、drop_cols に列挙したカラムを対象から外します。学習に使わない情報は徹底的に除外するのがポイントです。無駄なカラムを抱えていると、精度の悪化や学習速度の低下を招きます。
着差コードを数値化
chakusa_map という辞書を使って、文字列を数値に変換しています。例えば "_12" は「0.5馬身差」を意味します。数値に変えることで、差の大小がモデルに正しく伝わるようになります。
増減の正負を反映
馬体重の増減を示す ZOGEN_FUGO と数値の ZOGEN_SA を組み合わせることで、「+10kg」「-6kg」のように実際の増減を数値化します。これにより、前走との比較が素直に特徴量化されます。
型の再定義
各列に対して適切な型を指定し直しています。
- オッズや着差 → DECIMAL
- 順位や人気などの整数値 → INT
- IDやキー列 → BIGINT(保持)
型を揃えておくことは、学習だけでなく後続のSQL処理でも安定性につながる重要なポイントです。
チャンク処理で効率よくINSERT
大規模データを一気にINSERTすると処理落ちするため、2万件ずつ読み出して1000件ごとに分割INSERTしています。これにより、安定して全行を変換・保存できます。
ポイント
- 不要カラムは早めに削除して、データを軽く保つ
- 文字列や記号は必ず数値化し、モデルが理解できる形にする
- 型を正しく揃えることが精度と再現性の土台になる
- 欠損や例外値の扱いを統一し、エラーを避ける
- 大規模データは小分けで処理することで安定性を確保
コードの呼び出し方
この関数は前回のコードと同じくinit_db.py から実行します。
複数の作業テーブルをまとめて作成する流れの中で transform_w_training_data_for_lgb() が呼ばれます。以下のようなコードになります。
from db.init_work_tables import (
    create_ranked_race_base,
    create_ranked_race_temp,
    create_joined_race_table,
    enrich_w_joined_race_bulk,
    extend_w_joined_race_with_kyosoba,
    transform_w_training_data_for_lgb
)
if __name__ == "__main__":
    print("🛠️ ranked_race_base を作成中...")
    create_ranked_race_base()
    print("🛠️ ranked_race_temp を作成中...")
    create_ranked_race_temp()
    print("🛠️ joined_race を作成中...")
    create_joined_race_table()
    print("🛠️ joined_race に調教データ追加中...")
    enrich_w_joined_race_bulk()
    print("🛠️ w_training_data に成績データ追加中...")
    extend_w_joined_race_with_kyosoba()
    print("🛠️ w_training_data_lgb のデータを整理中...")
    transform_w_training_data_for_lgb()
    print("✅ すべて完了しました。")まとめ
今回の処理を経て、LightGBMにそのまま投入できる学習データが完成しました。ここまでで、出走データの整形・過去走の付与・調教情報の結合・馬マスターの追加、そして今回の「不要列削除と数値化」を行い、学習用テーブルが整いました。
これで「学習データの作成」は完了です。以降は、このテーブルを用いてモデルトレーニングや特徴量の検討に進むことができます。
 
 

コメント