Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,26 @@ jobs:
run: |
make install

- name: Lint code formatters
- name: Lint isort
id: isort
run: |
isort --check src/ && \
black --check src/
python -m isort --check --diff src/
continue-on-error: true

- name: Lint black
id: black
run: |
python -m black --check --diff src/
continue-on-error: true

- name: Lint static analysis
id: mypy
run: |
python -m mypy src/
continue-on-error: true

- name: Check for failures
run: |
mypy src/
if [ "${{ steps.isort.outcome }}" == "failure" ] || [ "${{ steps.black.outcome }}" == "failure" ] || [ "${{ steps.mypy.outcome }}" == "failure" ]; then
exit 1
fi
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ stop: ## Stop the compose
docker compose down

lint: ## Lint source
isort src/
black src/
mypy src/
python -m isort src/
python -m black src/
python -m mypy src/

clean: ## Clean the repo
rm -f pytest.xml
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies = [
"neo4j",
"dotenv",
"requests",
"pillow",
]
license = "MIT"
license-files = ["LICEN[CS]E*"]
Expand All @@ -19,7 +20,6 @@ crawler = [
"html5lib",
"bs4",
"selenium",
"pillow",
]
dashboard = [
"bs4",
Expand Down
144 changes: 89 additions & 55 deletions src/webmap/boundingbox/capture.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,23 @@
import io
import time
from typing import cast

from PIL import Image, ImageDraw
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webelement import WebElement

from webmap.boundingbox.bbox import BBox
from webmap.boundingbox.database import BoundingBoxDB
from webmap.screenshot import ScreenshotCapture

SERVER = "http://selenium:4444/wd/hub"


class BoundingBoxCapture:
class BoundingBoxCapture(ScreenshotCapture):
def __init__(self) -> None:
self.db = BoundingBoxDB()
self._setup_driver()

def _setup_driver(self) -> None:
"""Setup remote Chrome driver."""
options = Options()
options.add_argument("--no-sandbox")
options.add_argument("--window-size=1920,1080")
options.add_argument("--headless")
options.add_argument("--disable-gpu")

self.driver = webdriver.Remote(command_executor=SERVER, options=options)
super().__init__()
self.db: BoundingBoxDB = BoundingBoxDB() # type: ignore[assignment]
self._loaded_page = ""

def load_page(self, url: str) -> None:
def _load_page(self, url: str) -> None:
try:
self.driver.get(url)
self._loaded_page = url
Expand All @@ -38,38 +27,33 @@ def load_page(self, url: str) -> None:

def take_clean_screenshot(self, url: str) -> bytes | None:
"""Take screenshot of URL and return as bytes."""
if url is not self._loaded_page:
self.load_page(url)
try:
screenshot_png = self.driver.get_screenshot_as_png()
return screenshot_png
except Exception as e:
print(f"Screenshot Error: taking screenshot of {url}: {e}")
return None
return self.take_screenshot(url)

def take_bbox_screenshot(self, url: str) -> bytes | None:
"""Take screenshot of URL and return as bytes."""
if url is not self._loaded_page:
self.load_page(url)
"""Take screenshot of URL with bounding boxes drawn."""
try:
screenshot_png = self.driver.get_screenshot_as_png()
self.driver.execute_script("window.scrollTo(0, 0)")
time.sleep(0.3)
fixed_header_height = self._get_header_height()
elements = self._collect_elements(url)
bboxes = self._extract_bboxs(elements)

screenshot_png = self._fullpage_screenshot()
device_pixel_ratio = self.driver.execute_script(
"return window.devicePixelRatio"
)
image = Image.open(io.BytesIO(screenshot_png))
buttons = self.get_all_by_xpath(url, "//button")
textarea = self.get_all_by_xpath(url, "//textarea")
elements = buttons + textarea
draw = ImageDraw.Draw(image)
for element in elements:
bbox: BBox = self.get_bbox(element)

for bbox in bboxes:
if (
abs(bbox.x_max - bbox.x_min) <= 5
and abs(bbox.y_max - bbox.y_min) <= 5
):
print("Below limit")
continue
draw.rectangle(
[bbox.x_min, bbox.y_min, bbox.x_max, bbox.y_max],
outline="red",
width=2,
)
self._draw_bbox(fixed_header_height, device_pixel_ratio, draw, bbox)

new_image = io.BytesIO()
image.save(new_image, "PNG")
new_image.seek(0)
Expand All @@ -78,22 +62,79 @@ def take_bbox_screenshot(self, url: str) -> bytes | None:
print(f"BoundingBox Error: taking screenshot of {url}: {e}")
return None

def _draw_bbox(
    self,
    fixed_header_height: int,
    device_pixel_ratio: float,
    draw: ImageDraw.ImageDraw,
    bbox: BBox,
) -> None:
    """Draw one red bounding-box rectangle onto the screenshot image.

    Every coordinate is multiplied by ``device_pixel_ratio`` because the
    driver reports element geometry in CSS pixels while the screenshot is
    rendered in device pixels.  ``window.devicePixelRatio`` is a float
    (e.g. 1.25 on scaled displays), hence the ``float`` annotation.

    Args:
        fixed_header_height: height added to both y-coordinates —
            presumably to offset boxes past a fixed/sticky page header in
            the full-page capture; TODO confirm against the screenshot
            stitching logic.
        device_pixel_ratio: the browser's ``window.devicePixelRatio``.
        draw: drawing context for the screenshot image.
        bbox: element geometry in CSS pixels.
    """
    y_min = (bbox.y_min + fixed_header_height) * device_pixel_ratio
    y_max = (bbox.y_max + fixed_header_height) * device_pixel_ratio
    draw.rectangle(
        [
            bbox.x_min * device_pixel_ratio,
            y_min,
            bbox.x_max * device_pixel_ratio,
            y_max,
        ],
        outline="red",
        width=2,
    )

def _extract_bboxs(self, elements: list[WebElement]) -> list[BBox]:
    """Return bounding boxes for all currently displayed elements.

    Elements for which ``is_displayed()`` is false are skipped — hidden
    elements have no meaningful geometry to draw.

    Args:
        elements: web elements to measure.

    Returns:
        One :class:`BBox` per visible element, in input order.
    """
    # Fixed dead code: the original bound an unused loop index via
    # enumerate() and an unused `enabled = element.is_enabled()` result.
    bboxes: list[BBox] = []
    for element in elements:
        if not element.is_displayed():
            continue
        bboxes.append(self.get_bbox(element))
    return bboxes

def _collect_elements(self, url: str) -> list[WebElement]:
    """Gather every button, textarea and link element on *url*.

    Results are concatenated in that fixed order: buttons first, then
    textareas, then anchors.
    """
    collected: list[WebElement] = []
    for xpath in ("//button", "//textarea", "//a"):
        collected += self.get_all_by_xpath(url, xpath)
    return collected

def _get_header_height(self) -> int:
    """Return the height of the tallest fixed/sticky header on the current page.

    Runs a script in the browser that scans every element for
    ``position: fixed`` or ``position: sticky`` anchored at the top of the
    viewport (``rect.top === 0``) and returns the largest such height,
    or 0 when no pinned element exists.

    NOTE(review): ``getBoundingClientRect().height`` is a float in the
    browser, so ``execute_script`` may return a float despite the ``int``
    annotation — confirm callers tolerate a non-integer value.
    """
    fixed_header_height: int = self.driver.execute_script("""
var maxHeight = 0;
var elements = document.querySelectorAll('*');
elements.forEach(function(el) {
var style = window.getComputedStyle(el);
if (style.position === 'fixed' || style.position === 'sticky') {
var rect = el.getBoundingClientRect();
if (rect.top === 0 && rect.height > maxHeight) {
maxHeight = rect.height;
}
}
});
return maxHeight;
""")
    return fixed_header_height

def get_html(self, url: str) -> str:
    """Return the page source for *url*, loading the page first if needed.

    Returns:
        The HTML source, or an empty string when retrieval fails.
    """
    # Bug fix: the original used `url is not self._loaded_page`, which
    # compares string identity, not value — equal URL strings that are
    # distinct objects would trigger a spurious reload.  This also makes
    # the check consistent with capture_and_save, which uses `!=`.
    if url != self._loaded_page:
        self._load_page(url)
    try:
        html = self.driver.page_source
        return cast(str, html)
    except Exception as e:
        print(f"BoundingBox Error: getting html source {url}: {e}")
        return ""

def get_all_by_xpath(self, url: str, x_string: str) -> list[WebElement]:
    """Return all elements on *url* matching the XPath *x_string*.

    Loads the page first when it is not the currently loaded one.

    Returns:
        The matching elements, or an empty list when the lookup fails.
    """
    # Bug fix: the original used `url is not self._loaded_page`, which
    # tests string identity rather than equality — compare by value, as
    # capture_and_save already does.
    if url != self._loaded_page:
        self._load_page(url)
    try:
        # Renamed from `buttons`: the caller passes arbitrary XPaths,
        # not only button selectors.
        elements = self.driver.find_elements(By.XPATH, x_string)
        return cast(list[WebElement], elements)
    except Exception as e:
        print(f"BoundingBox Error: getting all by x path {url}: {e}")
        return []
Expand All @@ -106,10 +147,10 @@ def get_bbox(self, element: WebElement) -> BBox:
bbox = BBox(x, y, x + width, y + height, element.text, element.tag_name)
return bbox

def capture_and_save(self, url: str) -> bool:
def capture_and_save(self, url: str, fullpage: bool = True) -> bool:
"""Take screenshots and save bounding box data to database."""
if url != self._loaded_page:
self.load_page(url)
self._load_page(url)

clean_screenshot = self.take_clean_screenshot(url)

Expand All @@ -129,16 +170,9 @@ def capture_and_save(self, url: str) -> bool:
else:
success &= self.db.save_screenshot(url, clean_screenshot, "clean")
if bbox_screenshot is None:
success &= self.db.save_screenshot(url, b"error", "clean-error")
success &= self.db.save_screenshot(url, b"error", "bbox-error")
else:
success &= self.db.save_screenshot(url, bbox_screenshot, "bbox")
success &= self.db.save_bounding_boxes(url, bounding_boxes)

return success

def close(self) -> None:
"""Close the webdriver."""
self.driver.quit()

def __del__(self) -> None:
self.close()
return bool(success)
13 changes: 9 additions & 4 deletions src/webmap/crawler.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import time
from time import sleep
from typing import Callable, List

from webmap.database import Neo4JControl, Neo4JGraph, Neo4JStack, StatusDB
from webmap.scraper import get_all_links, get_HTML_response, get_soup
from webmap.url_handling import isValid
from webmap.url_handling import is_valid


class Crawler:
Expand Down Expand Up @@ -38,13 +39,14 @@ def run(self) -> None:
self._status.log_status(f"{self._plugins}")

while self._should_run():
start_time = time.time()
if self._stack.count() > 0:
url = self._stack.pop()
if url is None:
self._status.log_status("Stack returned unexpected value")
continue

if not isValid(url):
if not is_valid(url):
self._status.log_status(f"Invalid URL: {url}")
continue

Expand All @@ -60,14 +62,17 @@ def run(self) -> None:
links = self._fetch_links(url)
for element in self._parse_links(url, links):
self._stack.push(element)
sleep(self._control.get_time())
run_time = round(time.time() - start_time, 2)
remaining_sleep_time = self._control.get_time() - run_time
if remaining_sleep_time > 0:
sleep(remaining_sleep_time)

def _parse_links(
self, website_origin: str, list_with_links: list[str]
) -> list[str]:
found_urls = []
for link in list_with_links:
if not isValid(link):
if not is_valid(link):
continue

found_urls.append(link)
Expand Down
Loading