refactor(backend): ♻️ re-design scraper to utilize strategy pattern for easier testing

This commit is contained in:
hay-kot 2021-11-26 11:58:49 -09:00
parent 791aa8c610
commit 9d9412f08e
6 changed files with 260 additions and 125 deletions

View File

@ -12,7 +12,8 @@ from mealie.schema.recipe import CreateRecipeByUrl, Recipe
from mealie.schema.recipe.recipe import CreateRecipe, CreateRecipeByUrlBulk, RecipeSummary
from mealie.schema.server.tasks import ServerTaskNames
from mealie.services.recipe.recipe_service import RecipeService
from mealie.services.scraper.scraper import create_from_url, scrape_from_url
from mealie.services.scraper.scraper import create_from_url
from mealie.services.scraper.scraper_strategies import RecipeScraperPackage
from mealie.services.server_tasks.background_executory import BackgroundExecutor
user_router = UserAPIRouter()
@ -82,7 +83,8 @@ def parse_recipe_url_bulk(
@user_router.post("/test-scrape-url")
def test_parse_recipe_url(url: CreateRecipeByUrl):
# Debugger should produce the same result as the scraper sees before cleaning
scraped_data = scrape_from_url(url.url)
scraped_data = RecipeScraperPackage(url.url).scrape_url()
if scraped_data:
return scraped_data.schema.data
return "recipe_scrapers was unable to scrape this URL"

View File

@ -172,6 +172,13 @@ def instructions(instructions) -> List[dict]:
def _instruction(line) -> str:
if isinstance(line, dict):
# Some recipes do not adhere to the schema
try:
line = line["text"]
except Exception:
line = ""
clean_line = clean_string(line.strip())
# Some sites erroneously escape their strings on multiple levels
while not clean_line == (clean_line := clean_string(clean_line)):

View File

@ -1,46 +0,0 @@
from typing import Tuple
import extruct
from slugify import slugify
from w3lib.html import get_base_url
from mealie.core.config import get_app_dirs
app_dirs = get_app_dirs()
LAST_JSON = app_dirs.DEBUG_DIR.joinpath("last_recipe.json")
def og_field(properties: dict, field_name: str) -> str:
    """Return the first Open Graph value stored under ``field_name``.

    ``properties`` is iterated as ``(name, value)`` pairs. ``None`` is returned
    when no pair carries the requested name.
    """
    for key, value in properties:
        if key == field_name:
            return value
    return None
def og_fields(properties: list[Tuple[str, str]], field_name: str) -> list[str]:
    """Collect the distinct Open Graph values stored under ``field_name``.

    Duplicates are dropped by passing the values through a set, so the order of
    the returned list is not guaranteed.
    """
    unique_values = set()
    for key, value in properties:
        if key == field_name:
            unique_values.add(value)
    return list(unique_values)
def basic_recipe_from_opengraph(html: str, url: str) -> dict:
    """Build a minimal recipe dictionary from a page's Open Graph metadata.

    Args:
        html (str): Raw HTML of the page.
        url (str): URL the HTML came from; used to resolve the page's base URL.

    Returns:
        dict: Recipe fields filled from the ``og:*`` properties, with placeholder
        ingredients and instructions. ``None`` is returned instead when the
        extracted data contains no Open Graph properties.
    """
    base_url = get_base_url(html, url)
    data = extruct.extract(html, base_url=base_url, errors="log")

    try:
        properties = data["opengraph"][0]["properties"]
    except (KeyError, IndexError, TypeError):
        # No Open Graph block on the page; let the caller decide what to do.
        return None

    title = og_field(properties, "og:title")

    return {
        "name": title,
        "description": og_field(properties, "og:description"),
        "image": og_field(properties, "og:image"),
        "recipeYield": "",
        # FIXME: If recipeIngredient is an empty list, mongodb's data verification fails.
        "recipeIngredient": ["Could not detect ingredients"],
        # FIXME: recipeInstructions is allowed to be empty, but this message is added for user sanity.
        "recipeInstructions": [{"text": "Could not detect instructions"}],
        # og_field yields None when og:title is absent; never hand None to slugify.
        "slug": slugify(title or ""),
        "orgURL": og_field(properties, "og:url"),
        "categories": [],
        "tags": og_fields(properties, "og:article:tag"),
        "dateAdded": None,
        "notes": [],
        "extras": [],
    }

View File

@ -0,0 +1,39 @@
from __future__ import annotations
from typing import Type
from mealie.schema.recipe.recipe import Recipe
from .scraper_strategies import ABCScraperStrategy, RecipeScraperOpenGraph, RecipeScraperPackage
class RecipeScraper:
    """
    Runs a list of scraping strategies against a URL until one yields a recipe.
    """

    # Strategy classes to try, in order; earlier entries take precedence.
    scrapers: list[Type[ABCScraperStrategy]]

    def __init__(self, scrapers: list[Type[ABCScraperStrategy]] = None) -> None:
        self.scrapers = [RecipeScraperPackage, RecipeScraperOpenGraph] if scrapers is None else scrapers

    def scrape(self, url: str) -> Recipe | None:
        """
        Return the first non-None result produced by the configured strategies.

        Each strategy class is instantiated with ``url`` and asked to ``parse()``;
        strategies after the first success are never instantiated. ``None`` is
        returned when every strategy comes back empty.
        """
        # Lazy generator: a strategy is only built and run when the previous one failed.
        attempts = (strategy_cls(url).parse() for strategy_cls in self.scrapers)
        return next((recipe for recipe in attempts if recipe is not None), None)

View File

@ -1,16 +1,17 @@
from __future__ import annotations
from enum import Enum
from typing import Any, Callable, Optional
from uuid import uuid4
import requests
from fastapi import HTTPException, status
from recipe_scrapers import NoSchemaFoundInWildMode, SchemaScraperFactory, WebsiteNotImplementedError, scrape_me
from recipe_scrapers import NoSchemaFoundInWildMode, WebsiteNotImplementedError, scrape_me
from slugify import slugify
from mealie.core.root_logger import get_logger
from mealie.schema.recipe import Recipe, RecipeStep
from mealie.schema.recipe import Recipe
from mealie.services.image.image import scrape_image
from mealie.services.scraper import cleaner, open_graph
from .recipe_scraper import RecipeScraper
logger = get_logger()
@ -25,19 +26,17 @@ def create_from_url(url: str) -> Recipe:
Returns:
Recipe: Recipe Object
"""
# Try the different scrapers in order.
if scraped_data := scrape_from_url(url):
new_recipe = clean_scraper(scraped_data, url)
elif og_dict := extract_open_graph_values(url):
new_recipe = Recipe(**og_dict)
else:
scraper = RecipeScraper()
new_recipe = scraper.scrape(url)
if not new_recipe:
raise HTTPException(status.HTTP_400_BAD_REQUEST, {"details": ParserErrors.BAD_RECIPE_DATA.value})
logger.info(f"Image {new_recipe.image}")
new_recipe.image = download_image_for_recipe(new_recipe.slug, new_recipe.image)
if new_recipe.name is None or new_recipe.name == "":
new_recipe.name = "No Recipe Found" + uuid4().hex
new_recipe.name = "No Recipe Found - " + uuid4().hex
new_recipe.slug = slugify(new_recipe.name)
return new_recipe
@ -49,14 +48,6 @@ class ParserErrors(str, Enum):
CONNECTION_ERROR = "CONNECTION_ERROR"
def extract_open_graph_values(url) -> Optional[dict]:
    """Fetch ``url`` and build a basic recipe dict from its Open Graph metadata.

    Returns:
        Optional[dict]: The recipe fields, or ``None`` when the page has no
        Open Graph data or its recipe name is an empty string.
    """
    r = requests.get(url)
    recipe = open_graph.basic_recipe_from_opengraph(r.text, url)

    # basic_recipe_from_opengraph returns None when no Open Graph block exists,
    # so guard before calling .get() on it.
    if recipe is None or recipe.get("name", "") == "":
        return None

    return recipe
def scrape_from_url(url: str):
"""Entry function to scrape a recipe from a url
This will determine if a url can be parsed and return None if not, to allow another parser to try.
@ -77,7 +68,7 @@ def scrape_from_url(url: str):
try:
scraped_schema = scrape_me(url, wild_mode=True)
except (NoSchemaFoundInWildMode, AttributeError):
# Recipe_scraper was unable to extract a recipe.
logger.error("Recipe Scraper was unable to extract a recipe.")
return None
except ConnectionError:
@ -99,62 +90,7 @@ def scrape_from_url(url: str):
return None
def clean_scraper(scraped_data: SchemaScraperFactory.SchemaScraper, url: str) -> Recipe:
    """Convert a scraped schema object into a ``Recipe``.

    Args:
        scraped_data: Scraper result exposing accessor methods (``title``,
            ``yields``, ``ingredients``, ``instructions``) and the raw schema
            mapping at ``schema.data``.
        url (str): Source URL, stored on the recipe as ``org_url``.

    Returns:
        Recipe: Recipe built from the cleaned values. ``slug`` is left empty.
    """

    def try_get_default(func_call: Callable, get_attr: str, default: Any, clean_func=None):
        """Read one field: accessor first, then raw schema data, then clean it."""
        value = default

        # Schema-only fields pass ``None`` as the accessor; calling it would only
        # raise and log a misleading error, so skip straight to the schema lookup.
        if func_call is not None:
            try:
                value = func_call()
            except Exception:
                logger.error(f"Error parsing recipe func_call for '{get_attr}'")

        if value == default:
            try:
                value = scraped_data.schema.data.get(get_attr)
            except Exception:
                logger.error(f"Error parsing recipe attribute '{get_attr}'")

        if clean_func:
            value = clean_func(value)

        return value

    def get_instructions() -> list[dict]:
        """Scrape, clean and wrap the instructions as ``RecipeStep`` objects."""
        instruction_as_text = try_get_default(
            scraped_data.instructions, "recipeInstructions", ["No Instructions Found"]
        )

        logger.info(f"Scraped Instructions: (Type: {type(instruction_as_text)}) \n {instruction_as_text}")

        instruction_as_text = cleaner.instructions(instruction_as_text)

        logger.info(f"Cleaned Instructions: (Type: {type(instruction_as_text)}) \n {instruction_as_text}")

        try:
            return [RecipeStep(title="", text=x.get("text")) for x in instruction_as_text]
        except TypeError:
            # The cleaner returned something that is not iterable.
            return []

    # Prefer "performTime"; fall back to "cookTime" when it is missing or falsy.
    cook_time = try_get_default(None, "performTime", None, cleaner.clean_time) or try_get_default(
        None, "cookTime", None, cleaner.clean_time
    )

    return Recipe(
        name=try_get_default(scraped_data.title, "name", "No Name Found", cleaner.clean_string),
        slug="",
        image=try_get_default(None, "image", None),
        description=try_get_default(None, "description", "", cleaner.clean_string),
        nutrition=try_get_default(None, "nutrition", None, cleaner.clean_nutrition),
        recipe_yield=try_get_default(scraped_data.yields, "recipeYield", "1", cleaner.clean_string),
        recipe_ingredient=try_get_default(scraped_data.ingredients, "recipeIngredient", [""], cleaner.ingredient),
        recipe_instructions=get_instructions(),
        total_time=try_get_default(None, "totalTime", None, cleaner.clean_time),
        prep_time=try_get_default(None, "prepTime", None, cleaner.clean_time),
        perform_time=cook_time,
        org_url=url,
    )
def download_image_for_recipe(slug, image_url) -> dict:
def download_image_for_recipe(slug, image_url) -> str | None:
img_name = None
try:
img_path = scrape_image(image_url, slug)

View File

@ -0,0 +1,197 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, Callable, Tuple
import extruct
import requests
from fastapi import HTTPException, status
from recipe_scrapers import NoSchemaFoundInWildMode, SchemaScraperFactory, WebsiteNotImplementedError, scrape_me
from slugify import slugify
from w3lib.html import get_base_url
from mealie.core.root_logger import get_logger
from mealie.schema.recipe.recipe import Recipe, RecipeStep
from . import cleaner
class ABCScraperStrategy(ABC):
    """
    Base class for all recipe scraping strategies.

    A strategy is constructed with the URL it should scrape and exposes a
    single ``parse`` method that turns that URL into a recipe.
    """

    # Full URL of the recipe this strategy instance works on.
    url: str

    def __init__(self, url: str) -> None:
        self.logger = get_logger()
        self.url = url

    @abstractmethod
    def parse(self) -> Recipe | None:
        """Parse a recipe from ``self.url``.

        The URL is supplied to the constructor, so implementations take no
        arguments; this matches how the concrete strategies define ``parse``
        and how they are invoked.

        Returns:
            Recipe | None: The parsed recipe, or None when this strategy could
            not extract one.
        """
        ...
class RecipeScraperPackage(ABCScraperStrategy):
    """
    Scraping strategy backed by the ``recipe_scrapers`` package.
    """

    def clean_scraper(self, scraped_data: SchemaScraperFactory.SchemaScraper, url: str) -> Recipe:
        """Convert a scraped schema object into a ``Recipe``.

        Args:
            scraped_data: Scraper result exposing accessor methods (``title``,
                ``yields``, ``ingredients``, ``instructions``) and the raw
                schema mapping at ``schema.data``.
            url (str): Source URL, stored on the recipe as ``org_url``.

        Returns:
            Recipe: Recipe built from the cleaned values. ``slug`` is left empty.
        """

        def try_get_default(func_call: Callable | None, get_attr: str, default: Any, clean_func=None):
            """Read one field: accessor first, then raw schema data, then clean it."""
            value = default

            # Schema-only fields pass ``None`` as the accessor; calling it would
            # only raise and log a misleading error, so skip to the schema lookup.
            if func_call is not None:
                try:
                    value = func_call()
                except Exception:
                    self.logger.error(f"Error parsing recipe func_call for '{get_attr}'")

            if value == default:
                try:
                    value = scraped_data.schema.data.get(get_attr)
                except Exception:
                    self.logger.error(f"Error parsing recipe attribute '{get_attr}'")

            if clean_func:
                value = clean_func(value)

            return value

        def get_instructions() -> list[RecipeStep]:
            """Scrape, clean and wrap the instructions as ``RecipeStep`` objects."""
            instruction_as_text = try_get_default(
                scraped_data.instructions, "recipeInstructions", ["No Instructions Found"]
            )

            self.logger.debug(f"Scraped Instructions: (Type: {type(instruction_as_text)}) \n {instruction_as_text}")

            instruction_as_text = cleaner.instructions(instruction_as_text)

            self.logger.debug(f"Cleaned Instructions: (Type: {type(instruction_as_text)}) \n {instruction_as_text}")

            try:
                return [RecipeStep(title="", text=x.get("text")) for x in instruction_as_text]
            except TypeError:
                # The cleaner returned something that is not iterable.
                return []

        # Prefer "performTime"; fall back to "cookTime" when it is missing or falsy.
        cook_time = try_get_default(None, "performTime", None, cleaner.clean_time) or try_get_default(
            None, "cookTime", None, cleaner.clean_time
        )

        return Recipe(
            name=try_get_default(scraped_data.title, "name", "No Name Found", cleaner.clean_string),
            slug="",
            image=try_get_default(None, "image", None),
            description=try_get_default(None, "description", "", cleaner.clean_string),
            nutrition=try_get_default(None, "nutrition", None, cleaner.clean_nutrition),
            recipe_yield=try_get_default(scraped_data.yields, "recipeYield", "1", cleaner.clean_string),
            recipe_ingredient=try_get_default(scraped_data.ingredients, "recipeIngredient", [""], cleaner.ingredient),
            recipe_instructions=get_instructions(),
            total_time=try_get_default(None, "totalTime", None, cleaner.clean_time),
            prep_time=try_get_default(None, "prepTime", None, cleaner.clean_time),
            perform_time=cook_time,
            org_url=url,
        )

    def scrape_url(self) -> SchemaScraperFactory.SchemaScraper | Any | None:
        """Scrape ``self.url`` with ``recipe_scrapers``.

        A site-specific scraper is tried first; if the site is not implemented
        the call is retried in wild mode.

        Returns:
            The scraped schema object when it yields ingredients or
            instructions, otherwise ``None``.

        Raises:
            HTTPException: 400 with ``CONNECTION_ERROR`` when a connection
                error is raised while scraping.
        """
        # The retry lives inside the outer ``try`` so that a connection error
        # raised by the wild-mode attempt reaches the handler below as well.
        try:
            try:
                scraped_schema = scrape_me(self.url)
            except (WebsiteNotImplementedError, AttributeError):
                scraped_schema = scrape_me(self.url, wild_mode=True)
        except (NoSchemaFoundInWildMode, AttributeError):
            self.logger.error("Recipe Scraper was unable to extract a recipe.")
            return None
        except (ConnectionError, requests.exceptions.ConnectionError) as err:
            # requests' ConnectionError is not a subclass of the builtin one, so
            # both are listed in case the underlying HTTP call surfaces either.
            raise HTTPException(status.HTTP_400_BAD_REQUEST, {"details": "CONNECTION_ERROR"}) from err

        # Check to see if the recipe is valid
        try:
            ingredients = scraped_schema.ingredients()
        except Exception:
            ingredients = []

        try:
            instruct = scraped_schema.instructions()
        except Exception:
            instruct = []

        if instruct or ingredients:
            return scraped_schema

        self.logger.debug(f"Recipe Scraper [Package] was unable to extract a recipe from {self.url}")
        return None

    def parse(self) -> Recipe | None:
        """
        Scrape ``self.url`` and return the cleaned Recipe, or None when scraping found nothing.
        """
        scraped_data = self.scrape_url()

        if scraped_data is None:
            return None

        return self.clean_scraper(scraped_data, self.url)
class RecipeScraperOpenGraph(ABCScraperStrategy):
    """
    Scraping strategy that builds a bare-bones recipe from a page's Open Graph metadata.
    """

    def get_html(self) -> str:
        """Download the page at ``self.url`` and return its body as text."""
        return requests.get(self.url).text

    def get_recipe_fields(self, html) -> dict | None:
        """
        Get the recipe fields from the Open Graph data.

        Args:
            html: Raw HTML of the page at ``self.url``.

        Returns:
            dict | None: Recipe fields filled from the ``og:*`` properties, with
            placeholder ingredients and instructions, or None when the
            extracted data contains no Open Graph properties.
        """

        def og_field(properties: list[Tuple[str, str]], field_name: str) -> str | None:
            """Return the first value stored under ``field_name``, or None."""
            return next((val for name, val in properties if name == field_name), None)

        def og_fields(properties: list[Tuple[str, str]], field_name: str) -> list[str]:
            """Return the distinct values stored under ``field_name`` (unordered)."""
            return list({val for name, val in properties if name == field_name})

        base_url = get_base_url(html, self.url)
        data = extruct.extract(html, base_url=base_url, errors="log")

        try:
            properties = data["opengraph"][0]["properties"]
        except (KeyError, IndexError, TypeError):
            # No Open Graph block on the page.
            return None

        title = og_field(properties, "og:title")

        return {
            "name": title,
            "description": og_field(properties, "og:description"),
            "image": og_field(properties, "og:image"),
            "recipeYield": "",
            "recipeIngredient": ["Could not detect ingredients"],
            "recipeInstructions": [{"text": "Could not detect instructions"}],
            # og_field yields None when og:title is absent; never hand None to slugify.
            "slug": slugify(title or ""),
            "orgURL": og_field(properties, "og:url"),
            "categories": [],
            "tags": og_fields(properties, "og:article:tag"),
            "dateAdded": None,
            "notes": [],
            "extras": [],
        }

    def parse(self) -> Recipe | None:
        """
        Fetch ``self.url`` and build a Recipe from its Open Graph data, or return None when there is none.
        """
        html = self.get_html()

        og_data = self.get_recipe_fields(html)

        if og_data is None:
            return None

        return Recipe(**og_data)