From d14c45978119ba150ffff8abf6c87634ecb46c52 Mon Sep 17 00:00:00 2001
From: Arjun Satarkar
Date: Fri, 28 Jul 2023 20:55:47 +0530
Subject: Stop feed fetching from blocking everything

We accomplished this by switching from gevent to cherrypy (should be
possible with gevent, but it wasn't worth the trouble) and shifting the
HTTP request part of the process out of the core TagRss class.

Also added a bit of logging, and got rid of /delete_feed returning a
spurious success page in response to GET requests.
---
 requirements.txt | 27 +++++++++++++-----
 serve.py         | 47 +++++++++++++++++---------------
 tagrss.py        | 83 ++++++++++++++++++++++++--------------------------
 3 files changed, 81 insertions(+), 76 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 268609c..87749db 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,13 +1,26 @@
+annotated-types==0.5.0
+autocommand==2.2.2
 bottle==0.12.25
-certifi==2023.5.7
-charset-normalizer==3.1.0
+certifi==2023.7.22
+charset-normalizer==3.2.0
+cheroot==10.0.0
+CherryPy==18.8.0
 feedparser==6.0.10
-gevent==22.10.2
-greenlet==2.0.2
 idna==3.4
+inflect==7.0.0
+jaraco.collections==4.3.0
+jaraco.context==4.3.0
+jaraco.functools==3.8.0
+jaraco.text==3.11.1
+more-itertools==10.0.0
+portend==3.2.0
+pydantic==2.1.1
+pydantic_core==2.4.0
+pytz==2023.3
 requests==2.31.0
 schedule==1.2.0
 sgmllib3k==1.0.0
-urllib3==2.0.3
-zope.event==4.6
-zope.interface==6.0
+tempora==5.5.0
+typing_extensions==4.7.1
+urllib3==2.0.4
+zc.lockfile==3.0.post1
diff --git a/serve.py b/serve.py
index 41a4757..89b8a74 100755
--- a/serve.py
+++ b/serve.py
@@ -1,14 +1,11 @@
 #!/usr/bin/env python3
-import gevent.monkey
-
-gevent.monkey.patch_all()
 import bottle
-import gevent.lock
+import schedule
 
 import argparse
-import pathlib
+import logging
 import math
-import schedule
+import pathlib
 import threading
 import time
 import typing
@@ -18,6 +15,8 @@ import tagrss
 MAX_PER_PAGE_ENTRIES = 1000
 DEFAULT_PER_PAGE_ENTRIES = 50
 
+logging.basicConfig(level=logging.INFO)
+
 parser = argparse.ArgumentParser()
 parser.add_argument("--host", default="localhost")
 parser.add_argument("--port", default=8000, type=int)
@@ -27,7 +26,7 @@ args = parser.parse_args()
 
 storage_path: pathlib.Path = pathlib.Path(args.storage_path)
 
-core_lock = gevent.lock.RLock()
+core_lock = threading.RLock()
 core = tagrss.TagRss(storage_path=storage_path)
 
 
@@ -113,9 +112,11 @@ def add_feed_effect():
     tags = parse_space_separated_tags(bottle.request.forms.get("tags"))  # type: ignore
 
     already_present: bool = False
+
+    parsed, epoch_downloaded = tagrss.fetch_parsed_feed(feed_source)
     with core_lock:
         try:
-            core.add_feed(feed_source=feed_source, tags=tags)
+            core.add_feed(feed_source=feed_source, parsed_feed=parsed, epoch_downloaded=epoch_downloaded, tags=tags)
         except tagrss.FeedAlreadyAddedError:
             already_present = True
         # TODO: handle FeedFetchError too
@@ -145,7 +146,7 @@ def manage_feed_view():
 
 
 @bottle.post("/manage_feed")
-def manage_feed_effect_update():
+def manage_feed_effect():
     feed: dict[str, typing.Any] = {}
     feed["id"] = int(bottle.request.forms["id"])  # type: ignore
     feed["source"] = bottle.request.forms["source"]  # type: ignore
@@ -159,17 +160,12 @@ def manage_feed_effect_update():
     return bottle.template("manage_feed", feed=feed, after_update=True)
 
 
-@bottle.get("/delete_feed")
-def delete_feed_view():
-    return bottle.static_file("delete_feed.html", root="views")
-
-
 @bottle.post("/delete_feed")
-def delete_feed_effect():
+def delete_feed():
     feed_id: int = int(bottle.request.forms["id"])  # type: ignore
     with core_lock:
         core.delete_feed(feed_id)
-    return bottle.redirect("/delete_feed")
+    return bottle.static_file("delete_feed.html", root="views")
 
 
 @bottle.get("/static/<path:path>")
@@ -179,23 +175,30 @@ def serve_static(path):
 
 
 def update_feeds(run_event: threading.Event):
     def inner_update():
+        logging.info("Updating feeds...")
+        limit = 100
         with core_lock:
-            feeds = core.get_all_feed_ids()
-        for feed_id in feeds():
-            core.fetch_new_feed_entries(feed_id)
-
+            feed_count = core.get_feed_count()
+        for i in range(math.ceil(feed_count / limit)):
+            with core_lock:
+                feeds = core.get_feeds(limit=limit, offset=limit * i)
+            for feed in feeds:
+                parsed_feed, epoch_downloaded = tagrss.fetch_parsed_feed(feed["source"])
+                logging.debug(f"Updated feed with source {feed['source']}.")
+                with core_lock:
+                    core.store_feed_entries(feed["id"], parsed_feed, epoch_downloaded)
+        logging.info("Finished updating feeds.")
     inner_update()
     schedule.every(args.update_seconds).seconds.do(inner_update)
     while run_event.is_set():
         schedule.run_pending()
         time.sleep(1)
 
-
 feed_update_run_event = threading.Event()
 feed_update_run_event.set()
 threading.Thread(target=update_feeds, args=(feed_update_run_event,)).start()
-bottle.run(host=args.host, port=args.port, server="gevent")
+bottle.run(host=args.host, port=args.port, server="cheroot")
 feed_update_run_event.clear()
 with core_lock:
     core.close()
diff --git a/tagrss.py b/tagrss.py
index 038c940..8ae001c 100644
--- a/tagrss.py
+++ b/tagrss.py
@@ -22,9 +22,34 @@ class SqliteMissingForeignKeySupportError(Exception):
     pass
 
 
+class Sqlite3NotSerializedModeError(Exception):
+    pass
+
+
+def fetch_parsed_feed(feed_source: str) -> tuple[feedparser.FeedParserDict, int]:
+    response = requests.get(feed_source)
+    epoch_downloaded: int = int(time.time())
+    if response.status_code != requests.codes.ok:
+        raise FeedFetchError(feed_source, response.status_code)
+    try:
+        base: str = response.headers["Content-Location"]
+    except KeyError:
+        base: str = feed_source
+    parsed = feedparser.parse(
+        io.BytesIO(bytes(response.text, encoding="utf-8")),
+        response_headers={"Content-Location": base},
+    )
+    return (parsed, epoch_downloaded)
+
+
 class TagRss:
     def __init__(self, *, storage_path: str | pathlib.Path):
-        self.connection: sqlite3.Connection = sqlite3.connect(storage_path)
+        if sqlite3.threadsafety != 3:
+            raise Sqlite3NotSerializedModeError
+
+        self.connection: sqlite3.Connection = sqlite3.connect(
+            storage_path, check_same_thread=False
+        )
 
         with self.connection:
             with open("setup.sql", "r") as setup_script:
@@ -32,21 +57,16 @@ class TagRss:
         if (1,) not in self.connection.execute("PRAGMA foreign_keys;").fetchmany(1):
             raise SqliteMissingForeignKeySupportError
 
-    def add_feed(self, feed_source: str, tags: list[str]) -> None:
-        response = requests.get(feed_source)
-        epoch_downloaded: int = int(time.time())
-        if response.status_code != requests.codes.ok:
-            raise FeedFetchError(feed_source, response.status_code)
-        try:
-            base: str = response.headers["Content-Location"]
-        except KeyError:
-            base: str = feed_source
-        parsed = feedparser.parse(
-            io.BytesIO(bytes(response.text, encoding="utf-8")),
-            response_headers={"Content-Location": base},
-        )
+    def add_feed(
+        self,
+        *,
+        feed_source: str,
+        parsed_feed: feedparser.FeedParserDict,
+        epoch_downloaded: int,
+        tags: list[str],
+    ) -> None:
         with self.connection:
-            feed_title: str = parsed.feed.get("title", "")
+            feed_title: str = parsed_feed.feed.get("title", "")  # type: ignore
             try:
                 self.connection.execute(
                     "INSERT INTO feeds(source, title) VALUES(?, ?);",
@@ -61,7 +81,7 @@ class TagRss:
                 "INSERT INTO feed_tags(feed_id, tag) VALUES(?, ?);",
                 ((feed_id, tag) for tag in tags),
             )
-        self.store_feed_entries(feed_id, parsed, epoch_downloaded)
+        self.store_feed_entries(feed_id, parsed_feed, epoch_downloaded)
 
     def get_entries(
         self, *, limit: int, offset: int = 0
@@ -152,18 +172,6 @@ class TagRss:
         )
         return feeds
 
-    def get_all_feed_ids(self):
-        def inner():
-            with self.connection:
-                resp = self.connection.execute("SELECT id FROM feeds;")
-            while True:
-                row = resp.fetchone()
-                if not row:
-                    break
-                yield row[0]
-
-        return inner
-
     def get_entry_count(self) -> int:
         with self.connection:
             return self.connection.execute("SELECT count from entry_count;").fetchone()[
                 0
             ]
@@ -176,25 +184,6 @@ class TagRss:
                 0
             ]
 
-    def fetch_new_feed_entries(self, feed_id: int) -> None:
-        with self.connection:
-            feed_source: str = self.connection.execute(
-                "SELECT source FROM feeds WHERE id = ?;", (feed_id,)
-            ).fetchone()[0]
-        response = requests.get(feed_source)
-        epoch_downloaded: int = int(time.time())
-        if response.status_code != requests.codes.ok:
-            raise FeedFetchError(feed_source, response.status_code)
-        try:
-            base: str = response.headers["Content-Location"]
-        except KeyError:
-            base: str = feed_source
-        parsed = feedparser.parse(
-            io.BytesIO(bytes(response.text, encoding="utf-8")),
-            response_headers={"Content-Location": base},
-        )
-        self.store_feed_entries(feed_id, parsed, epoch_downloaded)
-
     def store_feed_entries(self, feed_id: int, parsed_feed, epoch_downloaded: int):
         for entry in reversed(parsed_feed.entries):
            link: str = entry.get("link", None)
-- 
cgit v1.2.3-57-g22cb
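
For reference, a minimal sketch of the calling convention this patch introduces
(the feed URL, storage path, and tag below are placeholders, not part of the
patch): the blocking HTTP fetch now lives in the module-level
tagrss.fetch_parsed_feed(), so callers run it outside any lock and only take
core_lock for the database write, mirroring what serve.py does above.

    import threading

    import tagrss

    core = tagrss.TagRss(storage_path="tagrss_data.db")  # placeholder path
    core_lock = threading.RLock()

    # Slow network I/O happens outside the lock, so one unresponsive feed
    # no longer stalls every other request handler...
    parsed, epoch_downloaded = tagrss.fetch_parsed_feed(
        "https://example.com/feed.xml"  # placeholder URL
    )

    # ...and only the SQLite write is serialized behind core_lock.
    with core_lock:
        core.add_feed(
            feed_source="https://example.com/feed.xml",
            parsed_feed=parsed,
            epoch_downloaded=epoch_downloaded,
            tags=["example"],
        )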