CreateHlsPlaylist, CreateS3Asset, and CreateThumbnail all work

This commit is contained in:
CJ_Clippy 2025-01-22 12:23:34 -08:00
parent db0222ff94
commit 2e4887a5a1
19 changed files with 229 additions and 95 deletions

View File

@ -36,6 +36,7 @@ defmodule Bright.B2 do
end
cdn_url = "#{s3_cdn_endpoint}/#{object_key}"
IO.puts "putting local_file=#{local_file} to bucket=#{bucket} s3_cdn_endpoint=#{s3_cdn_endpoint} key=#{object_key}"
local_file
|> S3.Upload.stream_file

View File

@ -38,9 +38,12 @@ defmodule Bright.Cache do
# |> Kernel.<>(".#{ext}")
end
def get_cache_dir do
@cache_dir
end
# Ensure the cache directory exists
defp ensure_cache_dir! do
def ensure_cache_dir! do
unless File.exists?(@cache_dir) do
File.mkdir_p!(@cache_dir)
end

View File

@ -3,31 +3,54 @@ defmodule Bright.Downloader do
Downloader functions
"""
def get(url) do
filename = Bright.Cache.generate_filename(url)
local_file = Bright.Cache.generate_filename(url)
headers = %{}
case HTTPoison.get(url, headers) do
{:ok, %HTTPoison.Response{status_code: 200, body: body}} ->
case File.write(local_file, body) do
:ok -> {:ok, local_file}
end
IO.puts("Downloader downloading to filename=#{filename}")
{:ok, %HTTPoison.Response{status_code: status, body: body}} ->
{:error, %{status: status, body: body}}
{:error, %HTTPoison.Error{reason: reason}} ->
{:error, reason}
failed ->
Logger.error("Failed to GET #{url}")
{:error, :failed}
try do
{download!(url, filename), filename}
rescue
exception ->
{:error, Exception.message(exception)}
end
end
# greets https://elixirforum.com/t/how-to-download-big-files/9173/4
defp download!(file_url, filename) do
file =
if File.exists?(filename) do
File.open!(filename, [:append])
else
File.touch!(filename)
File.open!(filename, [:append])
end
%HTTPoison.AsyncResponse{id: ref} = HTTPoison.get!(file_url, %{}, stream_to: self())
append_loop(ref, file)
end
defp append_loop(ref, file) do
receive do
%HTTPoison.AsyncChunk{chunk: chunk, id: ^ref} ->
IO.binwrite(file, chunk)
append_loop(ref, file)
%HTTPoison.AsyncEnd{id: ^ref} ->
File.close(file)
# need something to handle errors like request timeout and such
# otherwise it will loop forever
# don't know what httpoison returns in case of an error ...
# you can inspect `_other` below to find out
# and match on the error to exit the loop early
other ->
IO.puts(
"!!!!!!!!!!!!!! WARNING: append_loop is doing it's recursive thing which may cause an infinite loop. @todo please implement error handling. other=#{inspect(other)}"
)
append_loop(ref, file)
end
end
end

View File

@ -2,7 +2,7 @@
defmodule Bright.ObanWorkers.CreateHlsPlaylist do
use Oban.Worker, queue: :default, max_attempts: 1
use Oban.Worker, queue: :default, max_attempts: 6
alias Bright.Repo
alias Bright.Streams.Vod
@ -10,18 +10,18 @@ defmodule Bright.ObanWorkers.CreateHlsPlaylist do
require Logger
@auth_token Application.get_env(:bright, :superstreamer_auth_token)
@superstreamer_url Application.get_env(:bright, :superstreamer_url)
@superstreamer_url System.get_env("SUPERSTREAMER_URL")
@public_s3_endpoint Application.get_env(:bright, :s3_cdn_endpoint)
# args: %{"vod_id" => 10, "input_url" => "http://38.242.193.246:8081/fixtures/2024-12-19T03-10-30Z.ts"}
@impl Oban.Worker
def perform(%Oban.Job{args: %{"vod_id" => vod_id, "input_url" => input_url}}) do
Logger.info(">>>> create_hls_playlist is performing. input_url=#{input_url} vod_id=#{vod_id}")
def perform(%Oban.Job{args: %{"vod_id" => vod_id}}) do
Logger.info(">>>> create_hls_playlist is performing. vod_id=#{vod_id}")
vod = Repo.get!(Vod, vod_id)
payload = build_payload(input_url)
payload = build_payload(vod.origin_temp_input_url)
Logger.info("Starting transcoding for VOD ID #{vod_id}")
@ -43,12 +43,12 @@ defmodule Bright.ObanWorkers.CreateHlsPlaylist do
%{
"inputs" => [
%{"type" => "audio", "path" => input_url, "language" => "eng"},
# %{"type" => "video", "path" => input_url}
%{"type" => "video", "path" => input_url}
],
"streams" => [
# %{"type" => "video", "codec" => "h264", "height" => 1080},
# %{"type" => "video", "codec" => "h264", "height" => 720},
# %{"type" => "video", "codec" => "h264", "height" => 144},
%{"type" => "video", "codec" => "h264", "height" => 1080},
# %{"type" => "video", "codec" => "h264", "height" => 720}, # when I enabled this, I see a superstreamer error? -- "header 'content-length' is listed in signed headers, but is not present "
%{"type" => "video", "codec" => "h264", "height" => 144},
%{"type" => "audio", "codec" => "aac"}
],
"tag" => "create_hls_playlist"
@ -64,6 +64,8 @@ defmodule Bright.ObanWorkers.CreateHlsPlaylist do
Logger.info("auth headers as follows")
Logger.info(inspect(headers))
Logger.info("@superstreamer_url=#{@superstreamer_url}")
if is_nil(@superstreamer_url) do
Logger.error("The @superstreamer_url is nil. This must be set before proceeding.")

View File

@ -8,18 +8,21 @@ defmodule Bright.ObanWorkers.CreateS3Asset do
alias Bright.Downloader
alias Bright.B2
alias Bright.Repo
alias Bright.Streams.{
Stream,
Vod
}
@impl Oban.Worker
def perform(%Oban.Job{args: %{"input_url" => input_url, "vod_id" => vod_id}}) do
def perform(%Oban.Job{args: %{"vod_id" => vod_id}}) do
Logger.info("CreateS3Asset begin.")
vod = Repo.get!(Vod, vod_id)
basename = Cache.generate_basename(input_url)
with {:ok, local_file} <- Downloader.get(input_url),
{:ok, object_key} <- B2.put(local_file, basename) do
update_vod_with_s3_asset(vod, object_key)
with {:ok, local_file} <- Downloader.get(vod.origin_temp_input_url),
{:ok, %{key: key, cdn_url: cdn_url}} <- B2.put(local_file) do
update_vod_with_s3_asset(vod, cdn_url)
else
{:error, reason} ->
@ -28,21 +31,13 @@ defmodule Bright.ObanWorkers.CreateS3Asset do
end
end
defp update_vod_with_s3_asset(vod, object_key) do
basename = Path.basename(vod.thumbnail_url)
s3_cdn_url = B2.generate_cdn_url(object_key)
vod
|> Ecto.Changeset.change(s3_cdn_url: s3_cdn_url)
|> Repo.update!()
defp update_vod_with_s3_asset(vod, s3_cdn_url) do
case Repo.update(vod |> Ecto.Changeset.change(s3_cdn_url: s3_cdn_url)) do
{:ok, updated_vod} -> {:ok, updated_vod}
{:error, changeset} -> {:error, changeset}
end
end
# defp update_vod_with_playlist_url(vod, asset_id) do
# playlist_url = generate_playlist_url(asset_id)
# Logger.info("playlist_url=#{playlist_url}")
# vod
# |> Ecto.Changeset.change(playlist_url: playlist_url)
# |> Repo.update!()
# end
end

View File

@ -19,8 +19,8 @@ defmodule Bright.ObanWorkers.ProcessVod do
if vod.origin_temp_input_url do
unless vod.playlist_url, do: queue_create_hls_playlist(vod)
unless vod.s3_cdn_url, do: queue_create_s3_asset(vod)
unless vod.playlist_url, do: queue_create_hls_playlist(vod)
unless vod.thumbnail_url, do: queue_create_thumbnail(vod)
end
@ -29,18 +29,18 @@ defmodule Bright.ObanWorkers.ProcessVod do
end
defp queue_create_hls_playlist(%Vod{id: id, origin_temp_input_url: url}) do
job_args = %{vod_id: id, input_url: url}
defp queue_create_hls_playlist(%Vod{id: id}) do
job_args = %{vod_id: id}
Oban.insert!(CreateHlsPlaylist.new(job_args))
end
defp queue_create_s3_asset(%Vod{id: id, origin_temp_input_url: url}) do
job_args = %{vod_id: id, input_url: url}
defp queue_create_s3_asset(%Vod{id: id}) do
job_args = %{vod_id: id}
Oban.insert!(CreateS3Asset.new(job_args))
end
defp queue_create_thumbnail(%Vod{id: id, origin_temp_input_url: url}) do
job_args = %{vod_id: id, input_url: url}
defp queue_create_thumbnail(%Vod{id: id}) do
job_args = %{vod_id: id}
Oban.insert!(CreateThumbnail.new(job_args))
end

View File

@ -6,9 +6,6 @@ defmodule Bright.Streams.Vod do
field :origin_temp_input_url, :string
field :playlist_url, :string
field :s3_cdn_url, :string
field :s3_upload_id, :string
field :s3_key, :string
field :s3_bucket, :string
field :mux_asset_id, :string
field :mux_playback_id, :string
field :ipfs_cid, :string
@ -24,7 +21,7 @@ defmodule Bright.Streams.Vod do
@doc false
def changeset(vod, attrs) do
vod
|> cast(attrs, [:s3_cdn_url, :s3_upload_id, :s3_key, :s3_bucket, :mux_asset_id, :mux_playback_id, :ipfs_cid, :torrent, :stream_id, :origin_temp_input_url, :playlist_url, :thumbnail_url])
|> cast(attrs, [:s3_cdn_url, :mux_asset_id, :mux_playback_id, :ipfs_cid, :torrent, :stream_id, :origin_temp_input_url, :playlist_url, :thumbnail_url])
|> validate_required([:stream_id])
end

View File

@ -9,10 +9,7 @@
<.table id="vods" rows={@vods} row_click={&JS.navigate(~p"/vods/#{&1}")}>
<:col :let={vod} label="ID">{vod.id}</:col>
<:col :let={vod} label="S3 cdn url">{vod.s3_cdn_url}</:col>
<:col :let={vod} label="S3 upload">{vod.s3_upload_id}</:col>
<:col :let={vod} label="S3 key">{vod.s3_key}</:col>
<:col :let={vod} label="S3 bucket">{vod.s3_bucket}</:col>
<:col :let={vod} label="S3 CDN URL">{vod.s3_cdn_url}</:col>
<:col :let={vod} label="Mux asset">{vod.mux_asset_id}</:col>
<:col :let={vod} label="Mux playback">{vod.mux_playback_id}</:col>
<:col :let={vod} label="Ipfs cid">{vod.ipfs_cid}</:col>

View File

@ -46,15 +46,18 @@
</script>
<.list>
<:item title="Origin Temporary Input URL">{@vod.origin_temp_input_url}</:item>
<:item title="Source VOD File">
<%= if @vod.s3_cdn_url do %>
<a class="button is-secondary" href={@vod.s3_cdn_url} download={Path.basename(@vod.s3_cdn_url)}>
Download
</a>
<% end %>
</:item>
<:item title="Thumbnail URL"><img src={@vod.thumbnail_url} /></:item>
<:item title="HLS Playlist URL">{@vod.playlist_url}</:item>
<:item title="S3 CDN url">{@vod.s3_cdn_url}</:item>
<:item title="S3 upload">{@vod.s3_upload_id}</:item>
<:item title="S3 key">{@vod.s3_key}</:item>
<:item title="S3 bucket">{@vod.s3_bucket}</:item>
<:item title="Thumbnail URL">{@vod.thumbnail_url}</:item>
<:item title="Ipfs CID">{@vod.ipfs_cid}</:item>
<:item title="Torrent">{@vod.torrent}</:item>
<:item title="Ipfs CID">{@vod.ipfs_cid}</:item>
<%# <:item title="Origin Temporary Input URL">{@vod.origin_temp_input_url}</:item> %>
</.list>

View File

@ -5,9 +5,6 @@
<.input field={f[:origin_temp_input_url]} type="text" label="Import VOD from URL" />
<.input field={f[:playlist_url]} type="text" label="HLS Playlist URL" />
<.input field={f[:s3_cdn_url]} type="text" label="S3 cdn url" />
<.input field={f[:s3_upload_id]} type="text" label="S3 upload" />
<.input field={f[:s3_key]} type="text" label="S3 key" />
<.input field={f[:s3_bucket]} type="text" label="S3 bucket" />
<.input field={f[:mux_asset_id]} type="text" label="Mux asset" />
<.input field={f[:mux_playback_id]} type="text" label="Mux playback" />
<.input field={f[:ipfs_cid]} type="text" label="Ipfs cid" />

View File

@ -0,0 +1,11 @@
defmodule Bright.Repo.Migrations.RemoveRedundantS3FromVod do
use Ecto.Migration
def change do
alter table(:vods) do
remove :s3_upload_id
remove :s3_key
remove :s3_bucket
end
end
end

View File

@ -8,6 +8,7 @@ defmodule Bright.B2Test do
describe "B2" do
alias Bright.B2
alias Bright.Cache
@tag :acceptance
test "put/1" do
@ -19,7 +20,8 @@ defmodule Bright.B2Test do
@tag :acceptance
test "put/2" do
local_file = Path.absname("test/fixtures/SampleVideo_1280x720_1mb.mp4")
object_key = "test/SampleVideo_1280x720_1mb.mp4"
basename = Cache.generate_basename(local_file)
object_key = "test/#{basename}"
{:ok, %{key: key, cdn_url: cdn_url}} = B2.put(local_file, object_key)
assert Regex.match?(~r/SampleVideo/, key)
end

View File

@ -7,6 +7,10 @@ defmodule Bright.CacheTest do
describe "cache" do
@tag :unit
test "get_cache_dir/0" do
assert Regex.match?(~r/.cache\/futureporn/, Cache.get_cache_dir())
end
@tag :unit

View File

@ -4,10 +4,13 @@ defmodule Bright.DownloaderTest do
use Bright.DataCase
alias Bright.Downloader
alias Bright.Cache
@test_fixture "https://futureporn-b2.b-cdn.net/projekt-melody.jpg"
describe "downloader" do
@tag :integration
test "get/1" do
{:ok, local_file} = Downloader.get(@test_fixture)
@ -18,6 +21,38 @@ defmodule Bright.DownloaderTest do
end
@tag :integration
test "cache directory creation" do
cache_dir = Cache.get_cache_dir()
end
@tag :integration
test "downloading 1MB file" do
{:ok, local_file} = Downloader.get("https://futureporn-b2.b-cdn.net/SampleVideo_1280x720_1mb.mp4")
assert File.exists?(local_file)
{:ok, stat} = File.stat(local_file)
assert stat.size > 0, "File is empty"
end
@tag :integration
test "downloading 10MB file" do
{:ok, local_file} = Downloader.get("https://futureporn-b2.b-cdn.net/SampleVideo_1280x720_10mb.mp4")
assert File.exists?(local_file)
{:ok, stat} = File.stat(local_file)
assert stat.size > 0, "File is empty"
end
@tag :integration
test "downloading 20MB file" do
{:ok, local_file} = Downloader.get("https://futureporn-b2.b-cdn.net/SampleVideo_1280x720_20mb.mp4")
assert File.exists?(local_file)
{:ok, stat} = File.stat(local_file)
assert stat.size > 0, "File is empty"
end
end
end

View File

@ -42,6 +42,8 @@ defmodule Bright.CreateHlsPlaylistTest do
refute_enqueued worker: CreateHlsPlaylist
end
end

View File

@ -8,7 +8,10 @@ defmodule Bright.ObanWorkers.CreateS3AssetTest do
}
alias Bright.Cache
alias Bright.Streams
alias Bright.Streams.Stream
alias Bright.Streams.{
Stream,
Vod
}
# @tag :unit
# test "creating a new s3 asset (unit test)" do
@ -24,27 +27,44 @@ defmodule Bright.ObanWorkers.CreateS3AssetTest do
# end
@tag :integration
test "sheduling upon vod creation" do
example_video = "http://example.com/video.ts"
stream_attrs = %{date: ~U[2024-12-28 03:31:00Z], title: "some title", notes: "some notes"}
{:ok, %Stream{} = stream} = Streams.create_stream(stream_attrs)
{:ok, _vod} = Streams.create_vod(%{stream_id: stream.id, origin_temp_input_url: example_video})
assert_enqueued worker: ProcessVod, queue: :default
Oban.drain_queue(queue: :default) # ProcessVod is what queues CreateS3Asset so we need to make it run
assert_enqueued worker: CreateS3Asset, queue: :default
end
describe "CreateS3Asset" do
import Bright.StreamsFixtures
@example_url "https://futureporn-b2.b-cdn.net/big_buck_bunny_720p_1mb.mp4"
@tag :integration
test "not scheduled when origin_temp_input_url is missing" do
stream_attrs = %{date: ~U[2024-12-28 03:31:00Z], title: "some title", notes: "some notes"}
{:ok, %Stream{} = stream} = Streams.create_stream(stream_attrs)
{:ok, _vod} = Streams.create_vod(%{stream_id: stream.id})
@tag :integration
test "s3 asset creation" do
stream = stream_fixture()
vod = vod_fixture(%{s3_cdn_url: nil, stream_id: stream.id, origin_temp_input_url: @example_url})
{:ok, %Vod{} = vod} = perform_job(CreateS3Asset, %{vod_id: vod.id})
IO.puts "s3_cdn_url=#{vod.s3_cdn_url}"
assert Regex.match?(~r/^https:\/\/.*big_buck_bunny_720p_1mb\.mp4$/, vod.s3_cdn_url)
end
@tag :integration
test "sheduling upon vod creation" do
example_video = "http://example.com/video.ts"
stream_attrs = %{date: ~U[2024-12-28 03:31:00Z], title: "some title", notes: "some notes"}
{:ok, %Stream{} = stream} = Streams.create_stream(stream_attrs)
{:ok, _vod} = Streams.create_vod(%{stream_id: stream.id, origin_temp_input_url: example_video})
assert_enqueued worker: ProcessVod, queue: :default
Oban.drain_queue(queue: :default) # ProcessVod is what queues CreateS3Asset so we need to make it run
assert_enqueued worker: CreateS3Asset, queue: :default
end
@tag :integration
test "not scheduled when origin_temp_input_url is missing" do
stream_attrs = %{date: ~U[2024-12-28 03:31:00Z], title: "some title", notes: "some notes"}
{:ok, %Stream{} = stream} = Streams.create_stream(stream_attrs)
{:ok, _vod} = Streams.create_vod(%{stream_id: stream.id})
refute_enqueued worker: CreateS3Asset
end
refute_enqueued worker: CreateS3Asset
end
# @tag :acceptance

View File

@ -0,0 +1,42 @@
defmodule Bright.CreateTorrentTest do
use Bright.DataCase
use Oban.Testing, repo: Bright.Repo
require Logger
alias Bright.ObanWorkers.{ProcessVod, CreateTorrent}
alias Bright.Streams
alias Bright.Streams.Stream
describe "CreateTorrent" do
import Bright.StreamsFixtures
@tag :integration
test "sheduling upon vod creation" do
example_video = "http://example.com/video.ts"
stream_attrs = %{date: ~U[2024-12-28 03:31:00Z], title: "some title", notes: "some notes"}
{:ok, %Stream{} = stream} = Streams.create_stream(stream_attrs)
{:ok, _vod} = Streams.create_vod(%{stream_id: stream.id, origin_temp_input_url: example_video})
assert_enqueued worker: ProcessVod, queue: :default
assert %{success: 1} = Oban.drain_queue(queue: :default) # ProcessVod is what queues CreateThumbnail so we need to make it run
assert_enqueued [worker: CreateTorrent, queue: :default], 1000
end
@tag :integration
test "not scheduled when origin_temp_input_url is missing" do
stream_attrs = %{date: ~U[2024-12-28 03:31:00Z], title: "some title", notes: "some notes"}
{:ok, %Stream{} = stream} = Streams.create_stream(stream_attrs)
{:ok, _vod} = Streams.create_vod(%{stream_id: stream.id})
refute_enqueued worker: CreateTorrent
end
end
end

View File

@ -45,7 +45,7 @@ defmodule Bright.ObanWorkers.ProcessVodTest do
stream = stream_fixture()
vod_fixture(%{s3_cdn_url: nil, stream_id: stream.id, origin_temp_input_url: @example_url})
assert_enqueued worker: ProcessVod, queue: :default
assert %{success: 1} = Oban.drain_queue(queue: :default, with_safety: false) # ProcessVod is what queues CreateThumbnail so we need to make it run
assert %{success: 1} = Oban.drain_queue(queue: :default, with_safety: false) # ProcessVod is what queues CreateS3Asset so we need to make it run
assert_enqueued worker: CreateS3Asset, queue: :default
end
@ -55,7 +55,7 @@ defmodule Bright.ObanWorkers.ProcessVodTest do
vod_fixture(%{playlist_url: nil, stream_id: stream.id, origin_temp_input_url: @example_url})
assert_enqueued worker: ProcessVod, queue: :default
assert %{success: 1} = Oban.drain_queue(queue: :default, with_safety: false) # ProcessVod is what queues CreateThumbnail so we need to make it run
assert %{success: 1} = Oban.drain_queue(queue: :default, with_safety: false) # ProcessVod is what queues CreateHlsPlaylist so we need to make it run
assert_enqueued worker: CreateHlsPlaylist, queue: :default
end