diff options
author | Arjun Satarkar <me@arjunsatarkar.net> | 2024-07-17 13:41:06 +0000 |
---|---|---|
committer | Arjun Satarkar <me@arjunsatarkar.net> | 2024-07-17 13:41:06 +0000 |
commit | 06f7696c0976c75c13435f84a2101c1203c18b95 (patch) | |
tree | 9ea52c784f4bc5013c8e8af32e82c0a497889b32 /lib | |
download | mediasync-06f7696c0976c75c13435f84a2101c1203c18b95.tar mediasync-06f7696c0976c75c13435f84a2101c1203c18b95.tar.gz mediasync-06f7696c0976c75c13435f84a2101c1203c18b95.zip |
Initial commit
Diffstat (limited to 'lib')
-rw-r--r-- | lib/mediasync.ex | 18 | ||||
-rw-r--r-- | lib/mediasync/application.ex | 35 | ||||
-rw-r--r-- | lib/mediasync/http_errors.ex | 93 | ||||
-rw-r--r-- | lib/mediasync/playback_state.ex | 9 | ||||
-rw-r--r-- | lib/mediasync/room.ex | 119 | ||||
-rw-r--r-- | lib/mediasync/room_connection.ex | 106 | ||||
-rw-r--r-- | lib/mediasync/room_id.ex | 9 | ||||
-rw-r--r-- | lib/mediasync/router.ex | 119 | ||||
-rw-r--r-- | lib/mediasync/templates.ex | 5 | ||||
-rw-r--r-- | lib/mediasync/user_token.ex | 58 | ||||
-rw-r--r-- | lib/mediasync/utils.ex | 66 | ||||
-rw-r--r-- | lib/mix/tasks/vendor.ex | 11 |
12 files changed, 648 insertions, 0 deletions
diff --git a/lib/mediasync.ex b/lib/mediasync.ex new file mode 100644 index 0000000..a7bf704 --- /dev/null +++ b/lib/mediasync.ex @@ -0,0 +1,18 @@ +defmodule Mediasync do + @moduledoc """ + Documentation for `Mediasync`. + """ + + @doc """ + Hello world. + + ## Examples + + iex> Mediasync.hello() + :world + + """ + def hello do + :world + end +end diff --git a/lib/mediasync/application.ex b/lib/mediasync/application.ex new file mode 100644 index 0000000..1e8f6d4 --- /dev/null +++ b/lib/mediasync/application.ex @@ -0,0 +1,35 @@ +defmodule Mediasync.Application do + # See https://hexdocs.pm/elixir/Application.html + # for more information on OTP Applications + @moduledoc false + + use Application + + @impl true + def start(_type, _args) do + children = [ + # Starts a worker by calling: Mediasync.Worker.start_link(arg) + # {Mediasync.Worker, arg} + {Bandit, + plug: Mediasync.Router, + scheme: :http, + ip: {127, 0, 0, 1}, + port: Application.get_env(:mediasync, :port)}, + {DynamicSupervisor, + max_children: Application.get_env(:mediasync, :max_rooms), + max_restarts: 0, + strategy: :one_for_one, + name: Mediasync.RoomSupervisor}, + {Registry, keys: :unique, name: Mediasync.RoomRegistry}, + {Registry, keys: :unique, name: Mediasync.RoomConnectionRegistry}, + {Registry, keys: :duplicate, name: Mediasync.RoomSubscriptionRegistry} + ] + + System.no_halt(true) + + # See https://hexdocs.pm/elixir/Supervisor.html + # for other strategies and supported options + opts = [strategy: :one_for_one, name: Mediasync.Supervisor] + Supervisor.start_link(children, opts) + end +end diff --git a/lib/mediasync/http_errors.ex b/lib/mediasync/http_errors.ex new file mode 100644 index 0000000..2b11ea6 --- /dev/null +++ b/lib/mediasync/http_errors.ex @@ -0,0 +1,93 @@ +defmodule Mediasync.HTTPErrors do + import Plug.Conn + import Mediasync.Utils + + @video_url_too_large Jason.encode!( + %{ + "error" => "video_url_too_large", + "max_size" => Application.compile_env(:mediasync, :max_video_url_size) + }, + pretty: true + ) + + @spec send_video_url_too_large(Plug.Conn.t()) :: Plug.Conn.t() + @spec send_video_url_too_large(Plug.Conn.t(), []) :: Plug.Conn.t() + def send_video_url_too_large(conn, _opts \\ []) do + conn + |> put_json_content_type() + |> send_resp( + 400, + @video_url_too_large + ) + end + + @invalid_video_url Jason.encode!( + %{ + "error" => "invalid_video_url" + }, + pretty: true + ) + + @spec send_invalid_video_url(Plug.Conn.t()) :: Plug.Conn.t() + @spec send_invalid_video_url(Plug.Conn.t(), []) :: Plug.Conn.t() + def send_invalid_video_url(conn, _opts \\ []) do + conn + |> put_json_content_type() + |> send_resp( + 400, + @invalid_video_url + ) + end + + @not_found Jason.encode!( + %{ + "error" => "not_found", + "message" => "No page was found at this location." + }, + pretty: true + ) + + @spec send_not_found(Plug.Conn.t()) :: Plug.Conn.t() + @spec send_not_found(Plug.Conn.t(), []) :: Plug.Conn.t() + def send_not_found(conn, _opts \\ []) do + conn + |> put_json_content_type() + |> send_resp( + 404, + @not_found + ) + end + + @invalid_csrf_token Jason.encode!( + %{ + "error" => "invalid_csrf_token", + "message" => "Try reloading the previous page and retrying." + }, + pretty: true + ) + + @spec send_invalid_csrf_token(Plug.Conn.t()) :: Plug.Conn.t() + @spec send_invalid_csrf_token(Plug.Conn.t(), []) :: Plug.Conn.t() + def send_invalid_csrf_token(conn, _opts \\ []) do + conn + |> put_json_content_type() + |> send_resp(400, @invalid_csrf_token) + end + + @unknown Jason.encode!( + %{ + "error" => "unknown", + "message" => + "Something went wrong. Consider reloading the previous page and retrying your action." + }, + pretty: true + ) + + @spec send_unknown(Plug.Conn.t()) :: Plug.Conn.t() + @spec send_unknown(Plug.Conn.t(), []) :: Plug.Conn.t() + def send_unknown(conn, _opts \\ []) do + conn + |> put_json_content_type() + |> send_resp(500, @unknown) + end +end diff --git a/lib/mediasync/playback_state.ex b/lib/mediasync/playback_state.ex new file mode 100644 index 0000000..85afc01 --- /dev/null +++ b/lib/mediasync/playback_state.ex @@ -0,0 +1,9 @@ +defmodule Mediasync.PlaybackState do + @enforce_keys [:paused?, :position_milliseconds] + defstruct @enforce_keys + + @type t() :: %Mediasync.PlaybackState{ + paused?: boolean(), + position_milliseconds: integer() + } +end diff --git a/lib/mediasync/room.ex b/lib/mediasync/room.ex new file mode 100644 index 0000000..c425441 --- /dev/null +++ b/lib/mediasync/room.ex @@ -0,0 +1,119 @@ +defmodule Mediasync.Room.State do + @enforce_keys [:video_url, :host_user_token_hash] + + @host_disconnected_tries_max 5 * 6 + + defstruct [ + :video_url, + :host_user_token_hash, + :room_id, + host_disconnected_tries: @host_disconnected_tries_max + ] + + def host_disconnected_tries_max do + @host_disconnected_tries_max + end + + @type t() :: %Mediasync.Room.State{ + video_url: binary(), + host_user_token_hash: Mediasync.UserToken.hash(), + room_id: Mediasync.RoomID.t(), + host_disconnected_tries: integer() + } +end + +defmodule Mediasync.Room do + use GenServer + + @spec start_link(Mediasync.Room.State.t()) :: tuple() + def start_link(state = %Mediasync.Room.State{}) do + state = + case state.room_id do + nil -> %{state | room_id: Mediasync.RoomID.generate()} + _ -> state + end + + Tuple.append( + GenServer.start_link(__MODULE__, state, + name: {:via, Registry, {Mediasync.RoomRegistry, state.room_id}} + ), + state.room_id + ) + end + + @spec get_video_url(GenServer.server()) :: binary() + def get_video_url(pid) do + GenServer.call(pid, :get_video_url) + end + + @spec host?(GenServer.server(), Mediasync.UserToken.hash()) :: boolean() + def host?(pid, user_token_hash) do + GenServer.call(pid, {:host?, user_token_hash}) + end + + @spec publish_playback_state(GenServer.server(), Mediasync.PlaybackState.t()) :: :ok + def publish_playback_state(pid, playback_state = %Mediasync.PlaybackState{}) do + GenServer.call(pid, {:publish_playback_state, playback_state}) + end + + @inactive_check_wait_milliseconds 10 * 1000 + + @impl true + @spec init(Mediasync.Room.State.t()) :: {:ok, Mediasync.Room.State.t()} + def init(room_state = %Mediasync.Room.State{}) do + Process.send_after(self(), :check_if_active, @inactive_check_wait_milliseconds) + + {:ok, room_state} + end + + @impl true + def handle_call(:get_video_url, _from, state = %Mediasync.Room.State{}) do + {:reply, state.video_url, state} + end + + @impl true + def handle_call({:host?, user_token_hash}, _from, state = %Mediasync.Room.State{}) do + host_user_token_hash = state.host_user_token_hash + + case user_token_hash do + ^host_user_token_hash -> {:reply, true, state} + _ -> {:reply, false, state} + end + end + + @impl true + def handle_call( + {:publish_playback_state, playback_state = %Mediasync.PlaybackState{}}, + _from, + state = %Mediasync.Room.State{} + ) do + Registry.dispatch(Mediasync.RoomSubscriptionRegistry, state.room_id, fn entries -> + for {pid, _} <- entries, do: send(pid, {:playback_state_update, playback_state}) + end) + + {:reply, :ok, state} + end + + @impl true + def handle_info(:check_if_active, state) do + state = + case Registry.lookup( + Mediasync.RoomConnectionRegistry, + {state.room_id, state.host_user_token_hash} + ) do + [{_pid, _value}] -> + %{state | host_disconnected_tries: Mediasync.Room.State.host_disconnected_tries_max()} + + _ -> + %{state | host_disconnected_tries: state.host_disconnected_tries - 1} + end + + Process.send_after(self(), :check_if_active, @inactive_check_wait_milliseconds) + + if state.host_disconnected_tries <= 0 do + {:stop, :no_host, state} + else + {:noreply, state} + end + end +end diff --git a/lib/mediasync/room_connection.ex b/lib/mediasync/room_connection.ex new file mode 100644 index 0000000..ec44b7b --- /dev/null +++ b/lib/mediasync/room_connection.ex @@ -0,0 +1,106 @@ +defmodule Mediasync.RoomConnection.State do + @enforce_keys [:room_pid, :room_id, :user_token_hash] + defstruct [ + :room_pid, + :room_id, + :user_token_hash, + :room_monitor_ref, + :host?, + initialized?: false + ] + + @type t() :: %Mediasync.RoomConnection.State{ + room_pid: pid(), + room_id: Mediasync.RoomID.t(), + user_token_hash: Mediasync.UserToken.hash(), + room_monitor_ref: reference() | nil, + host?: boolean() | nil, + initialized?: boolean() + } +end + +defmodule Mediasync.RoomConnection do + import Mediasync.Utils, only: [bool_to_int_repr: 1, int_repr_to_bool!: 1] + + @behaviour WebSock + + @spec init(Mediasync.RoomConnection.State.t()) :: {:ok, Mediasync.RoomConnection.State.t()} + @impl true + def init(state = %Mediasync.RoomConnection.State{}) do + state = %{state | room_monitor_ref: Process.monitor(state.room_pid)} + + case Registry.register( + Mediasync.RoomConnectionRegistry, + {state.room_id, state.user_token_hash}, + nil + ) do + {:ok, _pid} -> + state = %{state | host?: Mediasync.Room.host?(state.room_pid, state.user_token_hash)} + + unless state.host? do + Registry.register(Mediasync.RoomSubscriptionRegistry, state.room_id, nil) + end + + {:ok, state} + + _ -> + {:stop, {:error, :user_already_connected}, state} + end + end + + @impl true + def handle_in( + {"i", opcode: :binary}, + state = %Mediasync.RoomConnection.State{} + ) do + {:push, + [ + {:binary, <<"i", bool_to_int_repr(state.host?)::8>>} + ], %{state | initialized?: true}} + end + + @impl true + def handle_in( + {<<"s", paused?::8, position_milliseconds::64>>, opcode: :binary}, + state = %Mediasync.RoomConnection.State{} + ) do + if state.host? do + Mediasync.Room.publish_playback_state(state.room_pid, %Mediasync.PlaybackState{ + paused?: int_repr_to_bool!(paused?), + position_milliseconds: position_milliseconds + }) + + {:ok, state} + else + {:stop, {:error, :received_forbidden_state_update}, state} + end + end + + @impl true + def handle_info( + {:playback_state_update, playback_state = %Mediasync.PlaybackState{}}, + state = %Mediasync.RoomConnection.State{} + ) do + if state.initialized? do + {:push, + {:binary, + <<"s", bool_to_int_repr(playback_state.paused?)::8, + playback_state.position_milliseconds::64>>}, state} + else + {:ok, state} + end + end + + @impl true + def handle_info( + {:DOWN, ref, :process, _object, _reason}, + state = %Mediasync.RoomConnection.State{} + ) do + room_monitor_ref = state.room_monitor_ref + + case ref do + ^room_monitor_ref -> {:stop, {:error, :room_exited}, state} + _ -> {:stop, {:error, :unexpected_down_message}, state} + end + end +end diff --git a/lib/mediasync/room_id.ex b/lib/mediasync/room_id.ex new file mode 100644 index 0000000..64a8d23 --- /dev/null +++ b/lib/mediasync/room_id.ex @@ -0,0 +1,9 @@ +defmodule Mediasync.RoomID do + @type t() :: binary() + + @spec generate() :: t() + def generate do + Application.get_env(:mediasync, :node_id) <> + "~" <> Base.url_encode64(:crypto.strong_rand_bytes(16), padding: false) + end +end diff --git a/lib/mediasync/router.ex b/lib/mediasync/router.ex new file mode 100644 index 0000000..54ef533 --- /dev/null +++ b/lib/mediasync/router.ex @@ -0,0 +1,119 @@ +defmodule Mediasync.Router do + import Mediasync.Utils + import Mediasync.UserToken + use Plug.Router + use Plug.ErrorHandler + + plug(Plug.Logger) + plug(Plug.Head) + + plug(Plug.Static, at: "/static", from: {:mediasync, "priv/static"}) + + plug(:put_secret_key_base) + + plug(Plug.Session, + store: :cookie, + key: "_mediasync_session", + encryption_salt: {Mediasync.Utils, :get_session_encryption_salt, []}, + signing_salt: {Mediasync.Utils, :get_session_signing_salt, []} + ) + + plug(:fetch_session) + + plug(:ensure_user_token) + + plug(:match) + + plug(Plug.Parsers, + parsers: [:urlencoded], + pass: ["application/x-www-form-urlencoded"] + ) + + plug(Plug.CSRFProtection) + + plug(:dispatch) + + get "/" do + conn + |> put_html_content_type() + |> send_resp(200, Mediasync.Templates.home()) + end + + post "/host_room" do + video_url = conn.body_params["video_url"] + + cond do + byte_size(video_url) > Application.get_env(:mediasync, :max_video_url_size) -> + Mediasync.HTTPErrors.send_video_url_too_large(conn) + + elem(URI.new(video_url), 0) != :ok -> + Mediasync.HTTPErrors.send_invalid_video_url(conn) + + true -> + {:ok, _pid, room_id} = + DynamicSupervisor.start_child( + Mediasync.RoomSupervisor, + {Mediasync.Room, + %Mediasync.Room.State{ + video_url: video_url, + host_user_token_hash: get_user_token_hash!(conn) + }} + ) + + redirect(conn, status: 303, location: "/room/#{room_id}") + end + end + + get "/room/:room_id" do + case Registry.lookup(Mediasync.RoomRegistry, conn.path_params["room_id"]) do + [{pid, _value}] -> + conn + |> put_html_content_type() + |> send_resp(200, Mediasync.Templates.room(Mediasync.Room.get_video_url(pid))) + + [] -> + Mediasync.HTTPErrors.send_not_found(conn) + end + end + + get "/room/:room_id/websocket" do + # TODO: verify origin before doing any of this + + user_token_hash = get_user_token_hash!(conn) + room_id = conn.path_params["room_id"] + + case Registry.lookup(Mediasync.RoomRegistry, room_id) do + [{pid, _value}] -> + conn + |> WebSockAdapter.upgrade( + Mediasync.RoomConnection, + %Mediasync.RoomConnection.State{ + room_pid: pid, + room_id: room_id, + user_token_hash: user_token_hash + }, + max_frame_size: Application.fetch_env!(:mediasync, :websocket_max_frame_octets) + ) + + [] -> + Mediasync.HTTPErrors.send_not_found(conn) + end + end + + match _ do + Mediasync.HTTPErrors.send_not_found(conn) + end + + @impl Plug.ErrorHandler + def handle_errors(conn, %{kind: kind, reason: reason, stack: _stack}) do + IO.inspect({kind, reason}) + + case {kind, reason} do + {:error, %Plug.CSRFProtection.InvalidCSRFTokenError{}} -> + Mediasync.HTTPErrors.send_invalid_csrf_token(conn) + + _ -> + Mediasync.HTTPErrors.send_unknown(conn) + end + end +end diff --git a/lib/mediasync/templates.ex b/lib/mediasync/templates.ex new file mode 100644 index 0000000..fb91aa0 --- /dev/null +++ b/lib/mediasync/templates.ex @@ -0,0 +1,5 @@ +defmodule Mediasync.Templates do + require EEx + EEx.function_from_file(:def, :home, "priv/home.html.eex") + EEx.function_from_file(:def, :room, "priv/room.html.eex", [:video_url]) +end diff --git a/lib/mediasync/user_token.ex b/lib/mediasync/user_token.ex new file mode 100644 index 0000000..edeed0c --- /dev/null +++ b/lib/mediasync/user_token.ex @@ -0,0 +1,58 @@ +defmodule Mediasync.UserToken do + import Plug.Conn + + @type hash() :: binary() + + @user_token_session_key "user_token" + + @spec ensure_user_token(Plug.Conn.t()) :: Plug.Conn.t() + @spec ensure_user_token(Plug.Conn.t(), []) :: Plug.Conn.t() + def ensure_user_token(conn, _opts \\ []) do + if get_user_token(conn) do + conn + else + put_session( + conn, + @user_token_session_key, + Base.encode64(:crypto.strong_rand_bytes(16), padding: false) + ) + end + end + + @spec get_user_token_hash(Plug.Conn.t()) :: hash() | nil + @doc """ + Retrieves a hash of the user token from the session. + + This hash, not the original token, is suitable to be stored server-side (note + that the actual underlying token is what must be sent by the client). Using + these hashes for comparison prevents timing attacks, and it is OK for them + to be leaked in logs (such as when a GenServer exits and its state is + dumped). + """ + def get_user_token_hash(conn) do + case get_user_token(conn) do + nil -> nil + user_token -> :crypto.hash(:sha256, user_token) + end + end + + @spec get_user_token_hash!(Plug.Conn.t()) :: hash() + @doc """ + Like `get_user_token_hash/1`, but raises if there is no + user token in the session. + """ + def get_user_token_hash!(conn) do + user_token_hash = get_user_token_hash(conn) + + if user_token_hash do + user_token_hash + else + raise "No user token in session! Use ensure_user_token first." + end + end + + @spec get_user_token(Plug.Conn.t()) :: binary() | nil + defp get_user_token(conn) do + get_session(conn, @user_token_session_key) + end +end diff --git a/lib/mediasync/utils.ex b/lib/mediasync/utils.ex new file mode 100644 index 0000000..fa0ab68 --- /dev/null +++ b/lib/mediasync/utils.ex @@ -0,0 +1,66 @@ +defmodule Mediasync.Utils do + import Plug.Conn + + @spec put_html_content_type(Plug.Conn.t()) :: Plug.Conn.t() + @spec put_html_content_type(Plug.Conn.t(), []) :: Plug.Conn.t() + def put_html_content_type(conn, _opts \\ []) do + put_resp_content_type(conn, "text/html") + end + + @spec put_json_content_type(Plug.Conn.t()) :: Plug.Conn.t() + @spec put_json_content_type(Plug.Conn.t(), []) :: Plug.Conn.t() + def put_json_content_type(conn, _opts \\ []) do + put_resp_content_type(conn, "application/json") + end + + @spec redirect(Plug.Conn.t(), status: Plug.Conn.status(), location: binary()) :: Plug.Conn.t() + def redirect(conn, status: status, location: location) do + conn + |> put_resp_header("Location", location) + |> send_resp(status, "Redirecting to #{location}") + end + + @spec put_secret_key_base(Plug.Conn.t()) :: Plug.Conn.t() + @spec put_secret_key_base(Plug.Conn.t(), []) :: Plug.Conn.t() + def put_secret_key_base(conn, _opts \\ []) do + put_in(conn.secret_key_base, Application.fetch_env!(:mediasync, :secret_key_base)) + end + + @spec get_session_encryption_salt() :: binary() + def get_session_encryption_salt do + Application.fetch_env!(:mediasync, :session_encryption_salt) + end + + @spec get_session_signing_salt() :: binary() + def get_session_signing_salt do + Application.fetch_env!(:mediasync, :session_signing_salt) + end + + @spec bool_to_int_repr(boolean()) :: 0 | 1 + @doc """ + Convert false to 0 and true to 1. Useful for sending boolean values over binary protocols. + + Inverse of `int_repr_to_bool/1`. + """ + def bool_to_int_repr(bool) do + case bool do + false -> 0 + true -> 1 + end + end + + @spec int_repr_to_bool!(0 | 1) :: boolean() + @doc """ + Convert 0 to false and 1 to true. Useful for receiving boolean values over binary protocols. + Raises `ArgumentError` if given an argument other than 0 or 1. + + Inverse of `bool_to_int_repr/1`. + """ + def int_repr_to_bool!(int_repr) do + case int_repr do + 0 -> false + 1 -> true + _ -> raise ArgumentError + end + end +end diff --git a/lib/mix/tasks/vendor.ex b/lib/mix/tasks/vendor.ex new file mode 100644 index 0000000..1f64236 --- /dev/null +++ b/lib/mix/tasks/vendor.ex @@ -0,0 +1,11 @@ +defmodule Mix.Tasks.Vendor do + use Mix.Task + + @impl Mix.Task + def run([]) do + {_, 0} = System.cmd("npm", ~w(install)) + File.cp_r!("node_modules/video.js/dist", "priv/static/video.js") + File.cp_r!("node_modules/video.js/LICENSE", "priv/static/video.js/LICENSE") + nil + end +end |