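# NOTE: web file-viewer header captured together with the source (not part of the program):
# uploader "BoyuNLP", commit "init" (3bbba47), file size 15 kB.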
# -*- coding: utf-8 -*-
# Copyright (c) 2024 OSU Natural Language Processing Group
#
# Licensed under the OpenRAIL-S License;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.licenses.ai/ai-pubs-open-rails-vz1
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import asyncio
from difflib import SequenceMatcher
from playwright.sync_api import Playwright, expect, sync_playwright
# from playwright.async_api import async_playwright
from pathlib import Path
import toml
import os
import traceback
async def normal_launch_async(playwright: "Playwright", headless=False, args=None):
    """Launch a Chromium browser and return it.

    Args:
        playwright: Object exposing ``chromium.launch``. The call is awaited,
            so this must be an async-API Playwright handle. The annotation is
            quoted so it is not evaluated when the function is defined.
        headless: Whether to launch without a visible window. This value is
            now forwarded to Playwright; previously the launch call always
            passed ``False`` regardless of the argument.
        args: Optional list of extra command-line arguments for the browser.

    Returns:
        The browser object returned by ``playwright.chromium.launch``.
    """
    browser = await playwright.chromium.launch(
        traces_dir=None,
        headless=headless,
        args=args,
        # ignore_default_args=ignore_args,
        # chromium_sandbox=False,
    )
    return browser
async def normal_new_context_async(
        browser,
        storage_state=None,
        har_path=None,
        video_path=None,
        tracing=False,
        trace_screenshots=False,
        trace_snapshots=False,
        trace_sources=False,
        locale=None,
        geolocation=None,
        user_agent: str = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
        viewport: dict = {"width": 1280, "height": 720},
):
    """Create a new browser context and optionally start tracing on it.

    Args:
        browser: Object exposing an awaitable ``new_context``.
        storage_state: Forwarded as ``storage_state``.
        har_path: Forwarded as ``record_har_path``.
        video_path: Forwarded as ``record_video_dir``.
        tracing: When truthy, ``context.tracing.start`` is awaited before
            the context is returned.
        trace_screenshots: Forwarded to tracing as ``screenshots``.
        trace_snapshots: Forwarded to tracing as ``snapshots``.
        trace_sources: Forwarded to tracing as ``sources``.
        locale: Forwarded as ``locale``.
        geolocation: Forwarded as ``geolocation``.
        user_agent: Forwarded as ``user_agent``.
        viewport: Forwarded as ``viewport``; it is only passed through and
            never modified here.

    Returns:
        The context returned by ``browser.new_context``.
    """
    context_options = {
        "storage_state": storage_state,
        "user_agent": user_agent,
        "viewport": viewport,
        "locale": locale,
        "record_har_path": har_path,
        "record_video_dir": video_path,
        "geolocation": geolocation,
    }
    context = await browser.new_context(**context_options)

    if not tracing:
        return context

    await context.tracing.start(
        screenshots=trace_screenshots,
        snapshots=trace_snapshots,
        sources=trace_sources,
    )
    return context
#
# def persistent_launch(playwright: Playwright, user_data_dir: str = ""):
# context = playwright.chromium.launch_persistent_context(
# user_data_dir=user_data_dir,
# headless=False,
# args=["--no-default-browser-check",
# "--no_sandbox",
# "--disable-blink-features=AutomationControlled",
# ],
# ignore_default_args=ignore_args,
# user_agent="Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
# viewport={"width": 1280, "height": 720},
# bypass_csp=True,
# slow_mo=1000,
# chromium_sandbox=True,
# channel="chrome-dev"
# )
# return context
#
# async def persistent_launch_async(playwright: Playwright, user_data_dir: str = "", record_video_dir="video"):
# context = await playwright.chromium.launch_persistent_context(
# user_data_dir=user_data_dir,
# headless=False,
# args=[
# "--disable-blink-features=AutomationControlled",
# ],
# ignore_default_args=ignore_args,
# user_agent="Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
# # viewport={"width": 1280, "height": 720},
# record_video_dir=record_video_dir,
# channel="chrome-dev"
# # slow_mo=1000,
# )
# return context
def remove_extra_eol(text):
    """Flatten *text* onto one line and squeeze whitespace runs.

    Every newline becomes a space, then each run of two or more whitespace
    characters is replaced by a single space. A lone whitespace character
    (for example a single tab) is left as it is, and leading/trailing
    whitespace is not stripped.
    """
    single_line = ' '.join(text.split('\n'))
    return re.sub(r'\s{2,}', ' ', single_line)
def get_first_line(s):
    """Return the first line of *s*, shortened to at most eight words.

    When the first line has more than eight whitespace-separated words, the
    first eight are joined with single spaces and ``'...'`` is appended.
    Otherwise the first line is returned exactly as written (its original
    spacing is kept).
    """
    head = s.partition('\n')[0]
    words = head.split()
    if len(words) <= 8:
        return head
    return ' '.join(words[:8]) + '...'
async def get_element_description(element, tag_name, role_value, type_value):
    """Build a short textual description of a page element.

    Sources are tried in this order, and the first one that yields text wins:

    1. For ``select`` tags with a selected option: the selected option's
       text plus the list of all option texts.
    2. The element's text content (or its inner text when the text content
       is longer than 80 characters).
    3. The element's salient attributes, prefixed with a shortened
       ``parent_node: ...`` snippet when the parent has text.
    4. The salient attributes of the element's first child.

    For ``input``/``textarea`` tags whose role and type are not in the
    non-text list, the current input value is prepended to results 2-4.

    Returns:
        The description string, or ``None`` when nothing could be extracted.
    """
    salient_attributes = [
        "alt",
        "aria-describedby",
        "aria-label",
        "aria-role",
        "input-checked",
        # "input-value",
        "label",
        "name",
        "option_selected",
        "placeholder",
        "readonly",
        "text-value",
        "title",
        "value",
    ]

    async def render_attributes(locator):
        # Render each non-empty salient attribute as `name="value" `.
        rendered = ""
        for attr_name in salient_attributes:
            attr_value = await locator.get_attribute(attr_name, timeout=0)
            if attr_value:
                rendered += f"{attr_name}=" + "\"" + attr_value.strip() + "\"" + " "
        return rendered

    # Parent context: the parent's first line, shortened, or "" when the
    # parent carries no text.
    parent_value = "parent_node: "
    parent_locator = element.locator('xpath=..')
    if await parent_locator.count() > 0:
        # There is at most one parent node.
        parent_text = (await parent_locator.inner_text(timeout=0) or "").strip()
        if parent_text:
            parent_value += parent_text
    parent_value = remove_extra_eol(get_first_line(parent_value)).strip()
    parent_value = "" if parent_value == "parent_node:" else parent_value + " "

    if tag_name == "select":
        selected_text = await element.evaluate(
            "select => select.options[select.selectedIndex].textContent", timeout=0
        )
        if selected_text:
            option_texts = await element.evaluate(
                "select => Array.from(select.options).map(option => option.text)",
                timeout=0)
            options_summary = " | ".join(option_texts)
            if not options_summary:
                options_summary = await element.text_content(timeout=0)
                if not options_summary:
                    options_summary = await element.inner_text(timeout=0)
            return (parent_value + "Selected Options: " + remove_extra_eol(selected_text.strip())
                    + " - Options: " + options_summary)
        # No selected text: fall through to the generic strategies below.

    input_value = ""
    none_input_type = ["submit", "reset", "checkbox", "radio", "button", "file"]
    is_text_entry = (
        tag_name in ("input", "textarea")
        and role_value not in none_input_type
        and type_value not in none_input_type
    )
    if is_text_entry:
        current_value = await element.input_value(timeout=0)
        if current_value:
            input_value = "input value=" + "\"" + current_value + "\"" + " "

    visible_text = (await element.text_content(timeout=0) or '').strip()
    if visible_text:
        visible_text = remove_extra_eol(visible_text)
        if len(visible_text) <= 80:
            return input_value + visible_text
        # Long text content: use the inner text instead, if there is any.
        rendered_text = (await element.inner_text(timeout=0) or '').strip()
        if rendered_text:
            return input_value + remove_extra_eol(rendered_text)

    # Fall back to the element's own salient attributes.
    attribute_text = await render_attributes(element)
    summary = (parent_value + attribute_text).strip()
    if summary:
        return input_value + remove_extra_eol(summary.strip())

    # Last resort: salient attributes of the first child node.
    first_child_locator = element.locator('xpath=./child::*[1]')
    if await first_child_locator.count() > 0:
        attribute_text += await render_attributes(first_child_locator)
        summary = (parent_value + attribute_text).strip()
        if summary:
            return input_value + remove_extra_eol(summary.strip())

    return None
async def get_element_data(element, tag_name, viewport_size, seen_elements=None, coordinates=None):
    """Collect geometry and a description for one candidate element.

    Args:
        element: Locator-like object; ``bounding_box``, ``is_hidden``,
            ``is_disabled``, ``evaluate`` and ``get_attribute`` are awaited
            on it.
        tag_name: Selector the element was found with. If it is not one of
            the known interactive tags, the real tag name is read from the
            page.
        viewport_size: Mapping with ``"width"`` and ``"height"``; elements
            not fully inside this area are rejected.
        seen_elements: Optional container of centre points that should be
            skipped. Defaults to ``None`` (nothing seen) rather than a shared
            mutable list.
        coordinates: Optional ``(x, y)`` point. When given, only an element
            whose bounding box contains the point is accepted, and debug
            information about it is printed.

    Returns:
        A dict with keys ``center_point`` (x, y), ``description``,
        ``tag_with_role`` (tag plus role/type when present), ``box``
        ``[x1, y1, x2, y2]``, ``selector`` (the element itself) and ``tag``;
        or ``None`` when the element is rejected or any exception occurs
        (the traceback is printed in that case).
    """
    tag_name_list = ['a', 'button',
                     'input',
                     'select', 'textarea', 'adc-tab']
    text_element = ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'td', "div", "em", "center", "strong", "b", "i",
                    "small", "mark", "abbr", "cite", "q", "blockquote", "span", "nobr"]
    already_seen = () if seen_elements is None else seen_elements

    try:
        rect = await element.bounding_box() or {'x': -1, 'y': -1, 'width': 0, 'height': 0}
        left, top = rect['x'], rect['y']
        right, bottom = left + rect['width'], top + rect['height']

        # Reject off-screen, tiny, or partially clipped elements.
        if (left < 0 or top < 0 or rect['width'] <= 4 or rect['height'] <= 4
                or bottom > viewport_size["height"] or right > viewport_size["width"]):
            return None

        if coordinates is not None:
            inside = left <= coordinates[0] <= right and top <= coordinates[1] <= bottom
            if not inside:
                return None
            print(coordinates)
            print(rect)

        box_model = [left, top, right, bottom]
        center_point = (round((box_model[0] + box_model[2]) / 2, 3),
                        round((box_model[1] + box_model[3]) / 2, 3))

        if await element.is_hidden(timeout=0) or await element.is_disabled(timeout=0):
            return None

        if center_point in already_seen:
            return None

        if tag_name in tag_name_list:
            real_tag_name = tag_name
        else:
            real_tag_name = await element.evaluate("element => element.tagName.toLowerCase()", timeout=0)
            if real_tag_name in tag_name_list:
                # already detected
                return None
        tag_head = real_tag_name

        # Plain text containers are not treated as interactive elements.
        if real_tag_name in text_element:
            return None

        role_value = await element.get_attribute('role', timeout=0)
        type_value = await element.get_attribute('type', timeout=0)
        description = await get_element_description(element, real_tag_name, role_value, type_value)
        if not description:
            return None

        if role_value:
            tag_head += " role=" + "\"" + role_value + "\""
        if type_value:
            tag_head += " type=" + "\"" + type_value + "\""

        if coordinates is not None:
            # The hit-test above already passed and neither rect nor
            # coordinates changed, so only the debug output remains here.
            print(tag_head)
            print(description)
            print(box_model)

        return {"center_point": center_point, "description": description, "tag_with_role": tag_head,
                "box": box_model, "selector": element, "tag": real_tag_name}
    except Exception as e:
        # Best-effort: any failure on a single element is reported and the
        # element is skipped.
        print(traceback.format_exc())
        print(e)
        return None
async def get_interactive_elements_with_playwright(page,viewport_size,coordinates=None):
    """Return data for the interactive elements found on *page*.

    Every ``a``, ``button``, ``input``, ``select`` and ``textarea`` match is
    passed to ``get_element_data`` (all calls are awaited together), falsy
    results are dropped, and results sharing a ``center_point`` with an
    earlier result are dropped as duplicates.

    Args:
        page: Object exposing ``locator(selector)``.
        viewport_size: Forwarded to ``get_element_data``.
        coordinates: Optional point forwarded to ``get_element_data``.

    Returns:
        A list of the element dicts produced by ``get_element_data``.
    """
    print("Get Interactive elements around: ", coordinates)
    interactive_elements_selectors = [
        'a', 'button',
        'input',
        'select', 'textarea',
    ]

    pending = []
    for css_tag in interactive_elements_selectors:
        matches = page.locator(css_tag)
        total = await matches.count()
        pending.extend(
            get_element_data(matches.nth(position), css_tag, viewport_size,
                             seen_elements=[], coordinates=coordinates)
            for position in range(total)
        )

    gathered = await asyncio.gather(*pending)

    # Keep the first result for each distinct centre point.
    seen_centers = set()
    unique_elements = []
    for entry in gathered:
        if not entry or entry["center_point"] in seen_centers:
            continue
        seen_centers.add(entry["center_point"])
        unique_elements.append(entry)

    # NOTE: a second pass over the '*' selector was sketched here previously
    # but is disabled.
    return unique_elements
async def get_select_elements_with_playwright(page,viewport_size):
    """Return data for the ``select`` elements found on *page*.

    Each ``select`` match is passed to ``get_element_data`` (all calls are
    awaited together); falsy results and results whose ``center_point`` was
    already produced by an earlier result are dropped.

    Args:
        page: Object exposing ``locator(selector)``.
        viewport_size: Forwarded to ``get_element_data``.

    Returns:
        A list of the element dicts produced by ``get_element_data``.
    """
    interactive_elements_selectors = [
        'select'
    ]

    pending = []
    for css_tag in interactive_elements_selectors:
        matches = page.locator(css_tag)
        total = await matches.count()
        pending.extend(
            get_element_data(matches.nth(position), css_tag, viewport_size,
                             seen_elements=[], coordinates=None)
            for position in range(total)
        )

    gathered = await asyncio.gather(*pending)

    # Keep the first result for each distinct centre point.
    seen_centers = set()
    unique_elements = []
    for entry in gathered:
        if not entry or entry["center_point"] in seen_centers:
            continue
        seen_centers.add(entry["center_point"])
        unique_elements.append(entry)
    return unique_elements
async def select_option(selector, value):
    """Select the option whose text is most similar to *value*.

    Each ``option`` under *selector* is compared with *value* using
    ``SequenceMatcher(...).ratio()``; the first option with the highest
    ratio is selected by index (10 s timeout).

    Returns:
        The chosen option's text with newlines/whitespace runs collapsed
        and surrounding whitespace stripped.
    """
    option_nodes = selector.locator("option")
    option_total = await option_nodes.count()

    best_index, best_text, best_score = -1, "", -1
    for position in range(option_total):
        candidate = await option_nodes.nth(position).inner_text()
        score = SequenceMatcher(None, candidate, value).ratio()
        # Strict comparison: on ties the earlier option is kept.
        if score > best_score:
            best_index, best_text, best_score = position, candidate, score

    await selector.select_option(index=best_index, timeout=10000)
    return remove_extra_eol(best_text).strip()
def saveconfig(config, save_file):
    """Save a configuration to *save_file*, redacting the OpenAI API key.

    Args:
        config: Either a configuration dict, or the path of an existing
            configuration file.
        save_file: Destination path (``str`` or ``Path``), including the
            file name.

    Behaviour:
        * dict: a deep copy is written as TOML with ``openai.api_key``
          replaced by a placeholder. The caller's dict is left untouched
          (previously the real key was overwritten in the caller's object).
          If there is no ``openai`` table, the config is written as-is
          instead of raising ``KeyError`` after truncating the output file.
        * anything else: treated as a source path and copied to
          *save_file*. The copy no longer goes through a shell command, so
          paths containing spaces work and a failed copy raises ``OSError``
          instead of being silently ignored.
    """
    # Function-scope imports keep this block self-contained.
    import copy
    import shutil

    if isinstance(save_file, str):
        save_file = Path(save_file)

    if isinstance(config, dict):
        config_without_key = copy.deepcopy(config)
        openai_section = config_without_key.get("openai")
        if isinstance(openai_section, dict):
            openai_section["api_key"] = "Your API key here"
        # Redaction happens before the file is opened, so an error above
        # cannot leave a truncated file behind. TOML files are UTF-8.
        with open(save_file, 'w', encoding="utf-8") as f:
            toml.dump(config_without_key, f)
    else:
        shutil.copy(str(config), str(save_file))