From f064e8ffefc841b6dfffdb17f3f407e7558ef8b9 Mon Sep 17 00:00:00 2001
From: Arjun Satarkar
Date: Thu, 3 Aug 2023 04:33:49 +0530
Subject: Refactor; more to come

I need to add documentation at some point too.
---
 serve.py             | 164 +++++++-------
 setup.sql            |  12 +-
 tagrss.py            | 592 ++++++++++++++++++++++++++++++++-------------------
 views/index.tpl      |  16 +-
 views/list_feeds.tpl |  10 +-
 5 files changed, 483 insertions(+), 311 deletions(-)

diff --git a/serve.py b/serve.py
index f456f80..87a30bb 100755
--- a/serve.py
+++ b/serve.py
@@ -36,7 +36,6 @@ args = parser.parse_args()
 
 storage_path: pathlib.Path = pathlib.Path(args.storage_path)
 
-core_lock = threading.RLock()
 core = tagrss.TagRss(storage_path=storage_path)
 
 
@@ -87,32 +86,30 @@ def index():
     included_tags: typing.Optional[list[str]] = None
     if included_tags_str:
         included_tags = parse_space_separated_tags(included_tags_str)
-    with core_lock:
-        total_pages: int = max(
-            1,
-            math.ceil(
-                core.get_entry_count(
-                    included_feeds=included_feeds, included_tags=included_tags
-                )
-                / per_page
-            ),
-        )
-        entries = core.get_entries(
-            limit=per_page,
-            offset=offset,
-            included_feeds=included_feeds,
-            included_tags=included_tags,
-        )
-    referenced_feed_ids = list({entry["feed_id"] for entry in entries})
-    with core_lock:
-        referenced_feeds_list = core.get_feeds(
-            limit=len(referenced_feed_ids),
-            included_feeds=referenced_feed_ids,
-            get_tags=True,
-        )
+    total_pages: int = max(
+        1,
+        math.ceil(
+            core.get_entry_count(
+                included_feeds=included_feeds, included_tags=included_tags
+            )
+            / per_page
+        ),
+    )
+    entries = core.get_entries(
+        limit=per_page,
+        offset=offset,
+        included_feeds=included_feeds,
+        included_tags=included_tags,
+    )
+    referenced_feed_ids = list({entry.feed_id for entry in entries})
+    referenced_feeds_list = core.get_feeds(
+        limit=len(referenced_feed_ids),
+        included_feeds=referenced_feed_ids,
+        get_tags=True,
+    )
     referenced_feeds = {}
     for feed in referenced_feeds_list:
-        referenced_feeds[feed["id"]] = {k: feed[k] for k in feed if k != "id"}
+        referenced_feeds[feed.id] = feed
     return bottle.template(
         "index",
         entries=entries,
@@ -125,7 +122,7 @@ def index():
         included_tags=included_tags,
         included_feeds_str=included_feeds_str,
         included_tags_str=included_tags_str,
-        referenced_feeds=referenced_feeds
+        referenced_feeds=referenced_feeds,
     )
 
 
@@ -134,18 +131,17 @@ def list_feeds():
     per_page: int = min(MAX_PER_PAGE_ENTRIES, int(bottle.request.query.get("per_page", DEFAULT_PER_PAGE_ENTRIES)))  # type: ignore
     page_num = int(bottle.request.query.get("page_num", 1))  # type: ignore
     offset = (page_num - 1) * per_page
-    with core_lock:
-        total_pages: int = max(1, math.ceil(core.get_feed_count() / per_page))
-        feeds = core.get_feeds(limit=per_page, offset=offset, get_tags=True)
-        return bottle.template(
-            "list_feeds",
-            feeds=feeds,
-            offset=offset,
-            page_num=page_num,
-            total_pages=total_pages,
-            per_page=per_page,
-            max_per_page=MAX_PER_PAGE_ENTRIES,
-        )
+    total_pages: int = max(1, math.ceil(core.get_feed_count() / per_page))
+    feeds = core.get_feeds(limit=per_page, offset=offset, get_tags=True)
+    return bottle.template(
+        "list_feeds",
+        feeds=feeds,
+        offset=offset,
+        page_num=page_num,
+        total_pages=total_pages,
+        per_page=per_page,
+        max_per_page=MAX_PER_PAGE_ENTRIES,
+    )
 
 
 @bottle.get("/add_feed")
@@ -163,19 +159,23 @@ def add_feed_effect():
 
     already_present: bool = False
 
-    parsed, epoch_downloaded = tagrss.fetch_parsed_feed(feed_source)
-    with core_lock:
-        try:
-            feed_id = core.add_feed(
-                feed_source=feed_source,
-                parsed_feed=parsed,
-                epoch_downloaded=epoch_downloaded,
-                tags=tags,
-            )
-            logging.info(f"Added feed {feed_id} (source: {feed_source}).")
-        except tagrss.FeedAlreadyAddedError:
-            already_present = True
-    # TODO: handle FeedFetchError too
+    try:
+        feed_id = core.add_feed(
+            source=feed_source,
+            tags=tags,
+        )
+        logging.info(f"Added feed {feed_id} (source: {feed_source}).")
+    except tagrss.FeedSourceAlreadyExistsError:
+        already_present = True
+    except tagrss.FeedTitleAlreadyInUseError as e:
+        # TODO: add option to set title on /add_feed so this can be remedied without
+        # changing the existing feed
+        raise bottle.HTTPError(
+            400,
+            f"Cannot add feed with title {str(e)} as another feed already has that "
+            "title.",
+        )
+    # TODO: handle FeedFetchError too
     return bottle.template(
         "add_feed",
         after_add=True,
@@ -193,10 +193,9 @@ def manage_feed_view():
         raise bottle.HTTPError(400, "Feed ID not given.")
     feed: dict[str, typing.Any] = {}
     feed["id"] = feed_id
-    with core_lock:
-        feed["source"] = core.get_feed_source(feed_id)
-        feed["title"] = core.get_feed_title(feed_id)
-        feed["tags"] = core.get_feed_tags(feed_id)
+    feed["source"] = core.get_feed_source(feed_id)
+    feed["title"] = core.get_feed_title(feed_id)
+    feed["tags"] = core.get_feed_tags(feed_id)
     feed["serialised_tags"] = serialise_tags(feed["tags"])
     return bottle.template("manage_feed", feed=feed)
 
@@ -211,10 +210,23 @@ def manage_feed_effect():
     feed["serialised_tags"] = bottle.request.forms["tags"]  # type: ignore
     if len(feed["tags"]) > MAX_TAGS:
         raise bottle.HTTPError(400, f"A feed cannot have more than {MAX_TAGS} tags.")
-    with core_lock:
+    try:
         core.set_feed_source(feed["id"], feed["source"])
+    except tagrss.FeedSourceAlreadyExistsError:
+        raise bottle.HTTPError(
+            400,
+            f"Cannot change source to {feed['source']} as there is already a feed with"
+            " that source.",
+        )
+    try:
         core.set_feed_title(feed["id"], feed["title"])
-        core.set_feed_tags(feed["id"], feed["tags"])
+    except tagrss.FeedTitleAlreadyInUseError:
+        raise bottle.HTTPError(
+            400,
+            f"Cannot change title to {feed['title']} as there is already a feed with"
+            " that title.",
+        )
+    core.set_feed_tags(feed["id"], feed["tags"])
     logging.info(f"Edited details of feed {feed['id']}.")
     return bottle.template("manage_feed", feed=feed, after_update=True)
 
@@ -222,8 +234,7 @@
 
 @bottle.post("/delete_feed")
 def delete_feed():
     feed_id: int = int(bottle.request.forms["id"])  # type: ignore
-    with core_lock:
-        core.delete_feed(feed_id)
+    core.delete_feed(feed_id)
     logging.info(f"Deleted feed {feed_id}.")
     return bottle.template("delete_feed")
 
@@ -237,24 +248,24 @@ def update_feeds(run_event: threading.Event):
     def inner_update():
         logging.info("Updating all feeds...")
         limit = 100
-        with core_lock:
-            feed_count = core.get_feed_count()
+        feed_count = core.get_feed_count()
         for i in range(math.ceil(feed_count / limit)):
-            with core_lock:
-                feeds = core.get_feeds(limit=limit, offset=limit * i)
+            feeds = core.get_feeds(limit=limit, offset=limit * i)
             for feed in feeds:
-                parsed_feed, epoch_downloaded = tagrss.fetch_parsed_feed(feed["source"])
-                logging.debug(f"Fetched feed {feed['id']} (source {feed['source']}).")
-                with core_lock:
-                    try:
-                        core.store_feed_entries(
-                            feed["id"], parsed_feed, epoch_downloaded
-                        )
-                    except tagrss.StorageConstraintViolationError:
-                        logging.warning(
-                            f"Failed to update feed {feed['id']} with source {feed['source']} "
-                            "due to constraint violation (feed already deleted?)."
-                        )
+                parsed, epoch_downloaded = core.fetch_and_parse_feed(feed.source)
+                try:
+                    core.store_feed_entries(
+                        feed_id=feed.id,  # type: ignore
+                        parsed=parsed,
+                        epoch_downloaded=epoch_downloaded,
+                    )
+                except tagrss.StorageConstraintViolationError:
+                    logging.warning(
+                        f"Failed to update feed {feed.id} with source {feed.source} due "
+                        "to constraint violation (feed already deleted?)."
+                    )
+                else:
+                    logging.debug(f"Updated feed {feed.id} (source {feed.source}).")
     logging.info("Finished updating all feeds.")
 
     inner_update()
@@ -271,5 +282,4 @@ threading.Thread(target=update_feeds, args=(feed_update_run_event,)).start()
 bottle.run(host=args.host, port=args.port, server="cheroot")
 logging.info("Exiting...")
 feed_update_run_event.clear()
-with core_lock:
-    core.close()
+core.close()
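
With core_lock gone, thread safety becomes the core's responsibility rather
than each handler's. A minimal sketch of the calling convention this patch
expects (the storage path and feed URL below are invented for illustration):

    import pathlib
    import tagrss

    core = tagrss.TagRss(storage_path=pathlib.Path("tagrss_data.db"))
    try:
        # Safe to call from any request-handler thread: TagRss now serializes
        # storage access internally.
        feed_id = core.add_feed(source="https://example.com/feed.xml", tags=["news"])
    except tagrss.FeedSourceAlreadyExistsError:
        pass  # the feed already exists; handlers report this instead of crashing
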
diff --git a/setup.sql b/setup.sql
index d4f01e6..b811b53 100644
--- a/setup.sql
+++ b/setup.sql
@@ -1,8 +1,8 @@
 /*
-Copyright (c) 2023-present Arjun Satarkar .
-Licensed under the GNU Affero General Public License v3.0. See LICENSE.txt in
-the root of this repository for the text of the license.
-*/
+ Copyright (c) 2023-present Arjun Satarkar .
+ Licensed under the GNU Affero General Public License v3.0. See LICENSE.txt in
+ the root of this repository for the text of the license.
+ */
 PRAGMA foreign_keys = ON;
 
 CREATE TABLE IF NOT EXISTS tagrss_info(info_key TEXT PRIMARY KEY, value TEXT) STRICT;
@@ -10,7 +10,7 @@ CREATE TABLE IF NOT EXISTS tagrss_info(info_key TEXT PRIMARY KEY, value TEXT) ST
 INSERT
     OR REPLACE INTO tagrss_info(info_key, value)
 VALUES
-    ("version", "0.11.0");
+    ("version", "0.12.0");
 
 CREATE TABLE IF NOT EXISTS feed_count(
     id INTEGER PRIMARY KEY CHECK (id = 0),
@@ -25,7 +25,7 @@ VALUES
 CREATE TABLE IF NOT EXISTS feeds(
     id INTEGER PRIMARY KEY AUTOINCREMENT,
     source TEXT UNIQUE,
-    title TEXT
+    title TEXT UNIQUE
 ) STRICT;
 
 CREATE TRIGGER IF NOT EXISTS trig_feeds__increment_feed_count_after_insert
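
The new UNIQUE constraint on feeds.title is what lets the storage layer tell
duplicate titles apart from duplicate sources: both surface as a
sqlite3.IntegrityError, and a follow-up query identifies which column
collided. A self-contained sketch of that behaviour (schema cut down to the
two relevant columns):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE feeds(source TEXT UNIQUE, title TEXT UNIQUE)")
    conn.execute("INSERT INTO feeds VALUES('https://a.example/feed', 'Example')")
    try:
        # Different source, same title: rejected under the 0.12.0 schema.
        conn.execute("INSERT INTO feeds VALUES('https://b.example/feed', 'Example')")
    except sqlite3.IntegrityError as e:
        print(e)  # UNIQUE constraint failed: feeds.title
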
diff --git a/tagrss.py b/tagrss.py
index 5d5ad01..f2a5da0 100644
--- a/tagrss.py
+++ b/tagrss.py
@@ -6,193 +6,151 @@ the root of this repository for the text of the license.
 import feedparser
 import requests
 
+import abc
 import calendar
+import contextlib
+import dataclasses
 import io
 import pathlib
 import sqlite3
+import threading
 import time
 import typing
 
 
-class FeedAlreadyAddedError(Exception):
+class StorageError(Exception):
     pass
 
 
-class FeedFetchError(Exception):
+class FeedSourceAlreadyExistsError(StorageError):
+    pass
+
+
+class FeedTitleAlreadyInUseError(StorageError):
+    pass
+
+
+class StorageConstraintViolationError(StorageError):
+    def __init__(self, error):
+        super().__init__(error)
+
+
+class SqliteMissingForeignKeySupportError(StorageError):
+    pass
+
+
+class NetworkError(Exception):
+    pass
+
+
+class FeedFetchError(NetworkError):
     def __init__(self, feed_source: str, status_code: int):
         super().__init__(f"Get {feed_source} returned HTTP {status_code}")
 
 
-class SqliteMissingForeignKeySupportError(Exception):
-    pass
+FeedId = int
+Epoch = int
+ParsedFeed = feedparser.FeedParserDict
 
 
-class Sqlite3NotSerializedModeError(Exception):
+class StorageProvider(abc.ABC):
     pass
 
 
-class StorageConstraintViolationError(Exception):
-    def __init__(self, error):
-        super().__init__(error)
+@dataclasses.dataclass(kw_only=True)
+class PartialFeed:
+    id: typing.Optional[FeedId] = None
+    source: typing.Optional[str] = None
+    title: typing.Optional[str] = None
+    tags: typing.Optional[list[str]] = None
 
 
-def fetch_parsed_feed(feed_source: str) -> tuple[feedparser.FeedParserDict, int]:
-    response = requests.get(feed_source)
-    epoch_downloaded: int = int(time.time())
-    if response.status_code != requests.codes.ok:
-        raise FeedFetchError(feed_source, response.status_code)
-    try:
-        base: str = response.headers["Content-Location"]
-    except KeyError:
-        base: str = feed_source
-    parsed = feedparser.parse(
-        io.BytesIO(bytes(response.text, encoding="utf-8")),
-        response_headers={"Content-Location": base},
-    )
-    return (parsed, epoch_downloaded)
+@dataclasses.dataclass(kw_only=True)
+class Entry:
+    id: int
+    feed_id: FeedId
+    title: str
+    link: str
+    epoch_published: Epoch
+    epoch_updated: Epoch
 
 
-class TagRss:
-    def __init__(self, *, storage_path: str | pathlib.Path):
-        if sqlite3.threadsafety != 3:
-            raise Sqlite3NotSerializedModeError
+class SqliteStorageProvider(StorageProvider):
+    def __init__(self, storage_path: str | pathlib.Path):
+        self.__raw_connection = sqlite3.connect(storage_path, check_same_thread=False)
+        self.__raw_connection.isolation_level = None
 
-        self.connection: sqlite3.Connection = sqlite3.connect(
-            storage_path, check_same_thread=False
-        )
+        self.__lock = threading.Lock()
 
-        with self.connection:
+        with self.__get_connection() as conn:
             with open("setup.sql", "r") as setup_script:
-                self.connection.executescript(setup_script.read())
-            if (1,) not in self.connection.execute("PRAGMA foreign_keys;").fetchmany(1):
+                conn.executescript(setup_script.read())
+            if (1,) not in conn.execute("PRAGMA foreign_keys;").fetchmany(1):
                 raise SqliteMissingForeignKeySupportError
 
-    def add_feed(
+    @contextlib.contextmanager
+    def __get_connection(self, *, use_transaction: bool = True):
+        self.__lock.acquire()
+        try:
+            if use_transaction:
+                self.__raw_connection.execute("BEGIN;")
+            yield self.__raw_connection
+        except Exception as e:
+            if use_transaction:
+                self.__raw_connection.rollback()
+            raise e
+        else:
+            if use_transaction:
+                self.__raw_connection.commit()
+        finally:
+            self.__lock.release()
+
+    def store_feed(
         self,
         *,
-        feed_source: str,
-        parsed_feed: feedparser.FeedParserDict,
-        epoch_downloaded: int,
+        source: str,
+        title: str,
         tags: list[str],
-    ) -> int:
-        feed_title: str = parsed_feed.feed.get("title", "")  # type: ignore
-        with self.connection:
+    ) -> FeedId:
+        with self.__get_connection() as conn:
             try:
-                self.connection.execute(
-                    "INSERT INTO feeds(source, title) VALUES(?, ?);",
-                    (feed_source, feed_title),
-                )  # Note: ensure no more INSERTs between this and the last_insert_rowid() call
+                resp = conn.execute(
+                    "INSERT INTO feeds(source, title) VALUES(?,?);", (source, title)
+                )
             except sqlite3.IntegrityError:
-                raise FeedAlreadyAddedError
-            feed_id: int = int(
-                self.connection.execute("SELECT last_insert_rowid();").fetchone()[0]
-            )
-            self.connection.executemany(
-                "INSERT INTO feed_tags(feed_id, tag) VALUES(?, ?);",
+                resp = conn.execute(
+                    "SELECT COUNT(*) FROM feeds WHERE source = ? UNION ALL SELECT "
+                    "COUNT(*) FROM feeds WHERE title = ?;",
+                    (source, title),
+                ).fetchall()
+                if resp[0][0]:
+                    raise FeedSourceAlreadyExistsError
+                elif resp[1][0]:
+                    raise FeedTitleAlreadyInUseError(title)
+                else:
+                    assert False, (
+                        "This should be impossible: unknown error when trying to "
+                        f'store feed with title "{title}" and source "{source}".'
+                    )
+            else:
+                feed_id: FeedId = conn.execute(
+                    "SELECT last_insert_rowid();"
+                ).fetchone()[0]
+                conn.executemany(
+                    "INSERT INTO feed_tags(feed_id, tag) VALUES(?,?);",
                     ((feed_id, tag) for tag in tags),
                 )
-        self.store_feed_entries(feed_id, parsed_feed, epoch_downloaded)
         return feed_id
 
-    def get_entries(
-        self,
-        *,
-        limit: int,
-        offset: int = 0,
-        included_feeds: typing.Optional[typing.Collection[int]] = None,
-        included_tags: typing.Optional[typing.Collection[str]] = None,
-    ) -> list[dict[str, typing.Any]]:
-        where_clause: str = "WHERE 1"
-        if included_feeds:
-            where_clause += f" AND feed_id IN ({','.join('?' * len(included_feeds))})"
-        if included_tags:
-            where_clause += (
-                " AND feed_id IN (SELECT feed_id FROM feed_tags WHERE tag = ?)"
-                * len(included_tags)
-            )
-        with self.connection:
-            resp = self.connection.execute(
-                f"SELECT id, feed_id, title, link, epoch_published, epoch_updated FROM entries \
-                {where_clause} \
-                ORDER BY id DESC LIMIT ? OFFSET ?;",
-                (
-                    *(included_feeds if included_feeds else ()),
-                    *(included_tags if included_tags else ()),
-                    limit,
-                    offset,
-                ),
-            ).fetchall()
-
-        entries = []
-        for entry in resp:
-            entries.append(
-                {
-                    "id": entry[0],
-                    "feed_id": entry[1],
-                    "title": entry[2],
-                    "link": entry[3],
-                    "epoch_published": entry[4],
-                    "epoch_updated": entry[5],
-                }
-            )
-        return entries
-
-    def get_feed_source(self, feed_id: int) -> str:
-        with self.connection:
-            return self.connection.execute(
-                "SELECT source FROM feeds WHERE id = ?;", (feed_id,)
-            ).fetchone()[0]
-
-    def get_feed_title(self, feed_id: int) -> str:
-        with self.connection:
-            return self.connection.execute(
-                "SELECT title FROM feeds WHERE id = ?;", (feed_id,)
-            ).fetchone()[0]
-
-    def get_feed_tags(self, feed_id: int) -> list[str]:
-        with self.connection:
-            return [
-                t[0]
-                for t in self.connection.execute(
-                    "SELECT tag FROM feed_tags WHERE feed_id = ?;", (feed_id,)
-                ).fetchall()
-            ]
-
-    def set_feed_source(self, feed_id: int, feed_source: str):
-        with self.connection:
-            self.connection.execute(
-                "UPDATE feeds SET source = ? WHERE id = ?;", (feed_source, feed_id)
-            )
-
-    def set_feed_title(self, feed_id: int, feed_title: str):
-        with self.connection:
-            self.connection.execute(
-                "UPDATE feeds SET title = ? WHERE id = ?;", (feed_title, feed_id)
-            )
-
-    def set_feed_tags(self, feed_id: int, feed_tags: list[str]):
-        with self.connection:
-            self.connection.execute(
-                "DELETE FROM feed_tags WHERE feed_id = ?;", (feed_id,)
-            )
-            self.connection.executemany(
-                "INSERT INTO feed_tags(feed_id, tag) VALUES(?, ?);",
-                ((feed_id, tag) for tag in feed_tags),
-            )
-
-    def delete_feed(self, feed_id: int) -> None:
-        with self.connection:
-            self.connection.execute("DELETE FROM feeds WHERE id = ?;", (feed_id,))
-
     def get_feeds(
         self,
         *,
         limit: int,
         offset: int = 0,
-        included_feeds: typing.Optional[list[int]] = None,
+        included_feeds: typing.Optional[list[FeedId]] = None,
         included_tags: typing.Optional[list[str]] = None,
         get_tags: bool = False,
-    ) -> list[dict[str, typing.Any]]:
+    ) -> list[PartialFeed]:
         where_clause = "WHERE 1"
         if included_feeds:
             where_clause += f" AND id IN ({','.join('?' * len(included_feeds))})"
@@ -200,8 +158,8 @@ class TagRss:
             where_clause += " AND id IN (SELECT id FROM feed_tags WHERE tag = ?)" * len(
                 included_tags
             )
-        with self.connection:
-            resp = self.connection.execute(
+        with self.__get_connection() as conn:
+            resp = conn.execute(
                 f"SELECT id, source, title FROM feeds \
                 {where_clause} \
                 ORDER BY id ASC LIMIT ? OFFSET ?;",
@@ -212,71 +170,33 @@ class TagRss:
                     offset,
                 ),
             ).fetchall()
-        feeds: dict[int, dict[str, typing.Any]] = {}
+        feeds_dict: dict[FeedId, PartialFeed] = {}
         for row in resp:
-            feeds[row[0]] = {
-                "source": row[1],
-                "title": row[2],
-            }
+            feeds_dict[row[0]] = PartialFeed(source=row[1], title=row[2])
         if get_tags:
-            feed_ids = feeds.keys()
+            feed_ids = feeds_dict.keys()
             placeholder_str = ",".join("?" * len(feed_ids))
-            with self.connection:
-                resp = self.connection.execute(
-                    f"SELECT feed_id, tag FROM feed_tags WHERE feed_id in ({placeholder_str});",
+            with self.__get_connection() as conn:
+                resp = conn.execute(
+                    "SELECT feed_id, tag FROM feed_tags WHERE feed_id in "
+                    f"({placeholder_str});",
                     (*feed_ids,),
                 ).fetchall()
             for row in resp:
                 try:
-                    feeds[row[0]]["tags"].append(row[1])
-                except KeyError:
-                    feeds[row[0]]["tags"] = [row[1]]
-        result: list[dict[str, typing.Any]] = []
-        for item in feeds.items():
-            feed = {
-                "id": item[0],
-                "source": item[1]["source"],
-                "title": item[1]["title"],
-            }
+                    feeds_dict[row[0]].tags.append(row[1])  # type: ignore
+                except AttributeError:
+                    feeds_dict[row[0]].tags = [row[1]]
+        result: list[PartialFeed] = []
+        for item in feeds_dict.items():
+            feed = PartialFeed(id=item[0], source=item[1].source, title=item[1].title)
             if get_tags:
-                try:
-                    feed["tags"] = item[1]["tags"]
-                except KeyError:
-                    feed["tags"] = []
+                feed.tags = item[1].tags
+                if not feed.tags:
+                    feed.tags = []
             result.append(feed)
         return result
 
-    def get_entry_count(
-        self,
-        *,
-        included_feeds: typing.Optional[typing.Collection[int]] = None,
-        included_tags: typing.Optional[typing.Collection[str]] = None,
-    ) -> int:
-        if not (included_feeds or included_tags):
-            with self.connection:
-                return self.connection.execute(
-                    "SELECT count from entry_count;"
-                ).fetchone()[0]
-        else:
-            where_clause: str = "WHERE 1"
-            if included_feeds:
-                where_clause += (
-                    f" AND feed_id IN ({','.join('?' * len(included_feeds))})"
-                )
-            if included_tags:
-                where_clause += (
-                    " AND feed_id IN (SELECT feed_id FROM feed_tags WHERE tag = ?)"
-                    * len(included_tags)
-                )
-            with self.connection:
-                return self.connection.execute(
-                    f"SELECT COUNT(*) FROM entries {where_clause};",
-                    (
-                        *(included_feeds if included_feeds else ()),
-                        *(included_tags if included_tags else ()),
-                    ),
-                ).fetchone()[0]
-
     def get_feed_count(
         self,
         *,
@@ -284,10 +204,8 @@ class TagRss:
         included_tags: typing.Optional[typing.Collection[str]] = None,
     ) -> int:
         if not (included_feeds or included_tags):
-            with self.connection:
-                return self.connection.execute(
-                    "SELECT count from feed_count;"
-                ).fetchone()[0]
+            with self.__get_connection(use_transaction=False) as conn:
+                return conn.execute("SELECT count from feed_count;").fetchone()[0]
         else:
             where_clause: str = "WHERE 1"
             if included_feeds:
@@ -297,8 +215,8 @@ class TagRss:
                 " AND id IN (SELECT id FROM feed_tags WHERE tag = ?)"
                 * len(included_tags)
             )
-        with self.connection:
-            return self.connection.execute(
+        with self.__get_connection(use_transaction=False) as conn:
+            return conn.execute(
                 f"SELECT COUNT(*) FROM feeds {where_clause}",
                 (
                     *(included_feeds if included_feeds else ()),
@@ -306,27 +224,81 @@ class TagRss:
                 ),
             ).fetchone()[0]
 
-    def store_feed_entries(self, feed_id: int, parsed_feed, epoch_downloaded: int):
-        for entry in reversed(parsed_feed.entries):
-            link: str = entry.get("link", None)
-            title: str = entry.get("title", None)
+    def get_feed_source(self, feed_id: FeedId) -> str:
+        with self.__get_connection(use_transaction=False) as conn:
+            return conn.execute(
+                "SELECT source FROM feeds WHERE id = ?;", (feed_id,)
+            ).fetchone()[0]
+
+    def get_feed_title(self, feed_id: FeedId) -> str:
+        with self.__get_connection(use_transaction=False) as conn:
+            return conn.execute(
+                "SELECT title FROM feeds WHERE id = ?;", (feed_id,)
+            ).fetchone()[0]
+
+    def get_feed_tags(self, feed_id: FeedId) -> list[str]:
+        with self.__get_connection(use_transaction=False) as conn:
+            return [
+                t[0]
+                for t in conn.execute(
+                    "SELECT tag FROM feed_tags WHERE feed_id = ?;", (feed_id,)
+                ).fetchall()
+            ]
+
+    def set_feed_source(self, feed_id: FeedId, feed_source: str) -> None:
+        with self.__get_connection() as conn:
             try:
-                epoch_published: typing.Optional[int] = calendar.timegm(
-                    entry.get("published_parsed", None)
+                conn.execute(
+                    "UPDATE feeds SET source = ? WHERE id = ?;", (feed_source, feed_id)
+                )
+            except sqlite3.IntegrityError:
+                raise FeedSourceAlreadyExistsError
+
+    def set_feed_title(self, feed_id: FeedId, feed_title: str) -> None:
+        with self.__get_connection() as conn:
+            try:
+                conn.execute(
+                    "UPDATE feeds SET title = ? WHERE id = ?;", (feed_title, feed_id)
+                )
+            except sqlite3.IntegrityError:
+                raise FeedTitleAlreadyInUseError
+
+    def set_feed_tags(self, feed_id: FeedId, feed_tags: list[str]) -> None:
+        with self.__get_connection() as conn:
+            conn.execute("DELETE FROM feed_tags WHERE feed_id = ?;", (feed_id,))
+            conn.executemany(
+                "INSERT INTO feed_tags(feed_id, tag) VALUES(?, ?);",
+                ((feed_id, tag) for tag in feed_tags),
+            )
+
+    def delete_feed(self, feed_id: FeedId) -> None:
+        with self.__get_connection() as conn:
+            conn.execute("DELETE FROM feeds WHERE id = ?;", (feed_id,))
+
+    def store_entries(
+        self, *, parsed: ParsedFeed, feed_id: FeedId, epoch_downloaded: Epoch
+    ) -> None:
+        for entry in reversed(parsed.entries):
+            link: typing.Optional[str] = entry.get("link", None)  # type: ignore
+            title: typing.Optional[str] = entry.get("title", None)  # type: ignore
+            try:
+                epoch_published: typing.Optional[Epoch] = calendar.timegm(
+                    entry.get("published_parsed", None)  # type: ignore
                 )
             except TypeError:
                 epoch_published = None
             try:
-                epoch_updated: typing.Optional[int] = calendar.timegm(
-                    entry.get("updated_parsed", None)
+                epoch_updated: typing.Optional[Epoch] = calendar.timegm(
+                    entry.get("updated_parsed", None)  # type: ignore
                 )
             except TypeError:
                 epoch_updated = None
-            with self.connection:
+            with self.__get_connection() as conn:
                 try:
-                    self.connection.execute(
+                    conn.execute(
                         "INSERT INTO entries(feed_id, title, link, epoch_published, epoch_updated, epoch_downloaded) \
-                        VALUES(?, ?, ?, ?, ?, ?);",
+                            VALUES(?, ?, ?, ?, ?, ?);",
                         (
                             feed_id,
                             title,
@@ -341,5 +313,193 @@ class TagRss:
                     # constraints would have been violated by the insert.
                     raise StorageConstraintViolationError(e)
 
+    def get_entries(
+        self,
+        *,
+        limit: int,
+        offset: int = 0,
+        included_feeds: typing.Optional[typing.Collection[int]] = None,
+        included_tags: typing.Optional[typing.Collection[str]] = None,
+    ) -> list[Entry]:
+        where_clause: str = "WHERE 1"
+        if included_feeds:
+            where_clause += f" AND feed_id IN ({','.join('?' * len(included_feeds))})"
+        if included_tags:
+            where_clause += (
+                " AND feed_id IN (SELECT feed_id FROM feed_tags WHERE tag = ?)"
+                * len(included_tags)
+            )
+        with self.__get_connection(use_transaction=False) as conn:
+            resp = conn.execute(
+                f"SELECT id, feed_id, title, link, epoch_published, epoch_updated FROM entries \
+                {where_clause} \
+                ORDER BY id DESC LIMIT ? OFFSET ?;",
+                (
+                    *(included_feeds if included_feeds else ()),
+                    *(included_tags if included_tags else ()),
+                    limit,
+                    offset,
+                ),
+            ).fetchall()
+        entries = []
+        for entry in resp:
+            entries.append(
+                Entry(
+                    id=entry[0],
+                    feed_id=entry[1],
+                    title=entry[2],
+                    link=entry[3],
+                    epoch_published=entry[4],
+                    epoch_updated=entry[5],
+                )
+            )
+        return entries
+
+    def get_entry_count(
+        self,
+        *,
+        included_feeds: typing.Optional[typing.Collection[int]] = None,
+        included_tags: typing.Optional[typing.Collection[str]] = None,
+    ) -> int:
+        if not (included_feeds or included_tags):
+            with self.__get_connection(use_transaction=False) as conn:
+                return conn.execute("SELECT count from entry_count;").fetchone()[0]
+        else:
+            where_clause: str = "WHERE 1"
+            if included_feeds:
+                where_clause += (
+                    f" AND feed_id IN ({','.join('?' * len(included_feeds))})"
+                )
+            if included_tags:
+                where_clause += (
+                    " AND feed_id IN (SELECT feed_id FROM feed_tags WHERE tag = ?)"
+                    * len(included_tags)
+                )
+            with self.__get_connection(use_transaction=False) as conn:
+                return conn.execute(
+                    f"SELECT COUNT(*) FROM entries {where_clause};",
+                    (
+                        *(included_feeds if included_feeds else ()),
+                        *(included_tags if included_tags else ()),
+                    ),
+                ).fetchone()[0]
+
+    def close(self):
+        with self.__get_connection(use_transaction=False) as conn:
+            conn.close()
+
+
+class TagRss:
+    def __init__(self, *, storage_path: str | pathlib.Path):
+        self.__storage = SqliteStorageProvider(storage_path)
+
+    def fetch_and_parse_feed(self, source) -> tuple[ParsedFeed, int]:
+        response = requests.get(source)
+        epoch_downloaded: int = int(time.time())
+        if response.status_code != requests.codes.ok:
+            raise FeedFetchError(source, response.status_code)
+        try:
+            base: str = response.headers["Content-Location"]
+        except KeyError:
+            base: str = source
+        parsed: ParsedFeed = feedparser.parse(
+            io.BytesIO(bytes(response.text, encoding="utf-8")),
+            response_headers={"Content-Location": base},
+        )
+        return (parsed, epoch_downloaded)
+
+    def add_feed(
+        self,
+        source: str,
+        tags: list[str],
+    ) -> int:
+        parsed, epoch_downloaded = self.fetch_and_parse_feed(source)
+        title: str = parsed.feed.get("title", "")  # type: ignore
+        feed_id = self.__storage.store_feed(source=source, title=title, tags=tags)
+        self.__storage.store_entries(
+            parsed=parsed, feed_id=feed_id, epoch_downloaded=epoch_downloaded
+        )
+        return feed_id
+
+    def get_feed_source(self, feed_id: FeedId) -> str:
+        return self.__storage.get_feed_source(feed_id)
+
+    def get_feed_title(self, feed_id: FeedId) -> str:
+        return self.__storage.get_feed_title(feed_id)
+
+    def get_feed_tags(self, feed_id: FeedId) -> list[str]:
+        return self.__storage.get_feed_tags(feed_id)
+
+    def set_feed_source(self, feed_id: FeedId, feed_source: str):
+        self.__storage.set_feed_source(feed_id, feed_source)
+
+    def set_feed_title(self, feed_id: FeedId, feed_title: str) -> None:
+        self.__storage.set_feed_title(feed_id, feed_title)
+
+    def set_feed_tags(self, feed_id: FeedId, feed_tags: list[str]):
+        self.__storage.set_feed_tags(feed_id, feed_tags)
+
+    def delete_feed(self, feed_id: int) -> None:
+        self.__storage.delete_feed(feed_id)
+
+    def get_feeds(
+        self,
+        *,
+        limit: int,
+        offset: int = 0,
+        included_feeds: typing.Optional[list[int]] = None,
+        included_tags: typing.Optional[list[str]] = None,
+        get_tags: bool = False,
+    ) -> list[PartialFeed]:
+        return self.__storage.get_feeds(
+            limit=limit,
+            offset=offset,
+            included_feeds=included_feeds,
+            included_tags=included_tags,
+            get_tags=get_tags,
+        )
+
+    def get_feed_count(
+        self,
+        *,
+        included_feeds: typing.Optional[typing.Collection[int]] = None,
+        included_tags: typing.Optional[typing.Collection[str]] = None,
+    ) -> int:
+        return self.__storage.get_feed_count(
+            included_feeds=included_feeds, included_tags=included_tags
+        )
+
+    def get_entries(
+        self,
+        *,
+        limit: int,
+        offset: int = 0,
+        included_feeds: typing.Optional[typing.Collection[int]] = None,
+        included_tags: typing.Optional[typing.Collection[str]] = None,
+    ) -> list[Entry]:
+        return self.__storage.get_entries(
+            limit=limit,
+            offset=offset,
+            included_feeds=included_feeds,
+            included_tags=included_tags,
+        )
+
+    def get_entry_count(
+        self,
+        *,
+        included_feeds: typing.Optional[typing.Collection[int]] = None,
+        included_tags: typing.Optional[typing.Collection[str]] = None,
+    ) -> int:
+        return self.__storage.get_entry_count(
+            included_feeds=included_feeds, included_tags=included_tags
+        )
+
+    def store_feed_entries(
+        self, parsed: ParsedFeed, feed_id: FeedId, epoch_downloaded: int
+    ):
+        self.__storage.store_entries(
+            parsed=parsed, feed_id=feed_id, epoch_downloaded=epoch_downloaded
+        )
+
     def close(self) -> None:
-        self.connection.close()
+        self.__storage.close()
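
After this refactor, callers go through the TagRss facade and receive
PartialFeed and Entry dataclasses rather than dicts, with
SqliteStorageProvider handling locking and transactions underneath. A rough
usage sketch (the storage path and URL are placeholders, not part of the
patch):

    import tagrss

    core = tagrss.TagRss(storage_path="tagrss_data.db")
    feed_id = core.add_feed(source="https://example.com/feed.xml", tags=["tech"])
    for entry in core.get_entries(limit=10):
        print(entry.title, entry.link)  # attribute access, not entry["title"]
    for feed in core.get_feeds(limit=10, get_tags=True):
        print(feed.id, feed.title, feed.tags)
    core.close()
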
diff --git a/views/index.tpl b/views/index.tpl
index 0145a4e..1851531 100644
--- a/views/index.tpl
+++ b/views/index.tpl
@@ -100,12 +100,14 @@
             % for i, entry in enumerate(entries):
                     {{i + 1 + offset}}
-                    {{entry["title"]}}
+                    {{entry.title}}
                     <%
                         local_date = ""
                         utc_date = ""
-                        epoch = entry.get("epoch_published")
-                        epoch = entry.get("epoch_updated", epoch)
+                        epoch = entry.epoch_updated
+                        if not epoch:
+                            epoch = entry.epoch_published
+                        end
                         if epoch:
                             local_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(epoch))
                             utc_date = time.strftime("%Y-%m-%d %H:%M:%SZ", time.gmtime(epoch))
@@ -116,7 +118,7 @@
-                        % tags = referenced_feeds[entry["feed_id"]]["tags"]
+                        % tags = referenced_feeds[entry.feed_id].tags
                         % for i, tag in enumerate(tags):
                             % if i > 0:
                                 {{", "}}
                             % end
@@ -127,9 +129,9 @@
-
-                    {{referenced_feeds[entry["feed_id"]]["title"]}}
-                    ({{entry["feed_id"]}})
+
+                    {{referenced_feeds[entry.feed_id].title}}
+                    ({{entry.feed_id}})
diff --git a/views/list_feeds.tpl b/views/list_feeds.tpl
index 2b4bd6d..65432a3 100644
--- a/views/list_feeds.tpl
+++ b/views/list_feeds.tpl
@@ -55,11 +55,11 @@
             % for i, feed in enumerate(feeds):
                     {{i + 1 + offset}}
-                    {{feed["id"]}}
-                    {{feed["title"]}} (filter)
+                    {{feed.id}}
+                    {{feed.title}} (filter)
-                    % for i, tag in enumerate(feed["tags"]):
+                    % for i, tag in enumerate(feed.tags):
                         % if i > 0:
                             {{", "}}
                         % end
                     % end
@@ -67,8 +67,8 @@
-                    🔗
-
+                    🔗
+
             % end
-- 
cgit v1.2.3-57-g22cb