187 lines
5.9 KiB
Elixir

defmodule Bright.Socials.XPost do
use Ecto.Schema
import Ecto.{Changeset, Query}
alias Bright.Repo
alias Bright.Vtubers.Vtuber
alias Bright.Socials.{XPost, RSSParser}
alias Bright.Platforms.Platform
alias Bright.Platforms
alias Quinn
require Logger
@moduledoc """
Ecto schema for X posts read from a vtuber's RSS feed.

We cache the posts in the db so it's clear which posts we've read and which ones we haven't.
The idea is to process only the posts that have not been processed yet
(see `get_unprocessed_posts/0`, which selects rows whose `processed_at` is `nil`).
"""

schema "x_posts" do
  # Raw text of the post, as extracted from the feed.
  field :raw, :string
  field :url, :string
  field :date, :utc_datetime
  # `nil` until the post has been processed.
  field :processed_at, :utc_datetime

  # NOTE(review): no `Stream` alias is declared in this module, so this resolves to
  # Elixir's built-in `Stream` module rather than an application schema. Confirm the
  # intended schema module and alias it.
  belongs_to :stream, Stream
  belongs_to :vtuber, Bright.Vtubers.Vtuber

  timestamps(type: :utc_datetime)
end
@doc false
def changeset(post, attrs) do
  # Fields callers are allowed to set, and the subset that must be present.
  permitted = [:raw, :url, :date, :vtuber_id, :processed_at]
  required = [:raw, :url, :date, :vtuber_id]

  casted = cast(post, attrs, permitted)
  validated = validate_required(casted, required)

  # Turn unique-index violations on these columns into changeset errors.
  with_unique_date = unique_constraint(validated, :date)
  unique_constraint(with_unique_date, :url)
end
@doc """
Get all X posts available in the vtuber's rss feed.

Accepts either a `%Vtuber{}` (its `twitter_rss` is used as the feed URL) or the feed URL itself.

## Returns

- `{:ok, items}` - the result of `RSSParser.extract_item_details/1` on the parsed feed.
- `{:warning, message}` - the vtuber has no `twitter_rss` URL.
- `{:error, {:unexpected_status, status}}` - the feed answered with a non-2xx HTTP status.
- `{:error, reason}` - the HTTP request itself failed.
"""
def get_new_posts(%Vtuber{display_name: display_name, twitter_rss: twitter_rss}) do
  Logger.debug(
    "get_new_posts was called with vtuber struct. twitter_rss=#{inspect(twitter_rss)}"
  )

  case twitter_rss do
    nil -> {:warning, "#{display_name} doesn't have a twitter_rss URL."}
    _ -> get_new_posts(twitter_rss)
  end
end

def get_new_posts(feed_url) do
  case HTTPoison.get(feed_url) do
    # Only a successful response carries a feed worth parsing; an error page body
    # (404, 500, ...) must not be handed to the XML parser.
    {:ok, %HTTPoison.Response{status_code: status, body: body}} when status in 200..299 ->
      data = Quinn.parse(body)
      extract = RSSParser.extract_item_details(data)
      Logger.debug("we GETted a rss feed. Parsed data=#{inspect(data)}")
      Logger.debug("we parsed the rss feed using RSSParser. parsed=#{inspect(extract)}")
      {:ok, extract}

    {:ok, %HTTPoison.Response{status_code: status}} ->
      Logger.debug("failed to get_posts. unexpected HTTP status=#{inspect(status)}")
      {:error, {:unexpected_status, status}}

    {:error, reason} ->
      Logger.debug("failed to get_posts. reason=#{inspect(reason)}")
      {:error, reason}
  end
end
@doc """
Returns `true` when the post URL belongs to the vtuber's own account.

The account handle is taken to be the first path segment of each URL
(`x_post.url` and `vtuber.twitter`) and the two are compared case-insensitively.
Comparing whole segments (instead of a raw path prefix) keeps `/foo` from matching
`/foobar/status/1` and tolerates a trailing slash on the vtuber's profile URL.

Returns `false` when either URL is missing or has no path segment.
"""
def authored_by_vtuber?(x_post, vtuber) do
  vtuber_handle = handle_from_url(vtuber.twitter)
  post_handle = handle_from_url(x_post.url)

  Logger.debug("vtuber_handle=#{inspect(vtuber_handle)} post_handle=#{inspect(post_handle)}")

  not is_nil(vtuber_handle) and vtuber_handle == post_handle
end

# Extracts the lowercased first path segment of `url`, or `nil` when there is none.
defp handle_from_url(url) when is_binary(url) do
  with path when is_binary(path) <- URI.parse(url).path,
       [handle | _rest] <- String.split(path, "/", trim: true) do
    String.downcase(handle)
  else
    _ -> nil
  end
end

defp handle_from_url(_url), do: nil
@doc """
Returns every stored post whose `processed_at` is `nil`, with `:vtuber` preloaded.
"""
def get_unprocessed_posts() do
  pending_query = from(p in XPost, where: is_nil(p.processed_at))
  pending_posts = Repo.all(pending_query)
  Repo.preload(pending_posts, :vtuber)
end
@doc """
Returns the host part of `url`, or `""` when there is none.

Accepts a URL string or an already-parsed `%URI{}`. Any other value (for example
`nil`) yields `""` instead of raising.
"""
def extract_hostname(url) when is_binary(url) do
  URI.parse(url).host || ""
end

def extract_hostname(%URI{host: host}), do: host || ""

def extract_hostname(_url), do: ""
@doc """
Checks whether the raw text (or the `raw` of an `%XPost{}`) contains the hostname of
any of the platform's aliases.

Aliases are read from the platform's `:platform_aliases` key. Returns `false` when
that value is missing or is not a list, and when the text is not a binary. An alias
whose URL has no hostname is ignored, because an empty hostname would otherwise
match every text.
"""
def includes_alias?(%XPost{raw: raw}, platform), do: includes_alias?(raw, platform)

def includes_alias?(raw, platform) when is_binary(raw) do
  case Map.get(platform, :platform_aliases, []) do
    aliases when is_list(aliases) ->
      Enum.any?(aliases, fn platform_alias ->
        hostname = extract_hostname(platform_alias.url)
        # `String.contains?(raw, "")` is always true, so an empty hostname must not count.
        hostname != "" and String.contains?(raw, hostname)
      end)

    # Anything that is not a list cannot be searched.
    _not_a_list ->
      false
  end
end

def includes_alias?(_raw, _platform), do: false
@doc """
Checks if the given raw text or XPost includes a reference to the specified platform.

The function checks for matches against the platform's hostname or its aliases.

## Parameters

- raw_text: The raw text or XPost to search within.
- platform: The %Platform{} struct containing the URL and aliases to match against.

## Returns

- `true` if the platform's hostname or any of its aliases are found in the raw text.
- `false` otherwise, including when the text or the platform URL is not a binary.

A platform URL without a hostname never counts as a hostname match; only its aliases
are considered in that case.
"""
def includes_platform?(%XPost{raw: raw}, platform) do
  includes_platform?(raw, platform)
end

def includes_platform?(raw_text, %Platform{url: url} = platform)
    when is_binary(raw_text) and is_binary(url) do
  hostname = extract_hostname(url)

  # An empty hostname is contained in every string, so it must not be treated as a match.
  hostname_match = hostname != "" and String.contains?(raw_text, hostname)

  hostname_match || includes_alias?(raw_text, platform)
end

def includes_platform?(_, _), do: false
@doc """
Returns the platforms from `platforms` that are referenced in the post.

The first argument may be an `%XPost{}` (its `raw` text is used) or the raw text
itself. Each platform is kept when `includes_platform?/2` returns `true` for it.
"""
def get_platforms_mentioned(%XPost{raw: raw}, platforms) do
  # Pass the collection through unchanged. Matching `[%Platform{} = platforms]` here
  # would bind a single struct and make the filter below fail on a one-element list.
  get_platforms_mentioned(raw, platforms)
end

def get_platforms_mentioned(raw_text, platforms) do
  Enum.filter(platforms, &includes_platform?(raw_text, &1))
end
@doc """
Is the post a valid NSFW livestream announcement?

## Eligibility requirements

To be considered a NSFW live announcement, a post must satisfy all the following conditions.

* The post is authored by the lewdtuber (see `authored_by_vtuber?/2`)
* The post text does not contain "vod" (case-insensitive)
* The post mentions a NSFW platform
* The post does not mention any SFW streaming platform.

## Parameters

- post: an `%XPost{}`; its `vtuber` field is passed to `authored_by_vtuber?/2`.
- mentioned_platforms: the platforms referenced by the post.
- known_platforms: all known platforms; split into NSFW and SFW by their `nsfw` field.

## Returns

`true` when every condition holds, `false` otherwise. The first failing condition is
logged at debug level.
"""
def is_nsfw_live_announcement?(
      %XPost{vtuber: vtuber} = post,
      mentioned_platforms,
      known_platforms
    ) do
  Logger.debug("Checking if post is NSFW live announcement: #{inspect(post)}")

  nsfw_platforms = Enum.filter(known_platforms, & &1.nsfw)
  sfw_platforms = Enum.reject(known_platforms, & &1.nsfw)

  # NOTE(review): the substring check also rejects words that merely contain "vod"
  # (e.g. "vodka"). Confirm whether a word-boundary match is wanted.
  conditions = [
    {:not_rt, XPost.authored_by_vtuber?(post, vtuber)},
    {:not_vod, not String.contains?(String.downcase(post.raw), "vod")},
    {:contains_nsfw_link, Platforms.contains_platform?(mentioned_platforms, nsfw_platforms)},
    {:no_sfw_link, not Platforms.contains_platform?(mentioned_platforms, sfw_platforms)}
  ]

  # Report only the first failing condition, in declaration order.
  case Enum.find(conditions, fn {_label, passed} -> !passed end) do
    nil ->
      true

    {label, _failed} ->
      Logger.debug(">>> NSFW announcement check failed at: #{label}")
      false
  end
end
end
defimpl Phoenix.HTML.Safe, for: Bright.Socials.XPost do
  # Renders a post as "raw -- url -- date" and delegates to the binary
  # implementation of the protocol.
  def to_iodata(x_post) do
    text = x_post.raw
    link = x_post.url
    posted_at = x_post.date

    "#{text} -- #{link} -- #{posted_at}"
    |> Phoenix.HTML.Safe.to_iodata()
  end
end