| author | Arjun Satarkar <me@arjunsatarkar.net> | 2023-07-28 15:25:47 +0000 |
|---|---|---|
| committer | Arjun Satarkar <me@arjunsatarkar.net> | 2023-07-28 15:25:47 +0000 |
| commit | d14c45978119ba150ffff8abf6c87634ecb46c52 (patch) | |
| tree | 2c816d3da0f83569a61533960c9059ee9e84e8da | |
| parent | 15f0a2d2577949cd250aea54561b29634943468f (diff) | |
| download | tagrss-d14c45978119ba150ffff8abf6c87634ecb46c52.tar tagrss-d14c45978119ba150ffff8abf6c87634ecb46c52.tar.gz tagrss-d14c45978119ba150ffff8abf6c87634ecb46c52.zip | |
Stop feed fetching from blocking everything
We accomplished this by switching from gevent to cherrypy (it should be possible
with gevent, but it wasn't worth the trouble) and by shifting the HTTP request
part of the process out of the core TagRss class.
Also added a bit of logging and got rid of /delete_feed returning a spurious
success page in response to GET requests.
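
In outline, the new division of labor looks like this. A minimal sketch, assuming the names introduced in the diff below (`fetch_parsed_feed`, `store_feed_entries`, the `core_lock` RLock); the `refresh_feed` helper and the storage path are hypothetical:

```python
import threading

import tagrss  # the project's core module, per the diff below

core_lock = threading.RLock()
core = tagrss.TagRss(storage_path="tagrss_data.db")  # hypothetical path


def refresh_feed(feed_id: int, source: str) -> None:
    # The slow network fetch and parse run with no lock held, so other
    # requests keep being served while a feed downloads.
    parsed_feed, epoch_downloaded = tagrss.fetch_parsed_feed(source)
    # Only the quick SQLite write is serialized behind the lock.
    with core_lock:
        core.store_feed_entries(feed_id, parsed_feed, epoch_downloaded)
```

The design point is simply that network I/O never happens while the lock is held, and never inside TagRss itself.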
-rw-r--r-- | requirements.txt | 27
-rwxr-xr-x | serve.py | 47
-rw-r--r-- | tagrss.py | 83
3 files changed, 81 insertions, 76 deletions
diff --git a/requirements.txt b/requirements.txt
index 268609c..87749db 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,13 +1,26 @@
+annotated-types==0.5.0
+autocommand==2.2.2
 bottle==0.12.25
-certifi==2023.5.7
-charset-normalizer==3.1.0
+certifi==2023.7.22
+charset-normalizer==3.2.0
+cheroot==10.0.0
+CherryPy==18.8.0
 feedparser==6.0.10
-gevent==22.10.2
-greenlet==2.0.2
 idna==3.4
+inflect==7.0.0
+jaraco.collections==4.3.0
+jaraco.context==4.3.0
+jaraco.functools==3.8.0
+jaraco.text==3.11.1
+more-itertools==10.0.0
+portend==3.2.0
+pydantic==2.1.1
+pydantic_core==2.4.0
+pytz==2023.3
 requests==2.31.0
 schedule==1.2.0
 sgmllib3k==1.0.0
-urllib3==2.0.3
-zope.event==4.6
-zope.interface==6.0
+tempora==5.5.0
+typing_extensions==4.7.1
+urllib3==2.0.4
+zc.lockfile==3.0.post1
diff --git a/serve.py b/serve.py
--- a/serve.py
+++ b/serve.py
@@ -1,14 +1,11 @@
 #!/usr/bin/env python3
-import gevent.monkey
-
-gevent.monkey.patch_all()
 import bottle
-import gevent.lock
+import schedule
 
 import argparse
-import pathlib
+import logging
 import math
-import schedule
+import pathlib
 import threading
 import time
 import typing
@@ -18,6 +15,8 @@ import tagrss
 MAX_PER_PAGE_ENTRIES = 1000
 DEFAULT_PER_PAGE_ENTRIES = 50
 
+logging.basicConfig(level=logging.INFO)
+
 parser = argparse.ArgumentParser()
 parser.add_argument("--host", default="localhost")
 parser.add_argument("--port", default=8000, type=int)
@@ -27,7 +26,7 @@ args = parser.parse_args()
 
 storage_path: pathlib.Path = pathlib.Path(args.storage_path)
 
-core_lock = gevent.lock.RLock()
+core_lock = threading.RLock()
 core = tagrss.TagRss(storage_path=storage_path)
 
 
@@ -113,9 +112,11 @@ def add_feed_effect():
     tags = parse_space_separated_tags(bottle.request.forms.get("tags"))  # type: ignore
 
     already_present: bool = False
+
+    parsed, epoch_downloaded = tagrss.fetch_parsed_feed(feed_source)
     with core_lock:
         try:
-            core.add_feed(feed_source=feed_source, tags=tags)
+            core.add_feed(feed_source=feed_source, parsed_feed=parsed, epoch_downloaded=epoch_downloaded, tags=tags)
         except tagrss.FeedAlreadyAddedError:
             already_present = True
     # TODO: handle FeedFetchError too
@@ -145,7 +146,7 @@ def manage_feed_view():
 
 
 @bottle.post("/manage_feed")
-def manage_feed_effect_update():
+def manage_feed_effect():
     feed: dict[str, typing.Any] = {}
     feed["id"] = int(bottle.request.forms["id"])  # type: ignore
     feed["source"] = bottle.request.forms["source"]  # type: ignore
@@ -159,17 +160,12 @@ def manage_feed_effect_update():
     return bottle.template("manage_feed", feed=feed, after_update=True)
 
 
-@bottle.get("/delete_feed")
-def delete_feed_view():
-    return bottle.static_file("delete_feed.html", root="views")
-
-
 @bottle.post("/delete_feed")
-def delete_feed_effect():
+def delete_feed():
     feed_id: int = int(bottle.request.forms["id"])  # type: ignore
     with core_lock:
         core.delete_feed(feed_id)
-    return bottle.redirect("/delete_feed")
+    return bottle.static_file("delete_feed.html", root="views")
 
 
 @bottle.get("/static/<path:path>")
@@ -179,23 +175,30 @@ def serve_static(path):
 def update_feeds(run_event: threading.Event):
     def inner_update():
+        logging.info("Updating feeds...")
+        limit = 100
         with core_lock:
-            feeds = core.get_all_feed_ids()
-        for feed_id in feeds():
-            core.fetch_new_feed_entries(feed_id)
-
+            feed_count = core.get_feed_count()
+        for i in range(math.ceil(feed_count / limit)):
+            with core_lock:
+                feeds = core.get_feeds(limit=limit, offset=limit * i)
+            for feed in feeds:
+                parsed_feed, epoch_downloaded = tagrss.fetch_parsed_feed(feed["source"])
+                logging.debug(f"Updated feed with source {feed['source']} .")
+                with core_lock:
+                    core.store_feed_entries(feed["id"], parsed_feed, epoch_downloaded)
+        logging.info("Finished updating feeds.")
     inner_update()
     schedule.every(args.update_seconds).seconds.do(inner_update)
     while run_event.is_set():
         schedule.run_pending()
         time.sleep(1)
 
-
 feed_update_run_event = threading.Event()
 feed_update_run_event.set()
 threading.Thread(target=update_feeds, args=(feed_update_run_event,)).start()
 
-bottle.run(host=args.host, port=args.port, server="gevent")
+bottle.run(host=args.host, port=args.port, server="cheroot")
 
 feed_update_run_event.clear()
 with core_lock:
     core.close()
diff --git a/tagrss.py b/tagrss.py
--- a/tagrss.py
+++ b/tagrss.py
@@ -22,9 +22,34 @@ class SqliteMissingForeignKeySupportError(Exception):
     pass
 
 
+class Sqlite3NotSerializedModeError(Exception):
+    pass
+
+
+def fetch_parsed_feed(feed_source: str) -> tuple[feedparser.FeedParserDict, int]:
+    response = requests.get(feed_source)
+    epoch_downloaded: int = int(time.time())
+    if response.status_code != requests.codes.ok:
+        raise FeedFetchError(feed_source, response.status_code)
+    try:
+        base: str = response.headers["Content-Location"]
+    except KeyError:
+        base: str = feed_source
+    parsed = feedparser.parse(
+        io.BytesIO(bytes(response.text, encoding="utf-8")),
+        response_headers={"Content-Location": base},
+    )
+    return (parsed, epoch_downloaded)
+
+
 class TagRss:
     def __init__(self, *, storage_path: str | pathlib.Path):
-        self.connection: sqlite3.Connection = sqlite3.connect(storage_path)
+        if sqlite3.threadsafety != 3:
+            raise Sqlite3NotSerializedModeError
+
+        self.connection: sqlite3.Connection = sqlite3.connect(
+            storage_path, check_same_thread=False
+        )
 
         with self.connection:
             with open("setup.sql", "r") as setup_script:
@@ -32,21 +57,16 @@ class TagRss:
         if (1,) not in self.connection.execute("PRAGMA foreign_keys;").fetchmany(1):
             raise SqliteMissingForeignKeySupportError
 
-    def add_feed(self, feed_source: str, tags: list[str]) -> None:
-        response = requests.get(feed_source)
-        epoch_downloaded: int = int(time.time())
-        if response.status_code != requests.codes.ok:
-            raise FeedFetchError(feed_source, response.status_code)
-        try:
-            base: str = response.headers["Content-Location"]
-        except KeyError:
-            base: str = feed_source
-        parsed = feedparser.parse(
-            io.BytesIO(bytes(response.text, encoding="utf-8")),
-            response_headers={"Content-Location": base},
-        )
+    def add_feed(
+        self,
+        *,
+        feed_source: str,
+        parsed_feed: feedparser.FeedParserDict,
+        epoch_downloaded: int,
+        tags: list[str],
+    ) -> None:
         with self.connection:
-            feed_title: str = parsed.feed.get("title", "")
+            feed_title: str = parsed_feed.feed.get("title", "")  # type: ignore
             try:
                 self.connection.execute(
                     "INSERT INTO feeds(source, title) VALUES(?, ?);",
@@ -61,7 +81,7 @@ class TagRss:
                 "INSERT INTO feed_tags(feed_id, tag) VALUES(?, ?);",
                 ((feed_id, tag) for tag in tags),
             )
-            self.store_feed_entries(feed_id, parsed, epoch_downloaded)
+            self.store_feed_entries(feed_id, parsed_feed, epoch_downloaded)
 
     def get_entries(
         self, *, limit: int, offset: int = 0
@@ -152,18 +172,6 @@ class TagRss:
         )
         return feeds
 
-    def get_all_feed_ids(self):
-        def inner():
-            with self.connection:
-                resp = self.connection.execute("SELECT id FROM feeds;")
-                while True:
-                    row = resp.fetchone()
-                    if not row:
-                        break
-                    yield row[0]
-
-        return inner
-
     def get_entry_count(self) -> int:
         with self.connection:
             return self.connection.execute("SELECT count from entry_count;").fetchone()[
@@ -176,25 +184,6 @@ class TagRss:
                 0
             ]
 
-    def fetch_new_feed_entries(self, feed_id: int) -> None:
-        with self.connection:
-            feed_source: str = self.connection.execute(
-                "SELECT source FROM feeds WHERE id = ?;", (feed_id,)
-            ).fetchone()[0]
-        response = requests.get(feed_source)
-        epoch_downloaded: int = int(time.time())
-        if response.status_code != requests.codes.ok:
-            raise FeedFetchError(feed_source, response.status_code)
-        try:
-            base: str = response.headers["Content-Location"]
-        except KeyError:
-            base: str = feed_source
-        parsed = feedparser.parse(
-            io.BytesIO(bytes(response.text, encoding="utf-8")),
-            response_headers={"Content-Location": base},
-        )
-        self.store_feed_entries(feed_id, parsed, epoch_downloaded)
-
     def store_feed_entries(self, feed_id: int, parsed_feed, epoch_downloaded: int):
         for entry in reversed(parsed_feed.entries):
             link: str = entry.get("link", None)
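
A note on the new checks in `TagRss.__init__`: `sqlite3.threadsafety == 3` corresponds to SQLite's serialized mode, in which a single connection may be shared across threads, and `check_same_thread=False` disables the `sqlite3` module's same-thread guard, which is only safe given that mode. A standalone sketch of the pattern (the database file name is hypothetical):

```python
import sqlite3


class Sqlite3NotSerializedModeError(Exception):
    pass


# threadsafety == 3 is DB-API "serialized" mode: one connection may be
# shared freely across threads. Anything less makes the sharing below unsafe.
if sqlite3.threadsafety != 3:
    raise Sqlite3NotSerializedModeError

# check_same_thread=False lifts sqlite3's own same-thread restriction;
# reasonable only because of the serialized-mode check above.
connection = sqlite3.connect("tagrss_data.db", check_same_thread=False)
```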