553 lines
12 KiB
Elixir
553 lines
12 KiB
Elixir
defmodule Bright.Streams do
|
|
@moduledoc """
|
|
The Streams context.
|
|
"""
|
|
|
|
require Logger
|
|
import Ecto.Query, warn: false
|
|
alias Bright.Repo
|
|
|
|
alias Bright.Streams.{Stream, Vod}
|
|
alias Bright.Vtubers.Vtuber
|
|
alias Bright.Tags.Tag
|
|
alias Bright.Platforms.Platform
|
|
|
|
alias Bright.{
|
|
Cache,
|
|
Downloader,
|
|
Storage,
|
|
Events
|
|
}
|
|
|
|
@pubsub Bright.PubSub
|
|
|
|
@doc """
|
|
Returns the list of streams.
|
|
|
|
## Examples
|
|
|
|
iex> list_streams()
|
|
[%Stream{}, ...]
|
|
|
|
"""
|
|
def list_streams do
|
|
Stream
|
|
|> Repo.all()
|
|
|> Repo.preload([:tags, :vods, :vtubers, :platforms])
|
|
end
|
|
|
|
@doc """
|
|
Gets a single stream.
|
|
|
|
Raises `Ecto.NoResultsError` if the Stream does not exist.
|
|
|
|
## Examples
|
|
|
|
iex> get_stream!(123)
|
|
%Stream{}
|
|
|
|
iex> get_stream!(456)
|
|
** (Ecto.NoResultsError)
|
|
|
|
"""
|
|
def get_stream!(id) do
|
|
Stream
|
|
|> Repo.get!(id)
|
|
|> Repo.preload([:tags, :vods, :vtubers, :platforms])
|
|
end
|
|
|
|
@doc """
|
|
Creates a stream.
|
|
|
|
## Examples
|
|
|
|
iex> create_stream(%{field: value})
|
|
{:ok, %Stream{}}
|
|
|
|
iex> create_stream(%{field: bad_value})
|
|
{:error, %Ecto.Changeset{}}
|
|
|
|
"""
|
|
def create_stream(attrs \\ %{}) do
|
|
%Stream{}
|
|
# |> Stream.changeset(attrs)
|
|
|> change_stream(attrs)
|
|
|> Repo.insert()
|
|
end
|
|
|
|
@doc """
|
|
Updates a stream.
|
|
|
|
## Examples
|
|
|
|
iex> update_stream(stream, %{field: new_value})
|
|
{:ok, %Stream{}}
|
|
|
|
iex> update_stream(stream, %{field: bad_value})
|
|
{:error, %Ecto.Changeset{}}
|
|
|
|
"""
|
|
def update_stream(%Stream{} = stream, attrs) do
|
|
stream
|
|
|> change_stream(attrs)
|
|
|> Repo.update()
|
|
end
|
|
|
|
@doc """
|
|
Deletes a stream.
|
|
|
|
## Examples
|
|
|
|
iex> delete_stream(stream)
|
|
{:ok, %Stream{}}
|
|
|
|
iex> delete_stream(stream)
|
|
{:error, %Ecto.Changeset{}}
|
|
|
|
"""
|
|
def delete_stream(%Stream{} = stream) do
|
|
Repo.delete(stream)
|
|
end
|
|
|
|
@doc """
|
|
Returns an `%Ecto.Changeset{}` for tracking stream changes.
|
|
|
|
## Examples
|
|
|
|
iex> change_stream(stream)
|
|
%Ecto.Changeset{data: %Stream{}}
|
|
|
|
"""
|
|
def change_stream(%Stream{} = stream, attrs \\ %{}) do
|
|
tags = list_tags_by_id(attrs["tag_ids"])
|
|
vods = list_vods_by_id(attrs["vod_ids"])
|
|
vtubers = list_vtubers_by_id(attrs["vtuber_ids"])
|
|
platforms = list_platforms_by_id(attrs["platform_ids"])
|
|
|
|
stream
|
|
|> Repo.preload([:tags, :vods, :vtubers])
|
|
|> Stream.changeset(attrs)
|
|
|> Ecto.Changeset.put_assoc(:tags, tags)
|
|
|> Ecto.Changeset.put_assoc(:vods, vods)
|
|
|> Ecto.Changeset.put_assoc(:vtubers, vtubers)
|
|
|> Ecto.Changeset.put_assoc(:platforms, platforms)
|
|
end
|
|
|
|
def list_tags_by_id(nil), do: []
|
|
|
|
def list_tags_by_id(tag_ids) do
|
|
Repo.all(from t in Tag, where: t.id in ^tag_ids)
|
|
end
|
|
|
|
def list_vods_by_id(nil), do: []
|
|
|
|
def list_vods_by_id(vod_ids) do
|
|
Repo.all(from v in Vod, where: v.id in ^vod_ids)
|
|
end
|
|
|
|
def list_vtubers_by_id(nil), do: []
|
|
|
|
def list_vtubers_by_id(vtuber_ids) do
|
|
Repo.all(from v in Vtuber, where: v.id in ^vtuber_ids)
|
|
end
|
|
|
|
def list_platforms_by_id(nil), do: []
|
|
|
|
def list_platforms_by_id(platform_ids) do
|
|
Repo.all(from p in Platform, where: p.id in ^platform_ids)
|
|
end
|
|
|
|
alias Bright.Streams.Vod
|
|
|
|
@doc """
|
|
Returns the list of vods.
|
|
|
|
## Examples
|
|
|
|
iex> list_vods()
|
|
[%Vod{}, ...]
|
|
|
|
"""
|
|
def list_vods do
|
|
Vod
|
|
|> Repo.all()
|
|
|> Repo.preload(:torrent)
|
|
end
|
|
|
|
@doc """
|
|
Returns the most recently updated vod
|
|
"""
|
|
def most_recently_updated_vod do
|
|
Vod
|
|
|> order_by([v], desc: v.updated_at)
|
|
|> limit(1)
|
|
|> Repo.one()
|
|
end
|
|
|
|
@doc """
|
|
Gets a single vod.
|
|
|
|
Raises `Ecto.NoResultsError` if the Vod does not exist.
|
|
|
|
## Examples
|
|
|
|
iex> get_vod!(123)
|
|
%Vod{}
|
|
|
|
iex> get_vod!(456)
|
|
** (Ecto.NoResultsError)
|
|
|
|
"""
|
|
def get_vod!(id) do
|
|
Vod
|
|
|> Repo.get!(id)
|
|
|> Repo.preload(:torrent)
|
|
end
|
|
|
|
@doc """
|
|
Creates a vod.
|
|
|
|
## Examples
|
|
|
|
iex> create_vod(%{field: value})
|
|
{:ok, %Vod{}}
|
|
|
|
iex> create_vod(%{field: bad_value})
|
|
{:error, %Ecto.Changeset{}}
|
|
|
|
"""
|
|
def create_vod(attrs \\ %{}) do
|
|
%Vod{}
|
|
|> Vod.changeset(attrs)
|
|
|> Repo.insert()
|
|
|> case do
|
|
{:ok, %Vod{} = vod} ->
|
|
vod = Repo.preload(vod, [:torrent])
|
|
Oban.insert!(Bright.ObanWorkers.ProcessVod.new(%{vod_id: vod.id}))
|
|
{:ok, vod}
|
|
|
|
{:error, changeset} ->
|
|
{:error, changeset}
|
|
end
|
|
end
|
|
|
|
def create_tag(attrs \\ %{}) do
|
|
%Tag{}
|
|
|> Tag.changeset(attrs)
|
|
|> Repo.insert()
|
|
end
|
|
|
|
# def create_stream(attrs \\ %{}) do
|
|
# %Stream{}
|
|
# # |> Stream.changeset(attrs)
|
|
# |> change_stream(attrs)
|
|
# |> Repo.insert()
|
|
# end
|
|
|
|
# defp enqueue_process_vod(%Vod{id: id} = vod) do
|
|
# %{vod_id: id}
|
|
# |> Bright.ObanWorkers.ProcessVod.new()
|
|
# |> Oban.insert()
|
|
# vod
|
|
# end
|
|
|
|
@doc """
|
|
Updates a vod.
|
|
|
|
## Examples
|
|
|
|
iex> update_vod(vod, %{field: new_value})
|
|
{:ok, %Vod{}}
|
|
|
|
iex> update_vod(vod, %{field: bad_value})
|
|
{:error, %Ecto.Changeset{}}
|
|
|
|
"""
|
|
def update_vod(%Vod{} = vod, attrs) do
|
|
vod
|
|
|> Vod.changeset(attrs)
|
|
|> Repo.update()
|
|
end
|
|
|
|
@doc """
|
|
Deletes a vod.
|
|
|
|
## Examples
|
|
|
|
iex> delete_vod(vod)
|
|
{:ok, %Vod{}}
|
|
|
|
iex> delete_vod(vod)
|
|
{:error, %Ecto.Changeset{}}
|
|
|
|
"""
|
|
def delete_vod(%Vod{} = vod) do
|
|
Repo.delete(vod)
|
|
end
|
|
|
|
@doc """
|
|
Returns an `%Ecto.Changeset{}` for tracking vod changes.
|
|
|
|
## Examples
|
|
|
|
iex> change_vod(vod)
|
|
%Ecto.Changeset{data: %Vod{}}
|
|
|
|
"""
|
|
def change_vod(%Vod{} = vod, attrs \\ %{}) do
|
|
Vod.changeset(vod, attrs)
|
|
end
|
|
|
|
def transmux_to_hls(%Vod{} = vod, cb) do
|
|
if !vod.origin_temp_input_url, do: raise("vod was missing origin_temp_input_url")
|
|
|
|
local_path = Cache.generate_filename(vod.origin_temp_input_url)
|
|
Downloader.download!(vod.origin_temp_input_url, local_path)
|
|
|
|
Logger.debug(
|
|
"transmuxing to hls using origin_temp_input_url=#{vod.origin_temp_input_url}, local_path=#{local_path}"
|
|
)
|
|
|
|
master_pl_name = "master.m3u8"
|
|
|
|
dir_name = "vod-#{vod.id}"
|
|
dir = Path.join(Bright.Cache.cache_dir(), dir_name)
|
|
File.mkdir_p!(dir)
|
|
|
|
cb.(%{stage: :transmuxing, done: 1, total: 1})
|
|
|
|
# @see https://www.mux.com/articles/how-to-convert-mp4-to-hls-format-with-ffmpeg-a-step-by-step-guide#when-to-use-hls-over-mp4-formats-whats-the-difference
|
|
# ffmpeg -i input_video.mp4 \
|
|
# -filter_complex \
|
|
# "[0:v]split=3[v1][v2][v3]; \
|
|
# [v1]scale=w=1920:h=1080[v1out]; \
|
|
# [v2]scale=w=1280:h=720[v2out]; \
|
|
# [v3]scale=w=854:h=480[v3out]" \
|
|
# -map "[v1out]" -c:v:0 libx264 -b:v:0 5000k -maxrate:v:0 5350k -bufsize:v:0 7500k \
|
|
# -map "[v2out]" -c:v:1 libx264 -b:v:1 2800k -maxrate:v:1 2996k -bufsize:v:1 4200k \
|
|
# -map "[v3out]" -c:v:2 libx264 -b:v:2 1400k -maxrate:v:2 1498k -bufsize:v:2 2100k \
|
|
# -map a:0 -c:a aac -b:a:0 192k -ac 2 \
|
|
# -map a:0 -c:a aac -b:a:1 128k -ac 2 \
|
|
# -map a:0 -c:a aac -b:a:2 96k -ac 2 \
|
|
# -f hls \
|
|
# -hls_time 10 \
|
|
# -hls_playlist_type vod \
|
|
# -hls_flags independent_segments \
|
|
# -hls_segment_type mpegts \
|
|
# -hls_segment_filename stream_%v/data%03d.ts \
|
|
# -master_pl_name master.m3u8 \
|
|
# -var_stream_map "v:0,a:0 v:1,a:1 v:2,a:2" \
|
|
# stream_%v/playlist.m3u8
|
|
|
|
System.cmd("ffmpeg", [
|
|
"-i",
|
|
local_path,
|
|
"-filter_complex",
|
|
"[0:v]split=5[v1][v2][v3][v4][v5];" <>
|
|
"[v1]scale=w=1920:h=1080[v1out];" <>
|
|
"[v2]scale=w=1280:h=720[v2out];" <>
|
|
"[v3]scale=w=854:h=480[v3out];" <>
|
|
"[v4]scale=w=640:h=360[v4out];" <>
|
|
"[v5]scale=w=284:h=160[v5out]",
|
|
|
|
# Video streams
|
|
"-map",
|
|
"[v1out]",
|
|
"-c:v:0",
|
|
"libx264",
|
|
"-b:v:0",
|
|
"5000k",
|
|
"-maxrate:v:0",
|
|
"5350k",
|
|
"-bufsize:v:0",
|
|
"7500k",
|
|
"-map",
|
|
"[v2out]",
|
|
"-c:v:1",
|
|
"libx264",
|
|
"-b:v:1",
|
|
"2800k",
|
|
"-maxrate:v:1",
|
|
"2996k",
|
|
"-bufsize:v:1",
|
|
"4200k",
|
|
"-map",
|
|
"[v3out]",
|
|
"-c:v:2",
|
|
"libx264",
|
|
"-b:v:2",
|
|
"1400k",
|
|
"-maxrate:v:2",
|
|
"1498k",
|
|
"-bufsize:v:2",
|
|
"2100k",
|
|
"-map",
|
|
"[v4out]",
|
|
"-c:v:3",
|
|
"libx264",
|
|
"-b:v:3",
|
|
"800k",
|
|
"-maxrate:v:3",
|
|
"856k",
|
|
"-bufsize:v:3",
|
|
"1200k",
|
|
"-map",
|
|
"[v5out]",
|
|
"-c:v:4",
|
|
"libx264",
|
|
"-b:v:4",
|
|
"300k",
|
|
"-maxrate:v:4",
|
|
"300k",
|
|
"-bufsize:v:4",
|
|
"480k",
|
|
|
|
# Audio streams
|
|
"-map",
|
|
"a:0",
|
|
"-c:a:0",
|
|
"aac",
|
|
"-b:a:0",
|
|
"192k",
|
|
"-ac:a:0",
|
|
"2",
|
|
"-map",
|
|
"a:0",
|
|
"-c:a:1",
|
|
"aac",
|
|
"-b:a:1",
|
|
"192k",
|
|
"-ac:a:1",
|
|
"2",
|
|
"-map",
|
|
"a:0",
|
|
"-c:a:2",
|
|
"aac",
|
|
"-b:a:2",
|
|
"192k",
|
|
"-ac:a:2",
|
|
"2",
|
|
"-map",
|
|
"a:0",
|
|
"-c:a:3",
|
|
"aac",
|
|
"-b:a:3",
|
|
"164k",
|
|
"-ac:a:3",
|
|
"2",
|
|
"-map",
|
|
"a:0",
|
|
"-c:a:4",
|
|
"aac",
|
|
"-b:a:4",
|
|
"164k",
|
|
"-ac:a:4",
|
|
"2",
|
|
"-f",
|
|
"hls",
|
|
"-hls_time",
|
|
"2",
|
|
"-hls_playlist_type",
|
|
"vod",
|
|
"-hls_flags",
|
|
"independent_segments",
|
|
"-hls_segment_type",
|
|
"mpegts",
|
|
"-start_number",
|
|
"0",
|
|
"-hls_list_size",
|
|
"0",
|
|
"-hls_segment_filename",
|
|
"#{dir}/stream_%v_segment_%d.ts",
|
|
"-master_pl_name",
|
|
master_pl_name,
|
|
"-var_stream_map",
|
|
"v:0,a:0 v:1,a:1 v:2,a:2 v:3,a:3 v:4,a:4",
|
|
"#{dir}/stream_%v.m3u8"
|
|
])
|
|
|
|
files = Path.wildcard("#{dir}/*")
|
|
|
|
files
|
|
|> Elixir.Stream.map(fn hls_local_path ->
|
|
cb.(%{stage: :persisting, done: 1, total: length(files)})
|
|
hls_local_path
|
|
end)
|
|
|> Enum.each(fn hls_local_path ->
|
|
Storage.upload_from_filename(
|
|
hls_local_path,
|
|
"package/vod-#{vod.id}/#{Path.basename(hls_local_path)}",
|
|
cb,
|
|
content_type:
|
|
if(String.ends_with?(hls_local_path, ".m3u8"),
|
|
do: "application/x-mpegURL",
|
|
else: "video/mp4"
|
|
)
|
|
)
|
|
end)
|
|
|
|
playlist_url = "#{Bright.config([:s3_cdn_endpoint])}/package/vod-#{vod.id}/master.m3u8"
|
|
Logger.debug("playlist_url=#{playlist_url} local_path=#{local_path}")
|
|
|
|
hls_vod =
|
|
update_vod(vod, %{
|
|
playlist_url: playlist_url,
|
|
local_path: local_path
|
|
})
|
|
|
|
Logger.debug(inspect(hls_vod))
|
|
|
|
cb.(%{stage: :generating_thumbnail, done: 1, total: 1})
|
|
# {:ok, hls_vod} = store_thumbnail_from_file(hls_vod, vod.local_path)
|
|
|
|
# @TODO should probably keep the file cached locally for awhile for any additional processing
|
|
# File.rm!(hls_vod.local_path)
|
|
|
|
hls_vod
|
|
end
|
|
|
|
# def store_thumbnail_from_file(%Vod{} = vod, src_path, marker \\ %{minutes: 0}, opts \\ []) do
|
|
# with {:ok, thumbnail} <- create_thumbnail_from_file(vod, src_path, marker, opts),
|
|
# {:ok, %{key: key, cdn_url: cdn_url}} <- B2.put(thumbnail, thumbnail_filename(vod)) do
|
|
# {:ok, vod_thumbnail} =
|
|
# Vod
|
|
# |> change_vod(%{
|
|
# thumbnail_url: thumbnail_filename(vod)
|
|
# })
|
|
# |> Repo.insert(on_conflict: :nothing)
|
|
|
|
# end
|
|
# end
|
|
|
|
# defp create_thumbnail_from_file(%Vod{} = vod, src_path, marker, opts \\ []) do
|
|
# dst_path = Path.join(System.tmp_dir!(), "#{vod.id}-#{marker.minutes}.jpeg")
|
|
|
|
# if not File.exists?(dst_path) do
|
|
# :ok = Thumbnex.create_thumbnail(src_path, dst_path, opts)
|
|
# end
|
|
|
|
# File.read(dst_path)
|
|
# end
|
|
|
|
# @todo IDK how to use Phoenix pubsub
|
|
defp broadcast!(topic, msg) do
|
|
Phoenix.PubSub.broadcast!(@pubsub, topic, {__MODULE__, msg})
|
|
end
|
|
|
|
def broadcast_processing_progressed!(stage, vod, pct) do
|
|
broadcast!("backend", %Events.ProcessingProgressed{vod: vod, stage: stage, pct: pct})
|
|
end
|
|
|
|
def broadcast_processing_completed!(action, vod, url) do
|
|
broadcast!("backend", %Events.ProcessingCompleted{action: action, vod: vod, url: url})
|
|
end
|
|
|
|
def broadcast_processing_failed!(vod, attempt, max_attempts) do
|
|
broadcast!("backend", %Events.ProcessingFailed{
|
|
vod: vod,
|
|
attempt: attempt,
|
|
max_attempts: max_attempts
|
|
})
|
|
end
|
|
end
|