fp/apps/bright/lib/bright/streams.ex
CJ_Clippy deb5ef397e
Some checks failed
ci / build (push) Failing after 5m16s
ci / Tests & Checks (push) Failing after 10m38s
act_runner succeeds omg
2025-02-12 13:09:16 -08:00

554 lines
12 KiB
Elixir

defmodule Bright.Streams do
@moduledoc """
The Streams context.
"""
require Logger
import Ecto.Query, warn: false
alias Bright.Repo
alias Bright.Streams.{Stream, Vod}
alias Bright.Vtubers.Vtuber
alias Bright.Tags.Tag
alias Bright.Platforms.Platform
alias Bright.{
Cache,
Downloader,
Storage,
Events
}
@pubsub Bright.PubSub
@doc """
Fetches every stream, with its tags, vods, vtubers and platforms preloaded.

## Examples

    iex> list_streams()
    [%Stream{}, ...]

"""
def list_streams do
  streams = Repo.all(Stream)
  Repo.preload(streams, [:tags, :vods, :vtubers, :platforms])
end
@doc """
Fetches the stream with the given `id`, with its tags, vods, vtubers and
platforms preloaded.

Raises `Ecto.NoResultsError` when no such stream exists.

## Examples

    iex> get_stream!(123)
    %Stream{}

    iex> get_stream!(456)
    ** (Ecto.NoResultsError)

"""
def get_stream!(id) do
  stream = Repo.get!(Stream, id)
  Repo.preload(stream, [:tags, :vods, :vtubers, :platforms])
end
@doc """
Inserts a new stream built from `attrs`.

The changeset is produced by `change_stream/2`, so association ids given in
`attrs` are resolved there before the insert.

## Examples

    iex> create_stream(%{field: value})
    {:ok, %Stream{}}

    iex> create_stream(%{field: bad_value})
    {:error, %Ecto.Changeset{}}

"""
def create_stream(attrs \\ %{}) do
  changeset = change_stream(%Stream{}, attrs)
  Repo.insert(changeset)
end
@doc """
Applies `attrs` to an existing stream and persists the result.

The changeset is produced by `change_stream/2`.

## Examples

    iex> update_stream(stream, %{field: new_value})
    {:ok, %Stream{}}

    iex> update_stream(stream, %{field: bad_value})
    {:error, %Ecto.Changeset{}}

"""
def update_stream(%Stream{} = stream, attrs) do
  changeset = change_stream(stream, attrs)
  Repo.update(changeset)
end
@doc """
Removes the given stream from the database.

Returns the result of `Repo.delete/1`.

## Examples

    iex> delete_stream(stream)
    {:ok, %Stream{}}

    iex> delete_stream(stream)
    {:error, %Ecto.Changeset{}}

"""
def delete_stream(%Stream{} = stream), do: Repo.delete(stream)
@doc """
Returns an `%Ecto.Changeset{}` for tracking stream changes.

Besides the fields handled by `Stream.changeset/2`, the associations are set
from id lists read from these **string** keys of `attrs`:

  * `"tag_ids"`
  * `"vod_ids"`
  * `"vtuber_ids"`
  * `"platform_ids"`

A missing key (or an `attrs` map that uses atom keys) resolves to an empty
list, which is then handed to `Ecto.Changeset.put_assoc/3`. What happens to
already-associated records in that case depends on each association's
`:on_replace` setting in the `Stream` schema (not visible from this module).

## Examples

    iex> change_stream(stream)
    %Ecto.Changeset{data: %Stream{}}

"""
def change_stream(%Stream{} = stream, attrs \\ %{}) do
  tags = list_tags_by_id(attrs["tag_ids"])
  vods = list_vods_by_id(attrs["vod_ids"])
  vtubers = list_vtubers_by_id(attrs["vtuber_ids"])
  platforms = list_platforms_by_id(attrs["platform_ids"])

  stream
  # Every association passed to put_assoc/3 below must be loaded first.
  # :platforms was previously missing here, so a persisted stream fetched
  # without its platforms made put_assoc(:platforms, ...) raise.
  |> Repo.preload([:tags, :vods, :vtubers, :platforms])
  |> Stream.changeset(attrs)
  |> Ecto.Changeset.put_assoc(:tags, tags)
  |> Ecto.Changeset.put_assoc(:vods, vods)
  |> Ecto.Changeset.put_assoc(:vtubers, vtubers)
  |> Ecto.Changeset.put_assoc(:platforms, platforms)
end
@doc """
Returns the tags whose id is in `tag_ids`, or `[]` when `tag_ids` is `nil`.
"""
def list_tags_by_id(nil), do: []

def list_tags_by_id(tag_ids) do
  Tag
  |> where([t], t.id in ^tag_ids)
  |> Repo.all()
end
@doc """
Returns the vods whose id is in `vod_ids`, or `[]` when `vod_ids` is `nil`.
"""
def list_vods_by_id(nil), do: []

def list_vods_by_id(vod_ids) do
  Vod
  |> where([v], v.id in ^vod_ids)
  |> Repo.all()
end
@doc """
Returns the vtubers whose id is in `vtuber_ids`, or `[]` when `vtuber_ids`
is `nil`.
"""
def list_vtubers_by_id(nil), do: []

def list_vtubers_by_id(vtuber_ids) do
  Vtuber
  |> where([v], v.id in ^vtuber_ids)
  |> Repo.all()
end
@doc """
Returns the platforms whose id is in `platform_ids`, or `[]` when
`platform_ids` is `nil`.
"""
def list_platforms_by_id(nil), do: []

def list_platforms_by_id(platform_ids) do
  Platform
  |> where([p], p.id in ^platform_ids)
  |> Repo.all()
end
alias Bright.Streams.Vod
@doc """
Fetches every vod, with its torrent and stream preloaded.

## Examples

    iex> list_vods()
    [%Vod{}, ...]

"""
def list_vods do
  vods = Repo.all(Vod)
  Repo.preload(vods, [:torrent, :stream])
end
@doc """
Returns the vod with the latest `updated_at` timestamp.

The query is limited to a single row and run through `Repo.one/1`.
"""
def most_recently_updated_vod do
  newest_first = from(v in Vod, order_by: [desc: v.updated_at], limit: 1)
  Repo.one(newest_first)
end
@doc """
Fetches the vod with the given `id`, with its torrent and stream preloaded.

Raises `Ecto.NoResultsError` when no such vod exists.

## Examples

    iex> get_vod!(123)
    %Vod{}

    iex> get_vod!(456)
    ** (Ecto.NoResultsError)

"""
def get_vod!(id) do
  vod = Repo.get!(Vod, id)
  Repo.preload(vod, [:torrent, :stream])
end
@doc """
Inserts a new vod built from `attrs` and enqueues its processing job.

On a successful insert the vod's torrent and stream are preloaded and a
`Bright.ObanWorkers.ProcessVod` job carrying the vod id is inserted with
`Oban.insert!/1`. A failed insert returns the error tuple untouched and
enqueues nothing.

## Examples

    iex> create_vod(%{field: value})
    {:ok, %Vod{}}

    iex> create_vod(%{field: bad_value})
    {:error, %Ecto.Changeset{}}

"""
def create_vod(attrs \\ %{}) do
  changeset = Vod.changeset(%Vod{}, attrs)

  with {:ok, %Vod{} = inserted} <- Repo.insert(changeset) do
    vod = Repo.preload(inserted, [:torrent, :stream])
    job = Bright.ObanWorkers.ProcessVod.new(%{vod_id: vod.id})
    Oban.insert!(job)
    {:ok, vod}
  end
end
@doc """
Inserts a new tag built from `attrs`.

Returns the result of `Repo.insert/1` on the `Tag.changeset/2` changeset.
"""
def create_tag(attrs \\ %{}) do
  changeset = Tag.changeset(%Tag{}, attrs)
  Repo.insert(changeset)
end
# def create_stream(attrs \\ %{}) do
# %Stream{}
# # |> Stream.changeset(attrs)
# |> change_stream(attrs)
# |> Repo.insert()
# end
# defp enqueue_process_vod(%Vod{id: id} = vod) do
# %{vod_id: id}
# |> Bright.ObanWorkers.ProcessVod.new()
# |> Oban.insert()
# vod
# end
@doc """
Applies `attrs` to an existing vod and persists the result.

The vod's stream and torrent are preloaded before the changeset is built.

## Examples

    iex> update_vod(vod, %{field: new_value})
    {:ok, %Vod{}}

    iex> update_vod(vod, %{field: bad_value})
    {:error, %Ecto.Changeset{}}

"""
def update_vod(%Vod{} = vod, attrs) do
  preloaded = Repo.preload(vod, [:stream, :torrent])
  changeset = Vod.changeset(preloaded, attrs)
  Repo.update(changeset)
end
@doc """
Removes the given vod from the database.

Returns the result of `Repo.delete/1`.

## Examples

    iex> delete_vod(vod)
    {:ok, %Vod{}}

    iex> delete_vod(vod)
    {:error, %Ecto.Changeset{}}

"""
def delete_vod(%Vod{} = vod), do: Repo.delete(vod)
@doc """
Builds an `%Ecto.Changeset{}` for the given vod via `Vod.changeset/2`.

Nothing is written to the database.

## Examples

    iex> change_vod(vod)
    %Ecto.Changeset{data: %Vod{}}

"""
def change_vod(%Vod{} = vod, attrs \\ %{}), do: Vod.changeset(vod, attrs)
@doc """
Downloads a vod's source file, converts it to a multi-rendition HLS package
with ffmpeg, uploads the package, and records the playlist URL on the vod.

Steps, in order:

  1. Download `vod.origin_temp_input_url` to a path generated by
     `Cache.generate_filename/1`.
  2. Run ffmpeg to produce five video/audio renditions plus `master.m3u8`
     inside `<cache_dir>/vod-<id>`.
  3. Upload every file found in that directory under `package/vod-<id>/`.
  4. Update the vod with `playlist_url` and `local_path`.

## Parameters

  * `vod` - the `%Vod{}` to process; must have `origin_temp_input_url` set.
  * `cb` - a one-arity progress callback. This function calls it with
    `%{stage: stage, done: done, total: total}` maps for the `:transmuxing`,
    `:persisting` and `:generating_thumbnail` stages. It is also passed to
    `Storage.upload_from_filename/4`; how it is used there is not visible
    from this module.

## Returns

The result of `update_vod/2`, i.e. `{:ok, %Vod{}}` or
`{:error, %Ecto.Changeset{}}`.

## Raises

  * `RuntimeError` when the vod has no `origin_temp_input_url`.
  * `RuntimeError` when ffmpeg exits with a non-zero status.
  * Whatever `Downloader.download!/2` and `File.mkdir_p!/1` raise.
"""
def transmux_to_hls(%Vod{} = vod, cb) do
  if !vod.origin_temp_input_url, do: raise("vod was missing origin_temp_input_url")

  local_path = Cache.generate_filename(vod.origin_temp_input_url)
  Downloader.download!(vod.origin_temp_input_url, local_path)

  Logger.debug(
    "transmuxing to hls using origin_temp_input_url=#{vod.origin_temp_input_url}, local_path=#{local_path}"
  )

  master_pl_name = "master.m3u8"
  dir = Path.join(Cache.cache_dir(), "vod-#{vod.id}")
  File.mkdir_p!(dir)

  cb.(%{stage: :transmuxing, done: 1, total: 1})

  # The exit status used to be discarded, so a failed transmux went on to
  # upload a partial (or empty) directory and record a playlist URL for it.
  case System.cmd("ffmpeg", hls_ffmpeg_args(local_path, dir, master_pl_name)) do
    {_output, 0} ->
      :ok

    {_output, status} ->
      raise "ffmpeg exited with status #{status} while transmuxing vod #{vod.id}"
  end

  files = Path.wildcard("#{dir}/*")
  # Computed once; it was previously recomputed for every file.
  total = length(files)

  files
  |> Enum.with_index(1)
  |> Enum.each(fn {hls_local_path, index} ->
    # Report a running count. This used to be a constant `done: 1`, so the
    # persisting stage never appeared to advance.
    cb.(%{stage: :persisting, done: index, total: total})

    Storage.upload_from_filename(
      hls_local_path,
      "package/vod-#{vod.id}/#{Path.basename(hls_local_path)}",
      cb,
      content_type: hls_content_type(hls_local_path)
    )
  end)

  playlist_url = "#{Bright.config([:s3_cdn_endpoint])}/package/vod-#{vod.id}/master.m3u8"
  Logger.debug("playlist_url=#{playlist_url} local_path=#{local_path}")

  # NOTE: this is the tagged tuple returned by update_vod/2, not a bare %Vod{}.
  update_result =
    update_vod(vod, %{
      playlist_url: playlist_url,
      local_path: local_path
    })

  Logger.debug(inspect(update_result))
  cb.(%{stage: :generating_thumbnail, done: 1, total: 1})

  # {:ok, hls_vod} = store_thumbnail_from_file(hls_vod, vod.local_path)
  # @TODO should probably keep the file cached locally for awhile for any additional processing
  # File.rm!(hls_vod.local_path)
  update_result
end

# Picks the upload content type for a file of the HLS package.
# Playlists keep "application/x-mpegURL". MPEG-TS segments (the segment type
# requested from ffmpeg) get "video/mp2t" instead of the former "video/mp4".
# Anything else keeps the previous "video/mp4" fallback.
defp hls_content_type(path) do
  cond do
    String.ends_with?(path, ".m3u8") -> "application/x-mpegURL"
    String.ends_with?(path, ".ts") -> "video/mp2t"
    true -> "video/mp4"
  end
end

# Builds the ffmpeg argument list: one input split into five scaled video
# renditions, each paired with its own AAC audio stream, written as HLS
# playlists and segments under `out_dir`.
# @see https://www.mux.com/articles/how-to-convert-mp4-to-hls-format-with-ffmpeg-a-step-by-step-guide#when-to-use-hls-over-mp4-formats-whats-the-difference
defp hls_ffmpeg_args(input_path, out_dir, master_pl_name) do
  filter_graph =
    "[0:v]split=5[v1][v2][v3][v4][v5];" <>
      "[v1]scale=w=1920:h=1080[v1out];" <>
      "[v2]scale=w=1280:h=720[v2out];" <>
      "[v3]scale=w=854:h=480[v3out];" <>
      "[v4]scale=w=640:h=360[v4out];" <>
      "[v5]scale=w=284:h=160[v5out]"

  # Video streams
  video =
    ~w(-map [v1out] -c:v:0 libx264 -b:v:0 5000k -maxrate:v:0 5350k -bufsize:v:0 7500k) ++
      ~w(-map [v2out] -c:v:1 libx264 -b:v:1 2800k -maxrate:v:1 2996k -bufsize:v:1 4200k) ++
      ~w(-map [v3out] -c:v:2 libx264 -b:v:2 1400k -maxrate:v:2 1498k -bufsize:v:2 2100k) ++
      ~w(-map [v4out] -c:v:3 libx264 -b:v:3 800k -maxrate:v:3 856k -bufsize:v:3 1200k) ++
      ~w(-map [v5out] -c:v:4 libx264 -b:v:4 300k -maxrate:v:4 300k -bufsize:v:4 480k)

  # Audio streams: the first input audio track is mapped once per rendition.
  audio =
    ~w(-map a:0 -c:a:0 aac -b:a:0 192k -ac:a:0 2) ++
      ~w(-map a:0 -c:a:1 aac -b:a:1 192k -ac:a:1 2) ++
      ~w(-map a:0 -c:a:2 aac -b:a:2 192k -ac:a:2 2) ++
      ~w(-map a:0 -c:a:3 aac -b:a:3 164k -ac:a:3 2) ++
      ~w(-map a:0 -c:a:4 aac -b:a:4 164k -ac:a:4 2)

  hls =
    ~w(-f hls -hls_time 2 -hls_playlist_type vod -hls_flags independent_segments) ++
      ~w(-hls_segment_type mpegts -start_number 0 -hls_list_size 0) ++
      [
        "-hls_segment_filename",
        "#{out_dir}/stream_%v_segment_%d.ts",
        "-master_pl_name",
        master_pl_name,
        "-var_stream_map",
        "v:0,a:0 v:1,a:1 v:2,a:2 v:3,a:3 v:4,a:4",
        "#{out_dir}/stream_%v.m3u8"
      ]

  ["-i", input_path, "-filter_complex", filter_graph] ++ video ++ audio ++ hls
end
# def store_thumbnail_from_file(%Vod{} = vod, src_path, marker \\ %{minutes: 0}, opts \\ []) do
# with {:ok, thumbnail} <- create_thumbnail_from_file(vod, src_path, marker, opts),
# {:ok, %{key: key, cdn_url: cdn_url}} <- B2.put(thumbnail, thumbnail_filename(vod)) do
# {:ok, vod_thumbnail} =
# Vod
# |> change_vod(%{
# thumbnail_url: thumbnail_filename(vod)
# })
# |> Repo.insert(on_conflict: :nothing)
# end
# end
# defp create_thumbnail_from_file(%Vod{} = vod, src_path, marker, opts \\ []) do
# dst_path = Path.join(System.tmp_dir!(), "#{vod.id}-#{marker.minutes}.jpeg")
# if not File.exists?(dst_path) do
# :ok = Thumbnex.create_thumbnail(src_path, dst_path, opts)
# end
# File.read(dst_path)
# end
# Publishes `msg` on `topic` through Phoenix.PubSub, wrapped in a
# `{__MODULE__, msg}` tuple.
# @todo revisit this once the intended Phoenix PubSub usage is clearer
defp broadcast!(topic, msg) do
  payload = {__MODULE__, msg}
  Phoenix.PubSub.broadcast!(@pubsub, topic, payload)
end
@doc """
Broadcasts an `Events.ProcessingProgressed` event for `vod` on the
`"backend"` topic, carrying the given `stage` and `pct`.
"""
def broadcast_processing_progressed!(stage, vod, pct) do
  event = %Events.ProcessingProgressed{vod: vod, stage: stage, pct: pct}
  broadcast!("backend", event)
end
@doc """
Broadcasts an `Events.ProcessingCompleted` event for `vod` on the
`"backend"` topic, carrying the given `action` and `url`.
"""
def broadcast_processing_completed!(action, vod, url) do
  event = %Events.ProcessingCompleted{action: action, vod: vod, url: url}
  broadcast!("backend", event)
end
@doc """
Broadcasts an `Events.ProcessingFailed` event for `vod` on the `"backend"`
topic, carrying the given `attempt` and `max_attempts`.
"""
def broadcast_processing_failed!(vod, attempt, max_attempts) do
  event = %Events.ProcessingFailed{vod: vod, attempt: attempt, max_attempts: max_attempts}
  broadcast!("backend", event)
end
end