diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 227c6b8..79f555c 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -22,11 +22,26 @@ jobs: run: | make install - - name: Lint code formatters + - name: Lint isort + id: isort run: | - isort --check src/ && \ - black --check src/ + python -m isort --check --diff src/ + continue-on-error: true + + - name: Lint black + id: black + run: | + python -m black --check --diff src/ + continue-on-error: true - name: Lint static analysis + id: mypy + run: | + python -m mypy src/ + continue-on-error: true + + - name: Check for failures run: | - mypy src/ \ No newline at end of file + if [ "${{ steps.isort.outcome }}" == "failure" ] || [ "${{ steps.black.outcome }}" == "failure" ] || [ "${{ steps.mypy.outcome }}" == "failure" ]; then + exit 1 + fi diff --git a/Makefile b/Makefile index 4bc3904..7653f82 100644 --- a/Makefile +++ b/Makefile @@ -26,9 +26,9 @@ stop: ## Stop the compose docker compose down lint: ## Lint source - isort src/ - black src/ - mypy src/ + python -m isort src/ + python -m black src/ + python -m mypy src/ clean: ## Clean the repo rm -f pytest.xml diff --git a/pyproject.toml b/pyproject.toml index a2395c4..b6ac19c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,6 +10,7 @@ dependencies = [ "neo4j", "dotenv", "requests", + "pillow", ] license = "MIT" license-files = ["LICEN[CS]E*"] @@ -19,7 +20,6 @@ crawler = [ "html5lib", "bs4", "selenium", - "pillow", ] dashboard = [ "bs4", diff --git a/src/webmap/boundingbox/capture.py b/src/webmap/boundingbox/capture.py index 8e92a9f..0a31a14 100644 --- a/src/webmap/boundingbox/capture.py +++ b/src/webmap/boundingbox/capture.py @@ -1,34 +1,23 @@ import io +import time +from typing import cast from PIL import Image, ImageDraw -from selenium import webdriver -from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.remote.webelement import WebElement from webmap.boundingbox.bbox import BBox from webmap.boundingbox.database import BoundingBoxDB +from webmap.screenshot import ScreenshotCapture -SERVER = "http://selenium:4444/wd/hub" - -class BoundingBoxCapture: +class BoundingBoxCapture(ScreenshotCapture): def __init__(self) -> None: - self.db = BoundingBoxDB() - self._setup_driver() - - def _setup_driver(self) -> None: - """Setup remote Chrome driver.""" - options = Options() - options.add_argument("--no-sandbox") - options.add_argument("--window-size=1920,1080") - options.add_argument("--headless") - options.add_argument("--disable-gpu") - - self.driver = webdriver.Remote(command_executor=SERVER, options=options) + super().__init__() + self.db: BoundingBoxDB = BoundingBoxDB() # type: ignore[assignment] self._loaded_page = "" - def load_page(self, url: str) -> None: + def _load_page(self, url: str) -> None: try: self.driver.get(url) self._loaded_page = url @@ -38,38 +27,33 @@ def load_page(self, url: str) -> None: def take_clean_screenshot(self, url: str) -> bytes | None: """Take screenshot of URL and return as bytes.""" - if url is not self._loaded_page: - self.load_page(url) - try: - screenshot_png = self.driver.get_screenshot_as_png() - return screenshot_png - except Exception as e: - print(f"Screenshot Error: taking screenshot of {url}: {e}") - return None + return self.take_screenshot(url) def take_bbox_screenshot(self, url: str) -> bytes | None: - """Take screenshot of URL and return as bytes.""" - if url is not self._loaded_page: - self.load_page(url) + """Take screenshot of URL with bounding boxes drawn.""" try: - screenshot_png = self.driver.get_screenshot_as_png() + self.driver.execute_script("window.scrollTo(0, 0)") + time.sleep(0.3) + fixed_header_height = self._get_header_height() + elements = self._collect_elements(url) + bboxes = self._extract_bboxs(elements) + + screenshot_png = self._fullpage_screenshot() + device_pixel_ratio = self.driver.execute_script( + "return window.devicePixelRatio" + ) image = Image.open(io.BytesIO(screenshot_png)) - buttons = self.get_all_by_xpath(url, "//button") - textarea = self.get_all_by_xpath(url, "//textarea") - elements = buttons + textarea draw = ImageDraw.Draw(image) - for element in elements: - bbox: BBox = self.get_bbox(element) + + for bbox in bboxes: if ( abs(bbox.x_max - bbox.x_min) <= 5 and abs(bbox.y_max - bbox.y_min) <= 5 ): + print("Below limit") continue - draw.rectangle( - [bbox.x_min, bbox.y_min, bbox.x_max, bbox.y_max], - outline="red", - width=2, - ) + self._draw_bbox(fixed_header_height, device_pixel_ratio, draw, bbox) + new_image = io.BytesIO() image.save(new_image, "PNG") new_image.seek(0) @@ -78,22 +62,79 @@ def take_bbox_screenshot(self, url: str) -> bytes | None: print(f"BoundingBox Error: taking screenshot of {url}: {e}") return None + def _draw_bbox( + self, + fixed_header_height: int, + device_pixel_ratio: int, + draw: ImageDraw.ImageDraw, + bbox: BBox, + ) -> None: + y_min = (bbox.y_min + fixed_header_height) * device_pixel_ratio + y_max = (bbox.y_max + fixed_header_height) * device_pixel_ratio + draw.rectangle( + [ + bbox.x_min * device_pixel_ratio, + y_min, + bbox.x_max * device_pixel_ratio, + y_max, + ], + outline="red", + width=2, + ) + + def _extract_bboxs(self, elements: list[WebElement]) -> list[BBox]: + bboxes: list[BBox] = [] + + for i, element in enumerate(elements): + if not element.is_displayed(): + continue + bbox = self.get_bbox(element) + enabled = element.is_enabled() + bboxes.append(bbox) + return bboxes + + def _collect_elements(self, url: str) -> list[WebElement]: + """Returns all the webelements of the current driver""" + buttons = self.get_all_by_xpath(url, "//button") + textarea = self.get_all_by_xpath(url, "//textarea") + links = self.get_all_by_xpath(url, "//a") + elements = buttons + textarea + links + return elements + + def _get_header_height(self) -> int: + """Returns the header height of the currennt driver""" + fixed_header_height: int = self.driver.execute_script(""" + var maxHeight = 0; + var elements = document.querySelectorAll('*'); + elements.forEach(function(el) { + var style = window.getComputedStyle(el); + if (style.position === 'fixed' || style.position === 'sticky') { + var rect = el.getBoundingClientRect(); + if (rect.top === 0 && rect.height > maxHeight) { + maxHeight = rect.height; + } + } + }); + return maxHeight; + """) + return fixed_header_height + def get_html(self, url: str) -> str: if url is not self._loaded_page: - self.load_page(url) + self._load_page(url) try: html = self.driver.page_source - return html + return cast(str, html) except Exception as e: print(f"BoundingBox Error: getting html source {url}: {e}") return "" def get_all_by_xpath(self, url: str, x_string: str) -> list[WebElement]: if url is not self._loaded_page: - self.load_page(url) + self._load_page(url) try: buttons = self.driver.find_elements(By.XPATH, x_string) - return buttons + return cast(list[WebElement], buttons) except Exception as e: print(f"BoundingBox Error: getting all by x path {url}: {e}") return [] @@ -106,10 +147,10 @@ def get_bbox(self, element: WebElement) -> BBox: bbox = BBox(x, y, x + width, y + height, element.text, element.tag_name) return bbox - def capture_and_save(self, url: str) -> bool: + def capture_and_save(self, url: str, fullpage: bool = True) -> bool: """Take screenshots and save bounding box data to database.""" if url != self._loaded_page: - self.load_page(url) + self._load_page(url) clean_screenshot = self.take_clean_screenshot(url) @@ -129,16 +170,9 @@ def capture_and_save(self, url: str) -> bool: else: success &= self.db.save_screenshot(url, clean_screenshot, "clean") if bbox_screenshot is None: - success &= self.db.save_screenshot(url, b"error", "clean-error") + success &= self.db.save_screenshot(url, b"error", "bbox-error") else: success &= self.db.save_screenshot(url, bbox_screenshot, "bbox") success &= self.db.save_bounding_boxes(url, bounding_boxes) - return success - - def close(self) -> None: - """Close the webdriver.""" - self.driver.quit() - - def __del__(self) -> None: - self.close() + return bool(success) diff --git a/src/webmap/crawler.py b/src/webmap/crawler.py index d8ecb06..8f690cb 100644 --- a/src/webmap/crawler.py +++ b/src/webmap/crawler.py @@ -1,9 +1,10 @@ +import time from time import sleep from typing import Callable, List from webmap.database import Neo4JControl, Neo4JGraph, Neo4JStack, StatusDB from webmap.scraper import get_all_links, get_HTML_response, get_soup -from webmap.url_handling import isValid +from webmap.url_handling import is_valid class Crawler: @@ -38,13 +39,14 @@ def run(self) -> None: self._status.log_status(f"{self._plugins}") while self._should_run(): + start_time = time.time() if self._stack.count() > 0: url = self._stack.pop() if url is None: self._status.log_status("Stack returned unexpected value") continue - if not isValid(url): + if not is_valid(url): self._status.log_status(f"Invalid URL: {url}") continue @@ -60,14 +62,17 @@ def run(self) -> None: links = self._fetch_links(url) for element in self._parse_links(url, links): self._stack.push(element) - sleep(self._control.get_time()) + run_time = round(time.time() - start_time, 2) + remaining_sleep_time = self._control.get_time() - run_time + if remaining_sleep_time > 0: + sleep(remaining_sleep_time) def _parse_links( self, website_origin: str, list_with_links: list[str] ) -> list[str]: found_urls = [] for link in list_with_links: - if not isValid(link): + if not is_valid(link): continue found_urls.append(link) diff --git a/src/webmap/screenshot/capture.py b/src/webmap/screenshot/capture.py index 7feb075..7345598 100644 --- a/src/webmap/screenshot/capture.py +++ b/src/webmap/screenshot/capture.py @@ -1,3 +1,7 @@ +import time +from io import BytesIO + +from PIL import Image from selenium import webdriver from selenium.webdriver.chrome.options import Options @@ -18,20 +22,67 @@ def _setup_driver(self) -> None: options.add_argument("--window-size=1920,1080") options.add_argument("--headless") options.add_argument("--disable-gpu") + options.add_argument("--disable-notifications") + + prefs = { + "profile.default_content_setting_values.cookies": 2, + "profile.block_third_party_cookies": True, + } + options.add_experimental_option("prefs", prefs) - # Connect to remote Selenium standalone container - self.driver = webdriver.Remote(command_executor=SERVER, options=options) + self.driver: webdriver.Remote = webdriver.Remote( + command_executor=SERVER, options=options + ) def take_screenshot(self, url: str) -> bytes | None: """Take screenshot of URL and return as bytes.""" try: self.driver.get(url) - screenshot_png = self.driver.get_screenshot_as_png() - return screenshot_png + return self._fullpage_screenshot() except Exception as e: print(f"Screenshot Error: taking screenshot of {url}: {e}") return None + def _fullpage_screenshot(self, scroll_delay: float = 0.3) -> bytes: + """ + Takes a fullscreen pageshot + """ + device_pixel_ratio = self.driver.execute_script( + "return window.devicePixelRatio" + ) + + total_height = self.driver.execute_script( + "return document.body.parentNode.scrollHeight" + ) + viewport_height = self.driver.execute_script("return window.innerHeight") + total_width = self.driver.execute_script("return document.body.offsetWidth") + viewport_width = self.driver.execute_script("return document.body.clientWidth") + + assert viewport_width == total_width + + offset = 0 + slices = {} + while offset < total_height: + if offset + viewport_height > total_height: + offset = total_height - viewport_height + + self.driver.execute_script("window.scrollTo({0}, {1})".format(0, offset)) + time.sleep(scroll_delay) + + img = Image.open(BytesIO(self.driver.get_screenshot_as_png())) + slices[offset] = img + + offset = offset + viewport_height + + stitched_image = Image.new( + "RGB", (total_width * device_pixel_ratio, total_height * device_pixel_ratio) + ) + for offset, image in slices.items(): + stitched_image.paste(image, (0, offset * device_pixel_ratio)) + img_byte_arr = BytesIO() + stitched_image.save(img_byte_arr, format="PNG") + return img_byte_arr.getvalue() + def capture_and_save(self, url: str) -> bool: """Take screenshot and save to database.""" screenshot_data = self.take_screenshot(url) diff --git a/src/webmap/url_handling.py b/src/webmap/url_handling.py index 0bb1a23..76eb554 100644 --- a/src/webmap/url_handling.py +++ b/src/webmap/url_handling.py @@ -1,10 +1,10 @@ def get_name_from_URL(url: str) -> str | None: - if not isValid(url): + if not is_valid(url): return None return url.split("//")[-1] -def isValid(url: str) -> bool: +def is_valid(url: str) -> bool: if url is None: return False return url.startswith("http://") or url.startswith("https://") diff --git a/tests/test_stitch.py b/tests/test_stitch.py new file mode 100644 index 0000000..0050e96 --- /dev/null +++ b/tests/test_stitch.py @@ -0,0 +1,71 @@ +import pytest +import math +from PIL import Image, ImageDraw, ImageFont + +pytest.skip("Testing logic, not code", allow_module_level=True) + +WIDTH, HEIGHT = 1920, 1080 + +def black_image() -> Image: + image = Image.new('RGB', (WIDTH, HEIGHT), (0, 0, 0)) + draw = ImageDraw.Draw(image) + draw.text((0, 0),"1920x1080",(255,255,255)) + return image + +def white_image() -> Image: + image = Image.new('RGB', (WIDTH, HEIGHT), (255, 255, 255)) + draw = ImageDraw.Draw(image) + draw.text((0, 0),"1920x1080",(0,0,0)) + return image + +def rainbow_image() -> Image: + pass + +def stich_images(image_one: Image, image_two: Image): + max_width = WIDTH + max_height = HEIGHT + number_of_images = 2 + + image_sheet = Image.new("RGB", (max_width, max_height*number_of_images)) + + for (i, image) in enumerate([image_one, image_two]): + image_sheet.paste(image, ( + 0, + max_height * i + )) + return image_sheet + +def unstitch_image(image: Image) -> list[Image]: + max_width = WIDTH + max_height = HEIGHT + + image_width, image_height = image.size[0], image.size[1] + new_images = math.ceil(image_height/max_height) + images = [image.crop(( + 0, max_height*i, + max_width, max_height*(i+1) + + )) for i in range(0, new_images)] + return images + + +@pytest.mark.parametrize("image", [black_image(), white_image()]) +def test_generate_image(image: Image) -> None: + image_width, image_height = image.size[0], image.size[1] + assert (image_width, image_height) == (WIDTH, HEIGHT) + image.save("a_text.png") + +@pytest.mark.parametrize("images", [[black_image(), white_image()]]) +def test_stitch_images(images: list[Image]) -> None: + assert len(images) == 2 + stichted = stich_images(images[0], images[1]) + image_width, image_height = stichted.size[0], stichted.size[1] + assert (image_width, image_height) == (WIDTH, HEIGHT*2) + +@pytest.mark.parametrize("images", [[black_image(), white_image()]]) +def test_unstitch_image(images): + assert len(images) == 2 + stichted = stich_images(images[0], images[1]) + unstiched = unstitch_image(stichted) + assert list(unstiched[0].getdata()) == list(images[0].getdata()) + assert list(unstiched[1].getdata()) == list(images[1].getdata()) diff --git a/tests/test_url_handling.py b/tests/test_url_handling.py index be5919e..d05f3f1 100644 --- a/tests/test_url_handling.py +++ b/tests/test_url_handling.py @@ -2,19 +2,19 @@ from hypothesis import assume, given from hypothesis import strategies as st -from webmap.url_handling import get_name_from_URL, isValid +from webmap.url_handling import get_name_from_URL, is_valid class TestURLHandling: def test_is_valid_URL(self): - assert isValid("https://www.google.com") == True - assert isValid("http://www.google.com") == True - assert isValid("www.google.com") == False - assert isValid(None) == False + assert is_valid("https://www.google.com") == True + assert is_valid("http://www.google.com") == True + assert is_valid("www.google.com") == False + assert is_valid(None) == False @given(url=st.text()) def test_fuzz_isValidURL(self, url): - isValid(url=url) + is_valid(url=url) def test_get_name_from_URL(self): assert get_name_from_URL("https://www.google.com") == "www.google.com" diff --git a/tools/boundingbox.py b/tools/boundingbox.py index 02ac020..0a32ab0 100755 --- a/tools/boundingbox.py +++ b/tools/boundingbox.py @@ -18,11 +18,10 @@ bBoxCapture = BoundingBoxCapture() - image: bytes = bBoxCapture.take_bbox_screenshot(url) + image: bytes = bBoxCapture.take_bbox_screenshot(url, fullpage=True) if image: with open(filename, "wb") as f: f.write(image) print(f"Screenshot saved to {filename}") else: print("Failed to capture screenshot") - bBoxCapture.close() diff --git a/tools/screenshot.py b/tools/screenshot.py new file mode 100644 index 0000000..2e60c9c --- /dev/null +++ b/tools/screenshot.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 +"""Tool that will take a screenshot of a URL""" + +import sys + +from webmap.screenshot import ScreenshotCapture + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("Usage: python screenshot.py [filename]") + sys.exit(1) + + url = sys.argv[1] + filename = sys.argv[2] if len(sys.argv) > 2 else "screenshot.png" + + capture = ScreenshotCapture() + + image = capture.take_screenshot(url) + if image: + with open(filename, "wb") as f: + f.write(image) + print(f"Screenshot saved to {filename}") + else: + print("Failed to capture screenshot")