+defmodule Mediasync do
+ @moduledoc """
+ Documentation for `Mediasync`.
+ """
+ @doc """
+ Hello world.
+ ## Examples
+ iex> Mediasync.hello()
+ :world
+ """
+ def hello do
+ :world
+ end
+defmodule Mediasync.Application do
+ # See https://hexdocs.pm/elixir/Application.html
+ # for more information on OTP Applications
+ @moduledoc false
+ use Application
+ @impl true
+ def start(_type, _args) do
+ children = [
+ # Starts a worker by calling: Mediasync.Worker.start_link(arg)
+ # {Mediasync.Worker, arg}
+ {Bandit,
+ plug: Mediasync.Router,
+ scheme: :http,
+ ip: {127, 0, 0, 1},
+ port: Application.get_env(:mediasync, :port)},
+ {DynamicSupervisor,
+ max_children: Application.get_env(:mediasync, :max_rooms),
+ max_restarts: 0,
+ strategy: :one_for_one,
+ name: Mediasync.RoomSupervisor},
+ {Registry, keys: :unique, name: Mediasync.RoomRegistry},
+ {Registry, keys: :unique, name: Mediasync.RoomConnectionRegistry},
+ {Registry, keys: :duplicate, name: Mediasync.RoomSubscriptionRegistry}
+ ]
+ System.no_halt(true)
+ # See https://hexdocs.pm/elixir/Supervisor.html
+ # for other strategies and supported options
+ opts = [strategy: :one_for_one, name: Mediasync.Supervisor]
+ Supervisor.start_link(children, opts)
+ end
+defmodule Mediasync.HTTPErrors do
+ import Plug.Conn
+ import Mediasync.Utils
+ @video_url_too_large Jason.encode!(
+ %{
+ "error" => "video_url_too_large",
+ "max_size" => Application.compile_env(:mediasync, :max_video_url_size)
+ },
+ pretty: true
+ )
+ @spec send_video_url_too_large(Plug.Conn.t()) :: Plug.Conn.t()
+ @spec send_video_url_too_large(Plug.Conn.t(), []) :: Plug.Conn.t()
+ def send_video_url_too_large(conn, _opts \\ []) do
+ conn
+ |> put_json_content_type()
+ |> send_resp(
+ 400,
+ @video_url_too_large
+ )
+ end
+ @invalid_video_url Jason.encode!(
+ %{
+ "error" => "invalid_video_url"
+ },
+ pretty: true
+ )
+ @spec send_invalid_video_url(Plug.Conn.t()) :: Plug.Conn.t()
+ @spec send_invalid_video_url(Plug.Conn.t(), []) :: Plug.Conn.t()
+ def send_invalid_video_url(conn, _opts \\ []) do
+ conn
+ |> put_json_content_type()
+ |> send_resp(
+ 400,
+ @invalid_video_url
+ )
+ end
+ @not_found Jason.encode!(
+ %{
+ "error" => "not_found",
+ "message" => "No page was found at this location."
+ },
+ pretty: true
+ )
+ @spec send_not_found(Plug.Conn.t()) :: Plug.Conn.t()
+ @spec send_not_found(Plug.Conn.t(), []) :: Plug.Conn.t()
+ def send_not_found(conn, _opts \\ []) do
+ conn
+ |> put_json_content_type()
+ |> send_resp(
+ 404,
+ @not_found
+ )
+ end
+ @invalid_csrf_token Jason.encode!(
+ %{
+ "error" => "invalid_csrf_token",
+ "message" => "Try reloading the previous page and retrying."
+ },
+ pretty: true
+ )
+ @spec send_invalid_csrf_token(Plug.Conn.t()) :: Plug.Conn.t()
+ @spec send_invalid_csrf_token(Plug.Conn.t(), []) :: Plug.Conn.t()
+ def send_invalid_csrf_token(conn, _opts \\ []) do
+ conn
+ |> put_json_content_type()
+ |> send_resp(400, @invalid_csrf_token)
+ end
+ @unknown Jason.encode!(
+ %{
+ "error" => "unknown",
+ "message" =>
+ "Something went wrong. Consider reloading the previous page and retrying your action."
+ },
+ pretty: true
+ )
+ @spec send_unknown(Plug.Conn.t()) :: Plug.Conn.t()
+ @spec send_unknown(Plug.Conn.t(), []) :: Plug.Conn.t()
+ def send_unknown(conn, _opts \\ []) do
+ conn
+ |> put_json_content_type()
+ |> send_resp(500, @unknown)
+ end
+defmodule Mediasync.PlaybackState do
+ @enforce_keys [:paused?, :position_milliseconds]
+ defstruct @enforce_keys
+ @type t() :: %Mediasync.PlaybackState{
+ paused?: boolean(),
+ position_milliseconds: integer()
+ }
+defmodule Mediasync.Room.State do
+ @enforce_keys [:video_url, :host_user_token_hash]
+ @host_disconnected_tries_max 5 * 6
+ defstruct [
+ :video_url,
+ :host_user_token_hash,
+ :room_id,
+ host_disconnected_tries: @host_disconnected_tries_max
+ ]
+ def host_disconnected_tries_max do
+ @host_disconnected_tries_max
+ end
+ @type t() :: %Mediasync.Room.State{
+ video_url: binary(),
+ host_user_token_hash: Mediasync.UserToken.hash(),
+ room_id: Mediasync.RoomID.t(),
+ host_disconnected_tries: integer()
+ }
+defmodule Mediasync.Room do
+ use GenServer
+ @spec start_link(Mediasync.Room.State.t()) :: tuple()
+ def start_link(state = %Mediasync.Room.State{}) do
+ state =
+ case state.room_id do
+ nil -> %{state | room_id: Mediasync.RoomID.generate()}
+ _ -> state
+ end
+ Tuple.append(
+ GenServer.start_link(__MODULE__, state,
+ name: {:via, Registry, {Mediasync.RoomRegistry, state.room_id}}
+ ),
+ state.room_id
+ )
+ end
+ @spec get_video_url(GenServer.server()) :: binary()
+ def get_video_url(pid) do
+ GenServer.call(pid, :get_video_url)
+ end
+ @spec host?(GenServer.server(), Mediasync.UserToken.hash()) :: boolean()
+ def host?(pid, user_token_hash) do
+ GenServer.call(pid, {:host?, user_token_hash})
+ end
+ @spec publish_playback_state(GenServer.server(), Mediasync.PlaybackState.t()) :: :ok
+ def publish_playback_state(pid, playback_state = %Mediasync.PlaybackState{}) do
+ GenServer.call(pid, {:publish_playback_state, playback_state})
+ end
+ @inactive_check_wait_milliseconds 10 * 1000
+ @impl true
+ @spec init(Mediasync.Room.State.t()) :: {:ok, Mediasync.Room.State.t()}
+ def init(room_state = %Mediasync.Room.State{}) do
+ Process.send_after(self(), :check_if_active, @inactive_check_wait_milliseconds)
+ {:ok, room_state}
+ end
+ @impl true
+ def handle_call(:get_video_url, _from, state = %Mediasync.Room.State{}) do
+ {:reply, state.video_url, state}
+ end
+ @impl true
+ def handle_call({:host?, user_token_hash}, _from, state = %Mediasync.Room.State{}) do
+ host_user_token_hash = state.host_user_token_hash
+ case user_token_hash do
+ ^host_user_token_hash -> {:reply, true, state}
+ _ -> {:reply, false, state}
+ end
+ end
+ @impl true
+ def handle_call(
+ {:publish_playback_state, playback_state = %Mediasync.PlaybackState{}},
+ _from,
+ state = %Mediasync.Room.State{}
+ ) do
+ Registry.dispatch(Mediasync.RoomSubscriptionRegistry, state.room_id, fn entries ->
+ for {pid, _} <- entries, do: send(pid, {:playback_state_update, playback_state})
+ end)
+ {:reply, :ok, state}
+ end
+ @impl true
+ def handle_info(:check_if_active, state) do
+ state =
+ case Registry.lookup(
+ Mediasync.RoomConnectionRegistry,
+ {state.room_id, state.host_user_token_hash}
+ ) do
+ [{_pid, _value}] ->
+ %{state | host_disconnected_tries: Mediasync.Room.State.host_disconnected_tries_max()}
+ _ ->
+ %{state | host_disconnected_tries: state.host_disconnected_tries - 1}
+ end
+ Process.send_after(self(), :check_if_active, @inactive_check_wait_milliseconds)
+ if state.host_disconnected_tries <= 0 do
+ {:stop, :no_host, state}
+ else
+ {:noreply, state}
+ end
+ end
+defmodule Mediasync.RoomConnection.State do
+ @enforce_keys [:room_pid, :room_id, :user_token_hash]
+ defstruct [
+ :room_pid,
+ :room_id,
+ :user_token_hash,
+ :room_monitor_ref,
+ :host?,
+ initialized?: false
+ ]
+ @type t() :: %Mediasync.RoomConnection.State{
+ room_pid: pid(),
+ room_id: Mediasync.RoomID.t(),
+ user_token_hash: Mediasync.UserToken.hash(),
+ room_monitor_ref: reference() | nil,
+ host?: boolean() | nil,
+ initialized?: boolean()
+ }
+defmodule Mediasync.RoomConnection do
+ import Mediasync.Utils, only: [bool_to_int_repr: 1, int_repr_to_bool!: 1]
+ @behaviour WebSock
+ @spec init(Mediasync.RoomConnection.State.t()) :: {:ok, Mediasync.RoomConnection.State.t()}
+ @impl true
+ def init(state = %Mediasync.RoomConnection.State{}) do
+ state = %{state | room_monitor_ref: Process.monitor(state.room_pid)}
+ case Registry.register(
+ Mediasync.RoomConnectionRegistry,
+ {state.room_id, state.user_token_hash},
+ nil
+ ) do
+ {:ok, _pid} ->
+ state = %{state | host?: Mediasync.Room.host?(state.room_pid, state.user_token_hash)}
+ unless state.host? do
+ Registry.register(Mediasync.RoomSubscriptionRegistry, state.room_id, nil)
+ end
+ {:ok, state}
+ _ ->
+ {:stop, {:error, :user_already_connected}, state}
+ end
+ end
+ @impl true
+ def handle_in(
+ {"i", opcode: :binary},
+ state = %Mediasync.RoomConnection.State{}
+ ) do
+ {:push,
+ [
+ {:binary, <<"i", bool_to_int_repr(state.host?)::8>>}
+ ], %{state | initialized?: true}}
+ end
+ @impl true
+ def handle_in(
+ {<<"s", paused?::8, position_milliseconds::64>>, opcode: :binary},
+ state = %Mediasync.RoomConnection.State{}
+ ) do
+ if state.host? do
+ Mediasync.Room.publish_playback_state(state.room_pid, %Mediasync.PlaybackState{
+ paused?: int_repr_to_bool!(paused?),
+ position_milliseconds: position_milliseconds
+ })
+ {:ok, state}
+ else
+ {:stop, {:error, :received_forbidden_state_update}, state}
+ end
+ end
+ @impl true
+ def handle_info(
+ {:playback_state_update, playback_state = %Mediasync.PlaybackState{}},
+ state = %Mediasync.RoomConnection.State{}
+ ) do
+ if state.initialized? do
+ {:push,
+ {:binary,
+ <<"s", bool_to_int_repr(playback_state.paused?)::8,
+ playback_state.position_milliseconds::64>>}, state}
+ else
+ {:ok, state}
+ end
+ end
+ @impl true
+ def handle_info(
+ {:DOWN, ref, :process, _object, _reason},
+ state = %Mediasync.RoomConnection.State{}
+ ) do
+ room_monitor_ref = state.room_monitor_ref
+ case ref do
+ ^room_monitor_ref -> {:stop, {:error, :room_exited}, state}
+ _ -> {:stop, {:error, :unexpected_down_message}, state}
+ end
+ end
+defmodule Mediasync.RoomID do
+ @type t() :: binary()
+ @spec generate() :: t()
+ def generate do
+ Application.get_env(:mediasync, :node_id) <>
+ "~" <> Base.url_encode64(:crypto.strong_rand_bytes(16), padding: false)
+ end
+defmodule Mediasync.Router do
+ import Mediasync.Utils
+ import Mediasync.UserToken
+ use Plug.Router
+ use Plug.ErrorHandler
+ plug(Plug.Logger)
+ plug(Plug.Head)
+ plug(Plug.Static, at: "/static", from: {:mediasync, "priv/static"})
+ plug(:put_secret_key_base)
+ plug(Plug.Session,
+ store: :cookie,
+ key: "_mediasync_session",
+ encryption_salt: {Mediasync.Utils, :get_session_encryption_salt, []},
+ signing_salt: {Mediasync.Utils, :get_session_signing_salt, []}
+ )
+ plug(:fetch_session)
+ plug(:ensure_user_token)
+ plug(:match)
+ plug(Plug.Parsers,
+ parsers: [:urlencoded],
+ pass: ["application/x-www-form-urlencoded"]
+ )
+ plug(Plug.CSRFProtection)
+ plug(:dispatch)
+ get "/" do
+ conn
+ |> put_html_content_type()
+ |> send_resp(200, Mediasync.Templates.home())
+ end
+ post "/host_room" do
+ video_url = conn.body_params["video_url"]
+ cond do
+ byte_size(video_url) > Application.get_env(:mediasync, :max_video_url_size) ->
+ Mediasync.HTTPErrors.send_video_url_too_large(conn)
+ elem(URI.new(video_url), 0) != :ok ->
+ Mediasync.HTTPErrors.send_invalid_video_url(conn)
+ true ->
+ {:ok, _pid, room_id} =
+ DynamicSupervisor.start_child(
+ Mediasync.RoomSupervisor,
+ {Mediasync.Room,
+ %Mediasync.Room.State{
+ video_url: video_url,
+ host_user_token_hash: get_user_token_hash!(conn)
+ }}
+ )
+ redirect(conn, status: 303, location: "/room/#{room_id}")
+ end
+ end
+ get "/room/:room_id" do
+ case Registry.lookup(Mediasync.RoomRegistry, conn.path_params["room_id"]) do
+ [{pid, _value}] ->
+ conn
+ |> put_html_content_type()
+ |> send_resp(200, Mediasync.Templates.room(Mediasync.Room.get_video_url(pid)))
+ [] ->
+ Mediasync.HTTPErrors.send_not_found(conn)
+ end
+ end
+ get "/room/:room_id/websocket" do
+ # TODO: verify origin before doing any of this
+ user_token_hash = get_user_token_hash!(conn)
+ room_id = conn.path_params["room_id"]
+ case Registry.lookup(Mediasync.RoomRegistry, room_id) do
+ [{pid, _value}] ->
+ conn
+ |> WebSockAdapter.upgrade(
+ Mediasync.RoomConnection,
+ %Mediasync.RoomConnection.State{
+ room_pid: pid,
+ room_id: room_id,
+ user_token_hash: user_token_hash
+ },
+ max_frame_size: Application.fetch_env!(:mediasync, :websocket_max_frame_octets)
+ )
+ [] ->
+ Mediasync.HTTPErrors.send_not_found(conn)
+ end
+ end
+ match _ do
+ Mediasync.HTTPErrors.send_not_found(conn)
+ end
+ @impl Plug.ErrorHandler
+ def handle_errors(conn, %{kind: kind, reason: reason, stack: _stack}) do
+ IO.inspect({kind, reason})
+ case {kind, reason} do
+ {:error, %Plug.CSRFProtection.InvalidCSRFTokenError{}} ->
+ Mediasync.HTTPErrors.send_invalid_csrf_token(conn)
+ _ ->
+ Mediasync.HTTPErrors.send_unknown(conn)
+ end
+ end
+defmodule Mediasync.Templates do
+ require EEx
+ EEx.function_from_file(:def, :home, "priv/home.html.eex")
+ EEx.function_from_file(:def, :room, "priv/room.html.eex", [:video_url])
+defmodule Mediasync.UserToken do
+ import Plug.Conn
+ @type hash() :: binary()
+ @user_token_session_key "user_token"
+ @spec ensure_user_token(Plug.Conn.t()) :: Plug.Conn.t()
+ @spec ensure_user_token(Plug.Conn.t(), []) :: Plug.Conn.t()
+ def ensure_user_token(conn, _opts \\ []) do
+ if get_user_token(conn) do
+ conn
+ else
+ put_session(
+ conn,
+ @user_token_session_key,
+ Base.encode64(:crypto.strong_rand_bytes(16), padding: false)
+ )
+ end
+ end
+ @spec get_user_token_hash(Plug.Conn.t()) :: hash() | nil
+ @doc """
+ Retrieves a hash of the user token from the session.
+ This hash, not the original token, is suitable to be stored server-side (note
+ that the actual underlying token is what must be sent by the client). Using
+ these hashes for comparison prevents timing attacks, and it is OK for them
+ to be leaked in logs (such as when a GenServer exits and its state is
+ dumped).
+ """
+ def get_user_token_hash(conn) do
+ case get_user_token(conn) do
+ nil -> nil
+ user_token -> :crypto.hash(:sha256, user_token)
+ end
+ end
+ @spec get_user_token_hash!(Plug.Conn.t()) :: hash()
+ @doc """
+ Like `get_user_token_hash/1`, but raises if there is no
+ user token in the session.
+ """
+ def get_user_token_hash!(conn) do
+ user_token_hash = get_user_token_hash(conn)
+ if user_token_hash do
+ user_token_hash
+ else
+ raise "No user token in session! Use ensure_user_token first."
+ end
+ end
+ @spec get_user_token(Plug.Conn.t()) :: binary() | nil
+ defp get_user_token(conn) do
+ get_session(conn, @user_token_session_key)
+ end
+defmodule Mediasync.Utils do
+ import Plug.Conn
+ @spec put_html_content_type(Plug.Conn.t()) :: Plug.Conn.t()
+ @spec put_html_content_type(Plug.Conn.t(), []) :: Plug.Conn.t()
+ def put_html_content_type(conn, _opts \\ []) do
+ put_resp_content_type(conn, "text/html")
+ end
+ @spec put_json_content_type(Plug.Conn.t()) :: Plug.Conn.t()
+ @spec put_json_content_type(Plug.Conn.t(), []) :: Plug.Conn.t()
+ def put_json_content_type(conn, _opts \\ []) do
+ put_resp_content_type(conn, "application/json")
+ end
+ @spec redirect(Plug.Conn.t(), status: Plug.Conn.status(), location: binary()) :: Plug.Conn.t()
+ def redirect(conn, status: status, location: location) do
+ conn
+ |> put_resp_header("Location", location)
+ |> send_resp(status, "Redirecting to #{location}")
+ end
+ @spec put_secret_key_base(Plug.Conn.t()) :: Plug.Conn.t()
+ @spec put_secret_key_base(Plug.Conn.t(), []) :: Plug.Conn.t()
+ def put_secret_key_base(conn, _opts \\ []) do
+ put_in(conn.secret_key_base, Application.fetch_env!(:mediasync, :secret_key_base))
+ end
+ @spec get_session_encryption_salt() :: binary()
+ def get_session_encryption_salt do
+ Application.fetch_env!(:mediasync, :session_encryption_salt)
+ end
+ @spec get_session_signing_salt() :: binary()
+ def get_session_signing_salt do
+ Application.fetch_env!(:mediasync, :session_signing_salt)
+ end
+ @spec bool_to_int_repr(boolean()) :: 0 | 1
+ @doc """
+ Convert false to 0 and true to 1. Useful for sending boolean values over binary protocols.
+ Inverse of `int_repr_to_bool/1`.
+ """
+ def bool_to_int_repr(bool) do
+ case bool do
+ false -> 0
+ true -> 1
+ end
+ end
+ @spec int_repr_to_bool!(0 | 1) :: boolean()
+ @doc """
+ Convert 0 to false and 1 to true. Useful for receiving boolean values over binary protocols.
+ Raises `ArgumentError` if given an argument other than 0 or 1.
+ Inverse of `bool_to_int_repr/1`.
+ """
+ def int_repr_to_bool!(int_repr) do
+ case int_repr do
+ 0 -> false
+ 1 -> true
+ _ -> raise ArgumentError
+ end
+ end
+defmodule Mix.Tasks.Vendor do
+ use Mix.Task
+ @impl Mix.Task
+ def run([]) do
+ {_, 0} = System.cmd("npm", ~w(install))
+ File.cp_r!("node_modules/video.js/dist", "priv/static/video.js")
+ File.cp_r!("node_modules/video.js/LICENSE", "priv/static/video.js/LICENSE")
+ nil
+ end