-rw-r--r--  requirements.txt | 27
-rwxr-xr-x  serve.py         | 47
-rw-r--r--  tagrss.py        | 83
3 files changed, 81 insertions(+), 76 deletions(-)
diff --git a/requirements.txt b/requirements.txt
index 268609c..87749db 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,13 +1,26 @@
+annotated-types==0.5.0
+autocommand==2.2.2
 bottle==0.12.25
-certifi==2023.5.7
-charset-normalizer==3.1.0
+certifi==2023.7.22
+charset-normalizer==3.2.0
+cheroot==10.0.0
+CherryPy==18.8.0
 feedparser==6.0.10
-gevent==22.10.2
-greenlet==2.0.2
 idna==3.4
+inflect==7.0.0
+jaraco.collections==4.3.0
+jaraco.context==4.3.0
+jaraco.functools==3.8.0
+jaraco.text==3.11.1
+more-itertools==10.0.0
+portend==3.2.0
+pydantic==2.1.1
+pydantic_core==2.4.0
+pytz==2023.3
 requests==2.31.0
 schedule==1.2.0
 sgmllib3k==1.0.0
-urllib3==2.0.3
-zope.event==4.6
-zope.interface==6.0
+tempora==5.5.0
+typing_extensions==4.7.1
+urllib3==2.0.4
+zc.lockfile==3.0.post1
diff --git a/serve.py b/serve.py
--- a/serve.py
+++ b/serve.py
@@ -1,14 +1,11 @@
 #!/usr/bin/env python3
-import gevent.monkey
-
-gevent.monkey.patch_all()
 
 import bottle
-import gevent.lock
+import schedule
 import argparse
-import pathlib
+import logging
 import math
-import schedule
+import pathlib
 import threading
 import time
 import typing
@@ -18,6 +15,8 @@ import tagrss
 MAX_PER_PAGE_ENTRIES = 1000
 DEFAULT_PER_PAGE_ENTRIES = 50
 
+logging.basicConfig(level=logging.INFO)
+
 parser = argparse.ArgumentParser()
 parser.add_argument("--host", default="localhost")
 parser.add_argument("--port", default=8000, type=int)
@@ -27,7 +26,7 @@ args = parser.parse_args()
 
 storage_path: pathlib.Path = pathlib.Path(args.storage_path)
 
-core_lock = gevent.lock.RLock()
+core_lock = threading.RLock()
 core = tagrss.TagRss(storage_path=storage_path)
 
 
@@ -113,9 +112,11 @@ def add_feed_effect():
     tags = parse_space_separated_tags(bottle.request.forms.get("tags"))  # type: ignore
 
     already_present: bool = False
+
+    parsed, epoch_downloaded = tagrss.fetch_parsed_feed(feed_source)
     with core_lock:
         try:
-            core.add_feed(feed_source=feed_source, tags=tags)
+            core.add_feed(feed_source=feed_source, parsed_feed=parsed, epoch_downloaded=epoch_downloaded, tags=tags)
         except tagrss.FeedAlreadyAddedError:
             already_present = True
         # TODO: handle FeedFetchError too
@@ -145,7 +146,7 @@ def manage_feed_view():
 
 
 @bottle.post("/manage_feed")
-def manage_feed_effect_update():
+def manage_feed_effect():
     feed: dict[str, typing.Any] = {}
     feed["id"] = int(bottle.request.forms["id"])  # type: ignore
     feed["source"] = bottle.request.forms["source"]  # type: ignore
@@ -159,17 +160,12 @@ def manage_feed_effect_update():
     return bottle.template("manage_feed", feed=feed, after_update=True)
 
 
-@bottle.get("/delete_feed")
-def delete_feed_view():
-    return bottle.static_file("delete_feed.html", root="views")
-
-
 @bottle.post("/delete_feed")
-def delete_feed_effect():
+def delete_feed():
     feed_id: int = int(bottle.request.forms["id"])  # type: ignore
     with core_lock:
         core.delete_feed(feed_id)
-    return bottle.redirect("/delete_feed")
+    return bottle.static_file("delete_feed.html", root="views")
 
 
 @bottle.get("/static/<path:path>")
@@ -179,23 +175,30 @@ def serve_static(path):
 
 def update_feeds(run_event: threading.Event):
     def inner_update():
+        logging.info("Updating feeds...")
+        limit = 100
         with core_lock:
-            feeds = core.get_all_feed_ids()
-        for feed_id in feeds():
-            core.fetch_new_feed_entries(feed_id)
-
+            feed_count = core.get_feed_count()
+        for i in range(math.ceil(feed_count / limit)):
+            with core_lock:
+                feeds = core.get_feeds(limit=limit, offset=limit * i)
+            for feed in feeds:
+                parsed_feed, epoch_downloaded = tagrss.fetch_parsed_feed(feed["source"])
+                logging.debug(f"Updated feed with source {feed['source']} .")
+                with core_lock:
+                    core.store_feed_entries(feed["id"], parsed_feed, epoch_downloaded)
+        logging.info("Finished updating feeds.")
     inner_update()
     schedule.every(args.update_seconds).seconds.do(inner_update)
     while run_event.is_set():
         schedule.run_pending()
         time.sleep(1)
-
 
 
 feed_update_run_event = threading.Event()
 feed_update_run_event.set()
 threading.Thread(target=update_feeds, args=(feed_update_run_event,)).start()
-bottle.run(host=args.host, port=args.port, server="gevent")
+bottle.run(host=args.host, port=args.port, server="cheroot")
 feed_update_run_event.clear()
 with core_lock:
     core.close()
diff --git a/tagrss.py b/tagrss.py
--- a/tagrss.py
+++ b/tagrss.py
@@ -22,9 +22,34 @@ class SqliteMissingForeignKeySupportError(Exception):
     pass
 
 
+class Sqlite3NotSerializedModeError(Exception):
+    pass
+
+
+def fetch_parsed_feed(feed_source: str) -> tuple[feedparser.FeedParserDict, int]:
+    response = requests.get(feed_source)
+    epoch_downloaded: int = int(time.time())
+    if response.status_code != requests.codes.ok:
+        raise FeedFetchError(feed_source, response.status_code)
+    try:
+        base: str = response.headers["Content-Location"]
+    except KeyError:
+        base: str = feed_source
+    parsed = feedparser.parse(
+        io.BytesIO(bytes(response.text, encoding="utf-8")),
+        response_headers={"Content-Location": base},
+    )
+    return (parsed, epoch_downloaded)
+
+
 class TagRss:
     def __init__(self, *, storage_path: str | pathlib.Path):
-        self.connection: sqlite3.Connection = sqlite3.connect(storage_path)
+        if sqlite3.threadsafety != 3:
+            raise Sqlite3NotSerializedModeError
+
+        self.connection: sqlite3.Connection = sqlite3.connect(
+            storage_path, check_same_thread=False
+        )
 
         with self.connection:
             with open("setup.sql", "r") as setup_script:
@@ -32,21 +57,16 @@ class TagRss:
         if (1,) not in self.connection.execute("PRAGMA foreign_keys;").fetchmany(1):
             raise SqliteMissingForeignKeySupportError
 
-    def add_feed(self, feed_source: str, tags: list[str]) -> None:
-        response = requests.get(feed_source)
-        epoch_downloaded: int = int(time.time())
-        if response.status_code != requests.codes.ok:
-            raise FeedFetchError(feed_source, response.status_code)
-        try:
-            base: str = response.headers["Content-Location"]
-        except KeyError:
-            base: str = feed_source
-        parsed = feedparser.parse(
-            io.BytesIO(bytes(response.text, encoding="utf-8")),
-            response_headers={"Content-Location": base},
-        )
+    def add_feed(
+        self,
+        *,
+        feed_source: str,
+        parsed_feed: feedparser.FeedParserDict,
+        epoch_downloaded: int,
+        tags: list[str],
+    ) -> None:
         with self.connection:
-            feed_title: str = parsed.feed.get("title", "")
+            feed_title: str = parsed_feed.feed.get("title", "")  # type: ignore
             try:
                 self.connection.execute(
                     "INSERT INTO feeds(source, title) VALUES(?, ?);",
@@ -61,7 +81,7 @@
                 "INSERT INTO feed_tags(feed_id, tag) VALUES(?, ?);",
                 ((feed_id, tag) for tag in tags),
             )
-            self.store_feed_entries(feed_id, parsed, epoch_downloaded)
+            self.store_feed_entries(feed_id, parsed_feed, epoch_downloaded)
 
     def get_entries(
         self, *, limit: int, offset: int = 0
@@ -152,18 +172,6 @@
             )
         return feeds
 
-    def get_all_feed_ids(self):
-        def inner():
-            with self.connection:
-                resp = self.connection.execute("SELECT id FROM feeds;")
-                while True:
-                    row = resp.fetchone()
-                    if not row:
-                        break
-                    yield row[0]
-
-        return inner
-
     def get_entry_count(self) -> int:
         with self.connection:
             return self.connection.execute("SELECT count from entry_count;").fetchone()[
@@ -176,25 +184,6 @@
             0
         ]
 
-    def fetch_new_feed_entries(self, feed_id: int) -> None:
-        with self.connection:
-            feed_source: str = self.connection.execute(
-                "SELECT source FROM feeds WHERE id = ?;", (feed_id,)
-            ).fetchone()[0]
-        response = requests.get(feed_source)
-        epoch_downloaded: int = int(time.time())
-        if response.status_code != requests.codes.ok:
-            raise FeedFetchError(feed_source, response.status_code)
-        try:
-            base: str = response.headers["Content-Location"]
-        except KeyError:
-            base: str = feed_source
-        parsed = feedparser.parse(
-            io.BytesIO(bytes(response.text, encoding="utf-8")),
-            response_headers={"Content-Location": base},
-        )
-        self.store_feed_entries(feed_id, parsed, epoch_downloaded)
-
     def store_feed_entries(self, feed_id: int, parsed_feed, epoch_downloaded: int):
         for entry in reversed(parsed_feed.entries):
             link: str = entry.get("link", None)
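
A note on the new `sqlite3.threadsafety` guard: a value of 3 is the DB-API 2.0 level for SQLite's "serialized" mode, in which the library does its own locking, so a single connection can be shared across threads — the property that `check_same_thread=False` depends on now that gevent's monkey patching is gone. Before Python 3.11 this attribute was hardcoded to 1 regardless of the build, so the guard is only meaningful on 3.11+. A minimal standalone sketch of the same check, with a generic `RuntimeError` standing in for the commit's `Sqlite3NotSerializedModeError`:

```python
import sqlite3

# threadsafety == 3 ("serialized"): SQLite locks internally, so one
# connection can be used from multiple threads at once.
if sqlite3.threadsafety != 3:
    raise RuntimeError("sqlite3 was not compiled in serialized mode")

# check_same_thread=False lifts Python's per-thread ownership check; this is
# safe only because the serialized build verified above locks internally.
conn = sqlite3.connect(":memory:", check_same_thread=False)
```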
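The rewritten `inner_update` pages through the feed table in chunks of `limit` rows instead of holding the lock across a generator over the whole table, so the lock is taken only for the short `get_feeds` and `store_feed_entries` calls while the slow network fetches run unlocked. The paging arithmetic as a standalone sketch (`batched_offsets` is an illustrative helper, not something the commit defines):

```python
import math

def batched_offsets(total: int, limit: int):
    """Yield (limit, offset) pairs covering `total` rows page by page."""
    for i in range(math.ceil(total / limit)):
        yield limit, limit * i

# 250 feeds with a page size of 100 -> three queries:
print(list(batched_offsets(250, 100)))  # [(100, 0), (100, 100), (100, 200)]
```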
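Finally, the `Content-Location` header that the new `fetch_parsed_feed` helper forwards to feedparser sets the base URI against which relative links in the feed body are resolved. A hedged illustration of that behavior (the feed snippet and the `https://example.org/feed.xml` URL are made up for the example, not taken from the patch):

```python
import io
import feedparser

# A feed whose item link is relative; feedparser should resolve it against
# the Content-Location "response header" supplied below.
xml = b"""<rss version="2.0"><channel><title>t</title>
<item><title>post</title><link>/posts/1</link></item>
</channel></rss>"""

parsed = feedparser.parse(
    io.BytesIO(xml),
    response_headers={"Content-Location": "https://example.org/feed.xml"},
)
print(parsed.entries[0].link)  # expected: https://example.org/posts/1
```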