他身在高楼广厦之中,却有山泽鱼鸟之思
1462 字
7 分钟
Selenium获取cyclicpepedia信息
前言
TIP准备工作 chrome网页端
ChromeDriver
,动态网页Selenium
\
如何安装ChromeDriver
chrome 下载 ChromeDriver
打开 https://googlechromelabs.github.io/chrome-for-testing/#stable
移动其到chrome安装位置 C:\Program Files (x86)\Google\Chrome\Application
设置:此电脑→右击属性→高级系统设置→环境变量→用户变量→Path→编辑→新建
path C:\Program Files (x86)\Google\Chrome\Application\
如何安装Selenium
pip install selenium
爬取代码
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
def get_sequence_from_smiles(smiles_value):
"""
根据输入的 SMILES 值,获取对应的 Amino Acid Sequence(无头模式)。
Args:
smiles_value (str): 输入的 SMILES 字符串。
Returns:
str: 提取的 Amino Acid Sequence 值。如果出错,返回 None。
"""
# 配置无头模式的 Chrome 浏览器选项
chrome_options = Options()
chrome_options.add_argument("--headless") # 无头模式
chrome_options.add_argument("--disable-gpu") # 禁用 GPU(可选)
chrome_options.add_argument("--no-sandbox") # 防止沙盒问题(推荐服务器上使用)
chrome_options.add_argument("--disable-dev-shm-usage") # 防止资源限制错误(推荐服务器上使用)
# 指定 chromedriver.exe 的路径
service = Service("C:/Program Files/Google/Chrome/Application/chromedriver.exe") # 替换为你的实际路径
# 创建无头模式的 WebDriver 实例
driver = webdriver.Chrome(service=service, options=chrome_options)
try:
# 打开目标网页
url = "https://www.biosino.org/iMAC/cyclicpepedia/stru2seq"
driver.get(url)
# 等待 textarea 元素加载(最长等待 10 秒)
textarea = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH, '/html/body/div[1]/div[3]/div/div/div/div/div/div/div/div[1]/div[2]/form/div[1]/div/textarea'))
)
# 输入 SMILES 数据
textarea.clear() # 清空输入框(如果需要)
textarea.send_keys(smiles_value) # 输入数据
# 定位并点击提交按钮
submit_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.XPATH, '/html/body/div[1]/div[3]/div/div/div/div/div/div/div/div[1]/div[2]/form/div[2]/div[2]/button'))
)
submit_button.click() # 点击按钮
# 等待结果生成
# 定位父级 <p> 标签
result_element = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH, '/html/body/div[1]/div[3]/div/div/div/div/div/div/div/div[1]/div[3]/p[6]'))
)
# 提取目标值
result_text = result_element.text
# 去掉 <b> 标签的内容,只保留实际的值
cleaned_result = result_text.replace("Amino acid sequence :", "").strip()
# 返回结果
return cleaned_result
except Exception as e:
print(f"出现错误: {e}")
return None
finally:
# 关闭 WebDriver
driver.quit()
封装使用
import pandas as pd
import time
from concurrent.futures import ThreadPoolExecutor
# 定义批量处理函数
def process_smiles_with_threads(input_file, output_file, max_threads=8):
"""
使用多线程处理 SMILES 列,并获取对应的 Sequence。
Args:
input_file (str): 输入的 CSV 文件路径。
output_file (str): 输出的 CSV 文件路径。
max_threads (int): 最大线程数。
"""
# 读取 CSV 文件
df = pd.read_csv(input_file)
# 确保存在 'smiles' 列
if 'smiles' not in df.columns:
raise ValueError("输入文件中必须包含 'smiles' 列!")
# 如果不存在 'sequence' 列,则添加该列
if 'sequence' not in df.columns:
df['sequence'] = None
# 过滤未处理的行
rows_to_process = df[df['sequence'].isna()]
if rows_to_process.empty:
print("所有行均已处理,无需重复操作。")
return
# 定义处理单行的函数
def process_row(index, smiles_value):
try:
print(f"正在处理第 {index + 1} 行的 SMILES: {smiles_value}")
sequence = get_sequence_from_smiles(smiles_value) # 调用你的函数
print(f"第 {index + 1} 行处理完成,结果为: {sequence}")
return index, sequence
except Exception as e:
print(f"第 {index + 1} 行处理出错: {e}")
return index, None
# 使用线程池并行处理
with ThreadPoolExecutor(max_threads) as executor:
futures = {
executor.submit(process_row, index, row['smiles']): index
for index, row in rows_to_process.iterrows()
}
# 获取每个线程的执行结果
for future in futures:
index, sequence = future.result()
df.at[index, 'sequence'] = sequence # 将结果写入 DataFrame
# 实时保存中间结果到文件
df.to_csv(output_file, index=False)
print(f"处理完成,结果已保存到 {output_file}")
if __name__ == "__main__":
# 输入和输出文件路径
input_csv = "a.csv" # 输入文件路径
output_csv = "output.csv" # 输出文件路径
# 调用多线程批量处理函数
process_smiles_with_threads(input_csv, output_csv, max_threads=4)
如果要使用的是sqe2smiles
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
def get_smiles_from_sqe(smiles_value):
"""
根据输入的 SMILES 值,获取对应的 Amino Acid Sequence(无头模式)。
Args:
smiles_value (str): 输入的 SMILES 字符串。
Returns:
str: 提取的 Amino Acid Sequence 值。如果出错,返回 None。
"""
# 配置无头模式的 Chrome 浏览器选项
chrome_options = Options()
chrome_options.add_argument("--headless") # 无头模式
chrome_options.add_argument("--disable-gpu") # 禁用 GPU(可选)
chrome_options.add_argument("--no-sandbox") # 防止沙盒问题(推荐服务器上使用)
chrome_options.add_argument("--disable-dev-shm-usage") # 防止资源限制错误(推荐服务器上使用)
# 指定 chromedriver.exe 的路径
service = Service("C:/Program Files/Google/Chrome/Application/chromedriver.exe") # 替换为你的实际路径
# 创建无头模式的 WebDriver 实例
driver = webdriver.Chrome(service=service, options=chrome_options)
try:
# 打开目标网页
url = "https://www.biosino.org/iMAC/cyclicpepedia/seq2stru"
driver.get(url)
# 等待 textarea 元素加载(最长等待 10 秒)
textarea = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH, '/html/body/div[1]/div[3]/div/div/div/div/div/div/div/div[1]/div[2]/form/div[1]/div/textarea'))
)
# 输入 SMILES 数据
textarea.clear() # 清空输入框(如果需要)
textarea.send_keys(smiles_value) # 输入数据
# 定位并点击提交按钮
submit_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.XPATH, '/html/body/div[1]/div[3]/div/div/div/div/div/div/div/div[1]/div[2]/form/div[2]/div[2]/button'))
)
submit_button.click() # 点击按钮
# 等待结果生成
# 定位父级 <p> 标签
result_element = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH, '/html/body/div/div[3]/div/div[1]/div/div/div[2]/div/div[2]/h3[3]'))
)
# 提取目标值
result_text = result_element.text
# 去掉 <b> 标签的内容,只保留实际的值
cleaned_result = result_text.replace("Amino acid sequence :", "").strip()
# 返回结果
return cleaned_result
except Exception as e:
print(f"出现错误: {e}")
return None
finally:
# 关闭 WebDriver
driver.quit()
import pandas as pd
import time
from concurrent.futures import ThreadPoolExecutor
# 定义批量处理函数
def process_sequence_with_threads2(input_file, output_file, max_threads=8):
"""
使用多线程处理 Sequence 列,并获取对应的 SMILES。
Args:
input_file (str): 输入的 CSV 文件路径。
output_file (str): 输出的 CSV 文件路径。
max_threads (int): 最大线程数。
"""
# 读取 CSV 文件
df = pd.read_csv(input_file)
# 确保存在 'sequence' 列
if 'sequence' not in df.columns:
raise ValueError("输入文件中必须包含 'sequence' 列!")
# 如果不存在 'smiles' 列,则添加该列
if 'smiles' not in df.columns:
df['smiles'] = None
# 过滤未处理的行
rows_to_process = df[df['smiles'].isna()]
if rows_to_process.empty:
print("所有行均已处理,无需重复操作。")
return
# 定义处理单行的函数
def process_row(index, sequence_value):
try:
print(f"正在处理第 {index + 1} 行的 Sequence: {sequence_value}")
smiles = get_smiles_from_sqe(sequence_value) # 调用你的函数
print(f"第 {index + 1} 行处理完成,结果为: {smiles}")
return index, smiles
except Exception as e:
print(f"第 {index + 1} 行处理出错: {e}")
return index, None
# 使用线程池并行处理
with ThreadPoolExecutor(max_threads) as executor:
futures = {
executor.submit(process_row, index, row['sequence']): index
for index, row in rows_to_process.iterrows()
}
# 获取每个线程的执行结果
for future in futures:
index, smiles = future.result()
df.at[index, 'smiles'] = smiles # 将结果写入 DataFrame
# 实时保存中间结果到文件
df.to_csv(output_file, index=False)
print(f"处理完成,结果已保存到 {output_file}")
if __name__ == "__main__":
# 输入和输出文件路径
input_csv = "data_all/data4.csv" # 输入文件路径
output_csv = "data_all/data4_smiles.csv" # 输出文件路径
# 调用多线程批量处理函数
process_sequence_with_threads2(input_csv, output_csv, max_threads=4)
Selenium获取cyclicpepedia信息
https://sereinna.github.io/posts/cyclicpepedia_data_get/