author     Arjun Satarkar <me@arjunsatarkar.net>   2023-07-28 15:25:47 +0000
committer  Arjun Satarkar <me@arjunsatarkar.net>   2023-07-28 15:25:47 +0000
commit     d14c45978119ba150ffff8abf6c87634ecb46c52 (patch)
tree       2c816d3da0f83569a61533960c9059ee9e84e8da
parent     15f0a2d2577949cd250aea54561b29634943468f (diff)
Stop feed fetching from blocking everything
We accomplished this by switching from gevent to CherryPy (it should be possible with gevent, but it wasn't worth the trouble) and by moving the HTTP request part of the process out of the core TagRss class. Also added a bit of logging, and got rid of /delete_feed returning a spurious success page in response to GET requests.
-rw-r--r--  requirements.txt  27
-rwxr-xr-x  serve.py          47
-rw-r--r--  tagrss.py         83
3 files changed, 81 insertions, 76 deletions
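
The gist of the change, as a minimal sketch assembled from the names introduced in the diff below (the storage path is made up, error handling is omitted, and TagRss reads setup.sql from the working directory, so this assumes running from the repo root): the slow network fetch now happens without holding core_lock, so only the SQLite work is serialized.

    import threading

    import tagrss

    core_lock = threading.RLock()
    core = tagrss.TagRss(storage_path="tagrss_data.db")  # hypothetical path

    def add_feed_unblocked(feed_source: str, tags: list[str]) -> None:
        # Network I/O happens outside the lock, so other threads keep running...
        parsed, epoch_downloaded = tagrss.fetch_parsed_feed(feed_source)
        # ...and only the database write is serialized.
        with core_lock:
            core.add_feed(
                feed_source=feed_source,
                parsed_feed=parsed,
                epoch_downloaded=epoch_downloaded,
                tags=tags,
            )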
diff --git a/requirements.txt b/requirements.txt
index 268609c..87749db 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,13 +1,26 @@
+annotated-types==0.5.0
+autocommand==2.2.2
bottle==0.12.25
-certifi==2023.5.7
-charset-normalizer==3.1.0
+certifi==2023.7.22
+charset-normalizer==3.2.0
+cheroot==10.0.0
+CherryPy==18.8.0
feedparser==6.0.10
-gevent==22.10.2
-greenlet==2.0.2
idna==3.4
+inflect==7.0.0
+jaraco.collections==4.3.0
+jaraco.context==4.3.0
+jaraco.functools==3.8.0
+jaraco.text==3.11.1
+more-itertools==10.0.0
+portend==3.2.0
+pydantic==2.1.1
+pydantic_core==2.4.0
+pytz==2023.3
requests==2.31.0
schedule==1.2.0
sgmllib3k==1.0.0
-urllib3==2.0.3
-zope.event==4.6
-zope.interface==6.0
+tempora==5.5.0
+typing_extensions==4.7.1
+urllib3==2.0.4
+zc.lockfile==3.0.post1
diff --git a/serve.py b/serve.py
index 41a4757..89b8a74 100755
--- a/serve.py
+++ b/serve.py
@@ -1,14 +1,11 @@
#!/usr/bin/env python3
-import gevent.monkey
-
-gevent.monkey.patch_all()
import bottle
-import gevent.lock
+import schedule
import argparse
-import pathlib
+import logging
import math
-import schedule
+import pathlib
import threading
import time
import typing
@@ -18,6 +15,8 @@ import tagrss
MAX_PER_PAGE_ENTRIES = 1000
DEFAULT_PER_PAGE_ENTRIES = 50
+logging.basicConfig(level=logging.INFO)
+
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="localhost")
parser.add_argument("--port", default=8000, type=int)
@@ -27,7 +26,7 @@ args = parser.parse_args()
storage_path: pathlib.Path = pathlib.Path(args.storage_path)
-core_lock = gevent.lock.RLock()
+core_lock = threading.RLock()
core = tagrss.TagRss(storage_path=storage_path)
@@ -113,9 +112,11 @@ def add_feed_effect():
tags = parse_space_separated_tags(bottle.request.forms.get("tags")) # type: ignore
already_present: bool = False
+
+ parsed, epoch_downloaded = tagrss.fetch_parsed_feed(feed_source)
with core_lock:
try:
- core.add_feed(feed_source=feed_source, tags=tags)
+ core.add_feed(feed_source=feed_source, parsed_feed=parsed, epoch_downloaded=epoch_downloaded, tags=tags)
except tagrss.FeedAlreadyAddedError:
already_present = True
# TODO: handle FeedFetchError too
@@ -145,7 +146,7 @@ def manage_feed_view():
@bottle.post("/manage_feed")
-def manage_feed_effect_update():
+def manage_feed_effect():
feed: dict[str, typing.Any] = {}
feed["id"] = int(bottle.request.forms["id"]) # type: ignore
feed["source"] = bottle.request.forms["source"] # type: ignore
@@ -159,17 +160,12 @@ def manage_feed_effect_update():
return bottle.template("manage_feed", feed=feed, after_update=True)
-@bottle.get("/delete_feed")
-def delete_feed_view():
- return bottle.static_file("delete_feed.html", root="views")
-
-
@bottle.post("/delete_feed")
-def delete_feed_effect():
+def delete_feed():
feed_id: int = int(bottle.request.forms["id"]) # type: ignore
with core_lock:
core.delete_feed(feed_id)
- return bottle.redirect("/delete_feed")
+ return bottle.static_file("delete_feed.html", root="views")
@bottle.get("/static/<path:path>")
@@ -179,23 +175,30 @@ def serve_static(path):
def update_feeds(run_event: threading.Event):
def inner_update():
+ logging.info("Updating feeds...")
+ limit = 100
with core_lock:
- feeds = core.get_all_feed_ids()
- for feed_id in feeds():
- core.fetch_new_feed_entries(feed_id)
-
+ feed_count = core.get_feed_count()
+ for i in range(math.ceil(feed_count / limit)):
+ with core_lock:
+ feeds = core.get_feeds(limit=limit, offset=limit * i)
+ for feed in feeds:
+ parsed_feed, epoch_downloaded = tagrss.fetch_parsed_feed(feed["source"])
+ logging.debug(f"Updated feed with source {feed['source']} .")
+ with core_lock:
+ core.store_feed_entries(feed["id"], parsed_feed, epoch_downloaded)
+ logging.info("Finished updating feeds.")
inner_update()
schedule.every(args.update_seconds).seconds.do(inner_update)
while run_event.is_set():
schedule.run_pending()
time.sleep(1)
-
feed_update_run_event = threading.Event()
feed_update_run_event.set()
threading.Thread(target=update_feeds, args=(feed_update_run_event,)).start()
-bottle.run(host=args.host, port=args.port, server="gevent")
+bottle.run(host=args.host, port=args.port, server="cheroot")
feed_update_run_event.clear()
with core_lock:
core.close()
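
For reference, the scheduler thread's lifecycle in serve.py now follows the pattern sketched here (a sketch only; the final join is an assumption for a clean exit and does not appear in the diff):

    import threading
    import time

    import schedule

    run_event = threading.Event()
    run_event.set()

    def loop() -> None:
        # schedule.run_pending() runs any jobs that are due; the Event lets
        # the main thread signal the loop to stop once the server has returned.
        while run_event.is_set():
            schedule.run_pending()
            time.sleep(1)

    worker = threading.Thread(target=loop)
    worker.start()
    # In serve.py, bottle.run(..., server="cheroot") blocks here until shutdown.
    run_event.clear()
    worker.join()  # assumption: join so the thread finishes before the process exits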
diff --git a/tagrss.py b/tagrss.py
index 038c940..8ae001c 100644
--- a/tagrss.py
+++ b/tagrss.py
@@ -22,9 +22,34 @@ class SqliteMissingForeignKeySupportError(Exception):
pass
+class Sqlite3NotSerializedModeError(Exception):
+ pass
+
+
+def fetch_parsed_feed(feed_source: str) -> tuple[feedparser.FeedParserDict, int]:
+ response = requests.get(feed_source)
+ epoch_downloaded: int = int(time.time())
+ if response.status_code != requests.codes.ok:
+ raise FeedFetchError(feed_source, response.status_code)
+ try:
+ base: str = response.headers["Content-Location"]
+ except KeyError:
+ base: str = feed_source
+ parsed = feedparser.parse(
+ io.BytesIO(bytes(response.text, encoding="utf-8")),
+ response_headers={"Content-Location": base},
+ )
+ return (parsed, epoch_downloaded)
+
+
class TagRss:
def __init__(self, *, storage_path: str | pathlib.Path):
- self.connection: sqlite3.Connection = sqlite3.connect(storage_path)
+ if sqlite3.threadsafety != 3:
+ raise Sqlite3NotSerializedModeError
+
+ self.connection: sqlite3.Connection = sqlite3.connect(
+ storage_path, check_same_thread=False
+ )
with self.connection:
with open("setup.sql", "r") as setup_script:
@@ -32,21 +57,16 @@ class TagRss:
if (1,) not in self.connection.execute("PRAGMA foreign_keys;").fetchmany(1):
raise SqliteMissingForeignKeySupportError
- def add_feed(self, feed_source: str, tags: list[str]) -> None:
- response = requests.get(feed_source)
- epoch_downloaded: int = int(time.time())
- if response.status_code != requests.codes.ok:
- raise FeedFetchError(feed_source, response.status_code)
- try:
- base: str = response.headers["Content-Location"]
- except KeyError:
- base: str = feed_source
- parsed = feedparser.parse(
- io.BytesIO(bytes(response.text, encoding="utf-8")),
- response_headers={"Content-Location": base},
- )
+ def add_feed(
+ self,
+ *,
+ feed_source: str,
+ parsed_feed: feedparser.FeedParserDict,
+ epoch_downloaded: int,
+ tags: list[str],
+ ) -> None:
with self.connection:
- feed_title: str = parsed.feed.get("title", "")
+ feed_title: str = parsed_feed.feed.get("title", "") # type: ignore
try:
self.connection.execute(
"INSERT INTO feeds(source, title) VALUES(?, ?);",
@@ -61,7 +81,7 @@ class TagRss:
"INSERT INTO feed_tags(feed_id, tag) VALUES(?, ?);",
((feed_id, tag) for tag in tags),
)
- self.store_feed_entries(feed_id, parsed, epoch_downloaded)
+ self.store_feed_entries(feed_id, parsed_feed, epoch_downloaded)
def get_entries(
self, *, limit: int, offset: int = 0
@@ -152,18 +172,6 @@ class TagRss:
)
return feeds
- def get_all_feed_ids(self):
- def inner():
- with self.connection:
- resp = self.connection.execute("SELECT id FROM feeds;")
- while True:
- row = resp.fetchone()
- if not row:
- break
- yield row[0]
-
- return inner
-
def get_entry_count(self) -> int:
with self.connection:
return self.connection.execute("SELECT count from entry_count;").fetchone()[
@@ -176,25 +184,6 @@ class TagRss:
0
]
- def fetch_new_feed_entries(self, feed_id: int) -> None:
- with self.connection:
- feed_source: str = self.connection.execute(
- "SELECT source FROM feeds WHERE id = ?;", (feed_id,)
- ).fetchone()[0]
- response = requests.get(feed_source)
- epoch_downloaded: int = int(time.time())
- if response.status_code != requests.codes.ok:
- raise FeedFetchError(feed_source, response.status_code)
- try:
- base: str = response.headers["Content-Location"]
- except KeyError:
- base: str = feed_source
- parsed = feedparser.parse(
- io.BytesIO(bytes(response.text, encoding="utf-8")),
- response_headers={"Content-Location": base},
- )
- self.store_feed_entries(feed_id, parsed, epoch_downloaded)
-
def store_feed_entries(self, feed_id: int, parsed_feed, epoch_downloaded: int):
for entry in reversed(parsed_feed.entries):
link: str = entry.get("link", None)
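
With fetch_new_feed_entries removed, refreshing feeds is now the caller's job. A hedged example using only the functions kept in the new tagrss.py (the database path is hypothetical, and it assumes at least one feed has already been added):

    import tagrss

    core = tagrss.TagRss(storage_path="tagrss_data.db")  # hypothetical path
    feeds = core.get_feeds(limit=1, offset=0)  # assumes a feed was added earlier
    for feed in feeds:
        # Fetch over the network first, then persist; no lock is needed in a
        # single-threaded script. May raise tagrss.FeedFetchError on a
        # non-200 response.
        parsed, epoch_downloaded = tagrss.fetch_parsed_feed(feed["source"])
        core.store_feed_entries(feed["id"], parsed, epoch_downloaded)
    core.close()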