【競馬AI 15】予想するレースデータの作成を簡単にする方法を紹介(対策済み)

競馬AI

こちらの記事のアップデートバージョンになります。

クローラー対策がされ、スクレイピングに制限が掛かりました。その対応策としてSeleniumという技術があります。

SeleniumはバックグラウンドでWebのデータを取得するスクレイピングと違い、実際にWebページを表示しデータを取得する技術になります。

自分でページを開くのと同様に、自動でページを動かしているだけなので対策には引っかからないということですね。

対策が掛かったということは自動でのデータ取得をサイト側が良しとしていないので、ご利用は自己責任でお願いします。

私はデータの情報量・正確性・リアルタイム性などを加味して、データベースの購入を検討しています。

修正プログラム

以前のコードと比較し、変更箇所の更新をしてみてください。

プログラム

from bs4 import BeautifulSoup
import requests
from datetime import datetime
import numpy as np
import csv
import sys
import re
import statistics
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

def class_mapping(row):
    mappings = {'障害':0, 'G1': 10, 'G2': 9, 'G3': 8, '(L)': 7, 'オープン': 7, 'OP': 7, '3勝': 6, '1600': 6, '2勝': 5, '1000': 5, '1勝': 4, '500': 4, '新馬': 3, '未勝利': 1}
    for key, value in mappings.items():
        if key in row:
            return value
    return 0  # If no mapping is found, return 0

# WebDriverの設定
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
driver2 = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
driver3 = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

#対象レースデータの取得
race_date = "2024/11/10"
racecourse='kyoto'
#下2桁(レースID)は消す
main_page_url = "https://race.netkeiba.com/race/shutuba.html?race_id=2024080604"
start = 11
end = 12

# CSVから平均と標準偏差を読み込む
time_df = pd.read_csv('config/standard_deviation.csv', index_col=0)
mean1, mean2 = time_df['Mean']['First Time'], time_df['Mean']['Second Time']
std1, std2 = time_df['Standard Deviation']['First Time'], time_df['Standard Deviation']['Second Time']

for race_no in range(start, end):
    race_num = str(race_no).zfill(2)
    race_id = main_page_url.split("=")[-1] + race_num
    driver.get(main_page_url + race_num)

    soup = BeautifulSoup(driver.page_source, 'html.parser')
    #新馬戦は除外する
    racename = soup.find("h1", class_="RaceName")
    print(racename.text.strip())
    if "新馬" in racename.text.strip() or "2歳未勝利" in racename.text.strip():
        continue

    # "RaceData01"クラスのdivを検索
    element = soup.find("div", class_="RaceData01")
    # 該当要素からテキストを取得
    text = element.text if element else ""

    # テキストからコース種別、距離、方向、天候を抽出
    direction = re.search(r"(左|右|直|芝)", text).group() if re.search(r"(左|右|直|芝)", text) else ""
    weather = re.search(r"天候:(\w+)", text).group(1) if re.search(r"天候:(\w+)", text) else ""

    span = element.find("span").text.strip()
    distance = span[1:-1]
    shiba_mapping = {'芝': 0, 'ダ': 1, '障': 2}
    track = shiba_mapping.get(span[0])
    mawari_mapping = {'右': 0, '左': 1, '芝': 2, '直': 2}
    mawari = mawari_mapping.get(direction)

    babaSpan = soup.find("span", class_="Item04")
    baba = ""
    if soup.find("span", class_="Item04") is not None:
        baba = babaSpan.text.strip()[-1]

    baba_mapping = {'良': 0, '稍': 1, '重': 2, '不': 3}
    baba = baba_mapping.get(baba)

    tenki_mapping = {'晴': 0, '曇': 1, '小': 2, '雨': 3, '雪': 4}
    tenki = tenki_mapping.get(weather)

    race_data = soup.find("div", class_="RaceData02")
    # spanタグのリストを取得します
    spans = race_data.find_all('span')
    # 2番目のspanタグ(インデックスは0から始まるため1を指定)のテキストを取得します
    location = spans[1].text
    place = ""
    if location == "札幌":
        place = "01"
    elif location == "函館":
        place = "02"
    elif location == "福島":
        place = "03"
    elif location == "新潟":
        place = "04"
    elif location == "東京":
        place = "05"
    elif location == "中山":
        place = "06"
    elif location == "中京":
        place = "07"
    elif location == "京都":
        place = "08"
    elif location == "阪神":
        place = "09"
    elif location == "小倉":
        place = "10"

    # "RaceName"クラスのdivを検索
    divRaceName = soup.find("h1", class_="RaceName")
    #レース名
    race_name = divRaceName.text.strip()
    #クラス
    race_rank = ""
    if soup.find("span", class_="Icon_GradeType1") is not None:
        race_rank = 10
    elif soup.find("span", class_="Icon_GradeType2") is not None:
        race_rank = 9
    elif soup.find("span", class_="Icon_GradeType3") is not None:
        race_rank = 8
    elif soup.find("span", class_="Icon_GradeType15") is not None:
        race_rank = 7
    elif soup.find("span", class_="Icon_GradeType16") is not None:
        race_rank = 6
    elif soup.find("span", class_="Icon_GradeType17") is not None:
        race_rank = 5
    elif soup.find("span", class_="Icon_GradeType18") is not None:
        race_rank = 4
    else:
        race_rank = class_mapping(race_name)

    all_results = []  # 全てレース結果を保存するためのリスト
    all_results.append(["race_id","馬","騎手","馬番","走破時間","オッズ","通過順","着順","体重","体重変化","性","齢","斤量","上がり","人気","レース名","日付","開催","クラス","芝・ダート","距離","回り","馬場","天気","場id","場名","日付1","馬番1","騎手1","斤量1","オッズ1","体重1","体重変化1","上がり1","通過順1","着順1","距離1","クラス1","走破時間1","芝・ダート1","天気1","馬場1","日付2","馬番2","騎手2","斤量2","オッズ2","体重2","体重変化2","上がり2","通過順2","着順2","距離2","クラス2","走破時間2","芝・ダート2","天気2","馬場2","日付3","馬番3","騎手3","斤量3","オッズ3","体重3","体重変化3","上がり3","通過順3","着順3","距離3","クラス3","走破時間3","芝・ダート3","天気3","馬場3","日付4","馬番4","騎手4","斤量4","オッズ4","体重4","体重変化4","上がり4","通過順4","着順4","距離4","クラス4","走破時間4","芝・ダート4","天気4","馬場4","日付5","馬番5","騎手5","斤量5","オッズ5","体重5","体重変化5","上がり5","通過順5","着順5","距離5","クラス5","走破時間5","芝・ダート5","天気5","馬場5"])
    # テーブルを指定
    table = soup.find("table", {"class": "Shutuba_Table"})
    # テーブル内の全ての行を取得
    main_rows = table.find_all("tr")
    url_list = []
    # 各行から必要な情報を取り出し
    for i, row in enumerate(main_rows[2:], start=1):# ヘッダ行をスキップ
        cols = row.find_all("td")
        umaban = cols[1].text.strip()
        sign = cols[2].text.strip()
        if sign == "除外":
            continue
        uma = cols[3].text.strip()
        a_tag = cols[3].find('a')
        url = a_tag.get('href')
        url_list.append(url)
        
        a_tag2 = cols[6].find('a')
        url2 = a_tag2.get('href')
        
        driver2.get(url2)

        soup = BeautifulSoup(driver2.page_source, 'html.parser')
        element2 = soup.find("div", class_="db_head_name")
        h1_text = element2.find('h1').get_text(strip=True)
        name = h1_text.split("\u00a0")[0]

        kishu = re.sub(r'[A-Za-z.]', '', name)

        odds = cols[9].find("span").text.strip()
        pop = cols[10].find("span").text.strip()
        #体重
        horse_weight = cols[8].text.strip()
        weight = 0
        weight_dif = 0
        try:
            weight = int(horse_weight.split("(")[0])
            weight_dif = int(horse_weight.split("(")[1][0:-1])
        except:
            weight = ''
            weight_dif = ''
        weight = weight
        weight_dif = weight_dif
        #斤量
        handicap = cols[5].text.strip()
        #性
        sex = cols[4].text.strip()[0]
        sex_mapping = {'牡':0, '牝': 1, 'セ': 2}
        sex = sex_mapping.get(sex)
        #齢
        age = cols[4].text.strip()[1]
        result = [race_id, uma, kishu, umaban, "", odds, "", "", weight, weight_dif, sex,
        age, handicap, "",pop, race_name, race_date, location, race_rank, track, distance,
        mawari, baba, tenki, place, location]

        all_results.append(result)
    # 現在の日付を取得
    now = datetime.now()
    # cutoff_date を datetime 型に変換
    cutoff_date = datetime.strptime(race_date, '%Y/%m/%d')
    for index ,url in enumerate(url_list):
        results = []  # 馬単位のレース結果を保存するためのリスト
        
        driver3.get(url)

        soup = BeautifulSoup(driver3.page_source, 'html.parser')

        # テーブルを指定
        table = soup.find("table", {"class": "db_h_race_results nk_tb_common"})
        # テーブル内の全ての行を取得
        rows = ''
        try:
            rows = table.find_all("tr")
        except:
            continue
        # 各行から必要な情報を取り出し
        for i, row in enumerate(rows[1:], start=1):# ヘッダ行をスキップ
            cols = row.find_all("td")

            # 日付を解析
            str_date = cols[0].text.strip()
            date = datetime.strptime(str_date, '%Y/%m/%d')

            # 特定の日付より前のデータのみを取得
            if date < cutoff_date:
                # 取得したいデータの位置を指定し取得
                #体重
                horse_weight = cols[23].text.strip()
                weight = 0
                weight_dif = 0
                try:
                    weight = int(horse_weight.split("(")[0])
                    weight_dif = int(horse_weight.split("(")[1][0:-1])
                except:
                    weight = ''
                    weight_dif = ''
                weight = weight
                weight_dif = weight_dif
                #上がり
                up = cols[22].text.strip()
                #通過順
                through = cols[20].text.strip()
                try:
                    numbers = list(map(int, through.split('-')))
                    through = sum(numbers) / len(numbers)
                except ValueError:
                    through = ''
                #着順
                order_of_finish = cols[11].text.strip()
                try:
                    order_of_finish = str(int(order_of_finish))
                except ValueError:
                    order_of_finish = ""
                #馬番
                past_umaban = cols[8].text.strip()
                #騎手
                past_kishu = cols[12].text.strip()
                #斤量
                past_kinryo = cols[13].text.strip()
                #距離
                distance = cols[14].text.strip()
                #芝・ダート
                track = distance[0]
                shiba_mapping = {'芝': 0, 'ダ': 1, '障': 2}
                track = shiba_mapping.get(track)
                #距離
                distance = distance[1:]
                #レース名
                race_name = cols[4].text.strip()
                race_rank = class_mapping(race_name)
                #タイム
                time = cols[17].text.strip()
                try:
                    time = float(time.split(':')[0]) * 60 + sum(float(x) / 10**i for i, x in enumerate(time.split(':')[1].split('.')))
                except:
                    time = ''
                if not time == '':
                    # 1回目の平均と標準偏差で標準化
                    time = -((time - mean1) / std1)
                    
                    # 外れ値の処理:-3より小さい値は-3に、2.5より大きい値は2に変換
                    time = -3 if time < -3 else (2 if time > 2.5 else time)
                    
                    # 2回目の平均と標準偏差で標準化
                    time = (time - mean2) / std2

                #天気
                weather = cols[2].text.strip()
                tenki_mapping = {'晴': 0, '曇': 1, '小': 2, '雨': 3, '雪': 4}
                weather = tenki_mapping.get(weather)
                #オッズ
                odds = cols[9].text.strip()
                track_condition = cols[15].text.strip()
                #馬場状態
                baba_mapping = {'良': 0, '稍': 1, '重': 2, '不': 3}
                track_condition = baba_mapping.get(track_condition)
                            
                result = [str_date,past_umaban,past_kishu,past_kinryo, odds, weight, weight_dif, up, through, order_of_finish, distance, race_rank, time, track, weather, track_condition]
                results.append(result)

                # 5行取得したら終了
                if len(results) >= 5:
                    # 最終アウトプットに追加
                    # 横に連結
                    # resultsをnumpy配列に変換
                    results_array = np.array(results)

                    # numpy配列を1次元に変換
                    flattened_results = results_array.ravel()
                    all_results[index+1].extend(flattened_results)
                    break

                # 最終ループを判定
                if i == len(rows[1:]):
                    if results:  # resultsが空でない場合
                        results_array = np.array(results)
                        flattened_results = results_array.ravel()
                        all_results[index+1].extend(flattened_results)
    
    # convert list to DataFrame
    df_all_results = pd.DataFrame(all_results)
    df_all_results.columns = df_all_results.iloc[0]  # 最初の行を列名として設定
    df_all_results = df_all_results.drop(df_all_results.index[0])  # 最初の行を削除

    # 斤量に関連する列を数値に変換し、変換できないデータはNaNにします。
    kinryo_columns = ['斤量', '斤量1', '斤量2', '斤量3', '斤量4','斤量5']
    for col in kinryo_columns:
        df_all_results[col] = pd.to_numeric(df_all_results[col], errors='coerce')

    # 平均斤量を計算します。
    df_all_results['平均斤量'] = df_all_results[kinryo_columns].mean(axis=1)

    # CSVファイルから騎手の勝率を読み込む
    jockey_win_rate = pd.read_csv('calc_rate/jockey_win_rate.csv')
    # 騎手ごとの勝率を取得して新たなデータフレームを作成
    jockey_stats = jockey_win_rate.groupby('騎手')['騎手の勝率'].first().reset_index()
    # df_combinedとjockey_statsを結合して騎手の勝率を代入する
    df_all_results = df_all_results.merge(jockey_stats, on='騎手', how='left')

    df_all_results["距離差"] = pd.to_numeric(df_all_results["距離"]) - pd.to_numeric(df_all_results["距離1"])
    df_all_results["日付"] = df_all_results["日付"].map(str)
    df_all_results["日付1"] = df_all_results["日付1"].map(str)
    df_all_results["日付2"] = df_all_results["日付2"].map(str)
    df_all_results["日付3"] = df_all_results["日付3"].map(str)
    df_all_results["日付4"] = df_all_results["日付4"].map(str)
    df_all_results["日付5"] = df_all_results["日付5"].map(str)

    df_all_results["日付差"] = (pd.to_datetime(df_all_results["日付"], errors='coerce') - pd.to_datetime(df_all_results["日付1"], errors='coerce')).dt.days
    df_all_results["距離差1"] = pd.to_numeric(df_all_results["距離1"]) - pd.to_numeric(df_all_results["距離2"])
    df_all_results["日付差1"] = (pd.to_datetime(df_all_results["日付1"], errors='coerce') - pd.to_datetime(df_all_results["日付2"], errors='coerce')).dt.days
    df_all_results["距離差2"] = pd.to_numeric(df_all_results["距離2"]) - pd.to_numeric(df_all_results["距離3"])
    df_all_results["日付差2"] = (pd.to_datetime(df_all_results["日付2"], errors='coerce') - pd.to_datetime(df_all_results["日付3"], errors='coerce')).dt.days
    df_all_results["距離差3"] = pd.to_numeric(df_all_results["距離3"]) - pd.to_numeric(df_all_results["距離4"])
    df_all_results["日付差3"] = (pd.to_datetime(df_all_results["日付3"], errors='coerce') - pd.to_datetime(df_all_results["日付4"], errors='coerce')).dt.days
    df_all_results["距離差4"] = pd.to_numeric(df_all_results["距離4"]) - pd.to_numeric(df_all_results["距離5"])
    df_all_results["日付差4"] = (pd.to_datetime(df_all_results["日付4"], errors='coerce') - pd.to_datetime(df_all_results["日付5"], errors='coerce')).dt.days
    # DataFrameを配列に変換
    array_data = df_all_results.values

    # カラム名を配列の先頭に挿入
    columns = df_all_results.columns
    all_results = np.insert(array_data, 0, columns, axis=0)

    date_object = datetime.strptime(race_date, "%Y/%m/%d")
    # 日付を指定のフォーマットに変換
    formatted_date = date_object.strftime('%Y%m%d')
    # データをCSVファイルに出力する
    with open('race_data/race_data_' + racecourse + '_'+race_num+'R_' + formatted_date + '.csv', 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        for data in all_results:
            writer.writerow(data)

まとめ

Seleniumはスクレイピングと比べ速度が非常に遅くなります。

上でも書きましたが、プログラミングに慣れてきたらデータを購入して、モデルのさらなる改良にチャレンジしてみるのも良いかもしれません。

より良い競馬AI開発を頑張っていきましょう!

コメント

  1. りつ より:

    以下のようなエラーが出てタイムアウトしてしまいます。

    socket.timeout: timed out
    urllib3.exceptions.ReadTimeoutError: HTTPConnectionPool(host=’localhost’, port=49377): Read timed out. (read timeout=120)

    • agus agus より:

      PC/ネットワーク回線/接続先サーバーが重いことによる120秒タイムアウトではないでしょうか?
      何度か試したら動きませんか?

  2. toshi より:

    netkeibaさんの方でまた対策されちゃいましたね。
    現在以下のエラーが出てしまいスクレイピングが出来なくなりました。

    Traceback (most recent call last):
    File “C:\Users\toshi\Desktop\keibayosou\race_table_scraping.py”, line 154, in
    h1_text = element2.find(‘h1’).get_text(strip=True)
    ^^^^^^^^^^^^^
    AttributeError: ‘NoneType’ object has no attribute ‘find’

  3. 勉強 より:

    いつもお世話になっております。
    もしagusさんがどこかでデータを買う場合、また記事にしていただけるでしょうか?
    よろしくお願いいたします。