1462 字
7 分钟
Selenium获取cyclicpepedia信息

前言#

TIP

准备工作 chrome网页端 ChromeDriver,动态网页 Selenium\

如何安装ChromeDriver#

chrome 下载 ChromeDriver

打开 https://googlechromelabs.github.io/chrome-for-testing/#stable

移动其到chrome安装位置 C:\Program Files (x86)\Google\Chrome\Application

设置:此电脑→右击属性→高级系统设置→环境变量→用户变量→Path→编辑→新建

path C:\Program Files (x86)\Google\Chrome\Application\

如何安装Selenium#

pip install selenium

爬取代码#

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

def get_sequence_from_smiles(smiles_value):
    """
    根据输入的 SMILES 值,获取对应的 Amino Acid Sequence(无头模式)。
    
    Args:
        smiles_value (str): 输入的 SMILES 字符串。
    
    Returns:
        str: 提取的 Amino Acid Sequence 值。如果出错,返回 None。
    """
    # 配置无头模式的 Chrome 浏览器选项
    chrome_options = Options()
    chrome_options.add_argument("--headless")  # 无头模式
    chrome_options.add_argument("--disable-gpu")  # 禁用 GPU(可选)
    chrome_options.add_argument("--no-sandbox")  # 防止沙盒问题(推荐服务器上使用)
    chrome_options.add_argument("--disable-dev-shm-usage")  # 防止资源限制错误(推荐服务器上使用)

    # 指定 chromedriver.exe 的路径
    service = Service("C:/Program Files/Google/Chrome/Application/chromedriver.exe")  # 替换为你的实际路径

    # 创建无头模式的 WebDriver 实例
    driver = webdriver.Chrome(service=service, options=chrome_options)

    try:
        # 打开目标网页
        url = "https://www.biosino.org/iMAC/cyclicpepedia/stru2seq"
        driver.get(url)

        # 等待 textarea 元素加载(最长等待 10 秒)
        textarea = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, '/html/body/div[1]/div[3]/div/div/div/div/div/div/div/div[1]/div[2]/form/div[1]/div/textarea'))
        )

        # 输入 SMILES 数据
        textarea.clear()  # 清空输入框(如果需要)
        textarea.send_keys(smiles_value)  # 输入数据

        # 定位并点击提交按钮
        submit_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, '/html/body/div[1]/div[3]/div/div/div/div/div/div/div/div[1]/div[2]/form/div[2]/div[2]/button'))
        )
        submit_button.click()  # 点击按钮

        # 等待结果生成
        # 定位父级 <p> 标签
        result_element = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, '/html/body/div[1]/div[3]/div/div/div/div/div/div/div/div[1]/div[3]/p[6]'))
        )

        # 提取目标值
        result_text = result_element.text

        # 去掉 <b> 标签的内容,只保留实际的值
        cleaned_result = result_text.replace("Amino acid sequence :", "").strip()

        # 返回结果
        return cleaned_result

    except Exception as e:
        print(f"出现错误: {e}")
        return None

    finally:
        # 关闭 WebDriver
        driver.quit()

封装使用#


import pandas as pd
import time
from concurrent.futures import ThreadPoolExecutor

# 定义批量处理函数
def process_smiles_with_threads(input_file, output_file, max_threads=8):
    """
    使用多线程处理 SMILES 列,并获取对应的 Sequence。
    
    Args:
        input_file (str): 输入的 CSV 文件路径。
        output_file (str): 输出的 CSV 文件路径。
        max_threads (int): 最大线程数。
    """
    # 读取 CSV 文件
    df = pd.read_csv(input_file)

    # 确保存在 'smiles' 列
    if 'smiles' not in df.columns:
        raise ValueError("输入文件中必须包含 'smiles' 列!")

    # 如果不存在 'sequence' 列,则添加该列
    if 'sequence' not in df.columns:
        df['sequence'] = None

    # 过滤未处理的行
    rows_to_process = df[df['sequence'].isna()]

    if rows_to_process.empty:
        print("所有行均已处理,无需重复操作。")
        return

    # 定义处理单行的函数
    def process_row(index, smiles_value):
        try:
            print(f"正在处理第 {index + 1} 行的 SMILES: {smiles_value}")
            sequence = get_sequence_from_smiles(smiles_value)  # 调用你的函数
            print(f"第 {index + 1} 行处理完成,结果为: {sequence}")
            return index, sequence
        except Exception as e:
            print(f"第 {index + 1} 行处理出错: {e}")
            return index, None

    # 使用线程池并行处理
    with ThreadPoolExecutor(max_threads) as executor:
        futures = {
            executor.submit(process_row, index, row['smiles']): index
            for index, row in rows_to_process.iterrows()
        }

        # 获取每个线程的执行结果
        for future in futures:
            index, sequence = future.result()
            df.at[index, 'sequence'] = sequence  # 将结果写入 DataFrame

            # 实时保存中间结果到文件
            df.to_csv(output_file, index=False)

    print(f"处理完成,结果已保存到 {output_file}")


if __name__ == "__main__":
    # 输入和输出文件路径
    input_csv = "a.csv"  # 输入文件路径
    output_csv = "output.csv"  # 输出文件路径

    # 调用多线程批量处理函数
    process_smiles_with_threads(input_csv, output_csv, max_threads=4)

如果要使用的是sqe2smiles#



from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

def get_smiles_from_sqe(smiles_value):
    """
    根据输入的 SMILES 值,获取对应的 Amino Acid Sequence(无头模式)。
    
    Args:
        smiles_value (str): 输入的 SMILES 字符串。
    
    Returns:
        str: 提取的 Amino Acid Sequence 值。如果出错,返回 None。
    """
    # 配置无头模式的 Chrome 浏览器选项
    chrome_options = Options()
    chrome_options.add_argument("--headless")  # 无头模式
    chrome_options.add_argument("--disable-gpu")  # 禁用 GPU(可选)
    chrome_options.add_argument("--no-sandbox")  # 防止沙盒问题(推荐服务器上使用)
    chrome_options.add_argument("--disable-dev-shm-usage")  # 防止资源限制错误(推荐服务器上使用)

    # 指定 chromedriver.exe 的路径
    service = Service("C:/Program Files/Google/Chrome/Application/chromedriver.exe")  # 替换为你的实际路径

    # 创建无头模式的 WebDriver 实例
    driver = webdriver.Chrome(service=service, options=chrome_options)

    try:
        # 打开目标网页
        url = "https://www.biosino.org/iMAC/cyclicpepedia/seq2stru"
        driver.get(url)

        # 等待 textarea 元素加载(最长等待 10 秒)
        textarea = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, '/html/body/div[1]/div[3]/div/div/div/div/div/div/div/div[1]/div[2]/form/div[1]/div/textarea'))
        )

        # 输入 SMILES 数据
        textarea.clear()  # 清空输入框(如果需要)
        textarea.send_keys(smiles_value)  # 输入数据

        # 定位并点击提交按钮
        submit_button = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, '/html/body/div[1]/div[3]/div/div/div/div/div/div/div/div[1]/div[2]/form/div[2]/div[2]/button'))
        )
        submit_button.click()  # 点击按钮

        # 等待结果生成
        # 定位父级 <p> 标签
        result_element = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.XPATH, '/html/body/div/div[3]/div/div[1]/div/div/div[2]/div/div[2]/h3[3]'))
        )

        # 提取目标值
        result_text = result_element.text

        # 去掉 <b> 标签的内容,只保留实际的值
        cleaned_result = result_text.replace("Amino acid sequence :", "").strip()

        # 返回结果
        return cleaned_result

    except Exception as e:
        print(f"出现错误: {e}")
        return None

    finally:
        # 关闭 WebDriver
        driver.quit()


import pandas as pd
import time
from concurrent.futures import ThreadPoolExecutor

# 定义批量处理函数
def process_sequence_with_threads2(input_file, output_file, max_threads=8):
    """
    使用多线程处理 Sequence 列,并获取对应的 SMILES。
    
    Args:
        input_file (str): 输入的 CSV 文件路径。
        output_file (str): 输出的 CSV 文件路径。
        max_threads (int): 最大线程数。
    """
    # 读取 CSV 文件
    df = pd.read_csv(input_file)

    # 确保存在 'sequence' 列
    if 'sequence' not in df.columns:
        raise ValueError("输入文件中必须包含 'sequence' 列!")

    # 如果不存在 'smiles' 列,则添加该列
    if 'smiles' not in df.columns:
        df['smiles'] = None

    # 过滤未处理的行
    rows_to_process = df[df['smiles'].isna()]

    if rows_to_process.empty:
        print("所有行均已处理,无需重复操作。")
        return

    # 定义处理单行的函数
    def process_row(index, sequence_value):
        try:
            print(f"正在处理第 {index + 1} 行的 Sequence: {sequence_value}")
            smiles = get_smiles_from_sqe(sequence_value)  # 调用你的函数
            print(f"第 {index + 1} 行处理完成,结果为: {smiles}")
            return index, smiles
        except Exception as e:
            print(f"第 {index + 1} 行处理出错: {e}")
            return index, None

    # 使用线程池并行处理
    with ThreadPoolExecutor(max_threads) as executor:
        futures = {
            executor.submit(process_row, index, row['sequence']): index
            for index, row in rows_to_process.iterrows()
        }

        # 获取每个线程的执行结果
        for future in futures:
            index, smiles = future.result()
            df.at[index, 'smiles'] = smiles  # 将结果写入 DataFrame

            # 实时保存中间结果到文件
            df.to_csv(output_file, index=False)

    print(f"处理完成,结果已保存到 {output_file}")


if __name__ == "__main__":
    # 输入和输出文件路径
    input_csv = "data_all/data4.csv"  # 输入文件路径
    output_csv = "data_all/data4_smiles.csv"  # 输出文件路径

    # 调用多线程批量处理函数
    process_sequence_with_threads2(input_csv, output_csv, max_threads=4)

Selenium获取cyclicpepedia信息
https://sereinna.github.io/posts/cyclicpepedia_data_get/
作者
serein
发布于
2025-01-13
许可协议
CC BY-NC-SA 4.0