1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
import feedparser
import requests
import calendar
import io
import pathlib
import sqlite3
import time
import typing
class FeedAlreadyAddedError(Exception):
    """Signals an attempt to add a feed whose source is already stored."""
class FeedFetchError(Exception):
    """Signals that an HTTP GET of a feed source returned an error status.

    Attributes:
        feed_source: The feed source that was requested.
        status_code: The HTTP status code that was returned.
    """

    def __init__(self, feed_source: str, status_code: int):
        super().__init__(f"Get {feed_source} returned HTTP {status_code}")
        # Keep the structured values so callers need not parse the message.
        self.feed_source = feed_source
        self.status_code = status_code
class TagRss:
def __init__(self, *, storage_path: str | pathlib.Path):
self.connection: sqlite3.Connection = sqlite3.connect(storage_path)
with self.connection:
self.connection.executescript(
"""
CREATE TABLE IF NOT EXISTS feeds(id INTEGER PRIMARY KEY, source TEXT UNIQUE, title TEXT);
CREATE INDEX IF NOT EXISTS feed_source ON feeds(source);
CREATE TABLE IF NOT EXISTS feed_tags(feed_id INTEGER, tag TEXT);
CREATE INDEX IF NOT EXISTS feed_tags_feed_id ON feed_tags(feed_id);
CREATE TABLE IF NOT EXISTS entries(id INTEGER PRIMARY KEY, feed_id INTEGER, title TEXT, link TEXT, epoch_published INTEGER, epoch_updated INTEGER, epoch_downloaded INTEGER);
CREATE INDEX IF NOT EXISTS entry_epoch_downloaded ON entries(epoch_downloaded);
"""
)
def add_feed(self, *, feed_source: str, tags: tuple[str]):
response = requests.get(feed_source)
if response.status_code != requests.codes.ok:
raise FeedFetchError(feed_source, response.status_code)
try:
base: str = response.headers["Content-Location"]
except KeyError:
base: str = feed_source
parsed = feedparser.parse(
io.BytesIO(bytes(response.text, encoding="utf-8")), response_headers={"Content-Location": base}
)
with self.connection:
feed_title: str = parsed.feed.get("title", "")
try:
self.connection.execute(
"INSERT INTO feeds(source, title) VALUES(?, ?);",
(feed_source, feed_title),
)
except sqlite3.IntegrityError:
raise FeedAlreadyAddedError
feed_id: int = int(
self.connection.execute(
"SELECT id FROM feeds WHERE source = ?;", (feed_source,)
).fetchone()[0]
)
self.connection.executemany(
f"INSERT INTO feed_tags(feed_id, tag) VALUES({feed_id}, ?);", tuple(((tag,) for tag in tags))
)
for entry in reversed(parsed.entries):
link: str = entry.get("link", "")
try:
epoch_published: typing.Optional[int] = calendar.timegm(
entry.get("published_parsed", None)
)
except ValueError:
epoch_published = None
try:
epoch_updated: typing.Optional[int] = calendar.timegm(
entry.get("updated_parsed", None)
)
except ValueError:
epoch_updated = None
self.connection.execute(
"INSERT INTO entries(feed_id, title, link, epoch_published, epoch_updated, epoch_downloaded) VALUES(?, ?, ?, ?, ?, ?);",
(
feed_id,
feed_title,
link,
epoch_published,
epoch_updated,
int(time.time()),
),
)
def get_entries(self, *, limit: int):
with self.connection:
result = self.connection.execute(
"SELECT feed_id, title, link, epoch_published, epoch_updated FROM entries ORDER BY epoch_downloaded DESC LIMIT ?;",
(limit,),
).fetchall()
entries = []
for entry in result:
entries.append(
{
"feed_id": entry[0],
"title": entry[1],
"link": entry[2],
"epoch_published": entry[3],
"epoch_updated": entry[4],
}
)
return entries
def get_feed_tags(self, feed_id: int) -> tuple[str]:
with self.connection:
return tuple((t[0] for t in self.connection.execute("SELECT tag FROM feed_tags WHERE feed_id = ?;", (feed_id,)).fetchall()))
|