Compare commits

...

3 Commits

6 changed files with 233 additions and 91 deletions

View File

@ -102,6 +102,20 @@ defmodule Akkoma2org.ActivityStream do
Action.changeset(action, attrs)
end
def latest_action(), do: latest_actions(1) |> Enum.at(0)
@doc """
Returns the most-recent Action based on the publish date
"""
def latest_actions(limit) do
query =
from a in Akkoma2org.ActivityStream.Action,
order_by: [desc: a.published],
limit: ^limit
Repo.all(query)
end
@doc """
Searchs our actions based on a simple query.
@ -119,93 +133,17 @@ defmodule Akkoma2org.ActivityStream do
|> Repo.all()
end
def post_source(from) when is_binary(from), do: from
def post_source(from) when is_map(from), do: Map.get(from, "content")
def delete_dupes() do
l = list_actions |> Enum.map(&Map.get(&1, :external_id))
@doc "Convert a map with a serialized AS2 Activity in it to our internal format"
def munge_as2(as2_map) do
obj = Map.get(as2_map, "object")
type = as2_map |> Map.get("type")
case type do
"Create" ->
post_src = obj |> Map.get("source") |> post_source
%{
external_id: obj |> Map.get("id"),
action_type: type,
actor: as2_map |> Map.get("actor"),
dest: obj |> Map.get("to") |> Enum.at(0),
published: obj |> Map.get("published"),
type: obj |> Map.get("type"),
summary: obj |> Map.get("summary"),
content: obj |> Map.get("content"),
post_source: post_src,
full_action: Jason.encode!(as2_map)
}
"Announce" ->
IO.warn("Announcements not supported")
%{}
end
end
def ingest_one_as2(item) do
case item
|> Akkoma2org.ActivityStream.munge_as2()
|> Akkoma2org.ActivityStream.create_action() do
{:ok, _result} -> IO.puts("inserted")
{:error, _} -> IO.warn("failed")
end
end
def ingest_file(path) do
with {:ok, handle} <- :zip.zip_open(to_charlist(path), [:memory]),
{:ok, {'outbox.json', res}} = :zip.zip_get("outbox.json", handle) do
res
|> Jason.decode!()
|> Map.get("orderedItems")
|> Enum.map(&ingest_one_as2(&1))
:zip.zip_close(handle)
end
end
def ingest_api(user: user, password: pass) do
app =
try do
Hunter.Application.load_credentials("akkoma2org")
rescue
FileError ->
Hunter.create_app("akkoma2org", "urn:ietf:wg:oauth:2.0:oob", ["read"], nil, save?: true)
end
conn = Hunter.log_in(app, user, pass, "https://notes.whatthefuck.computer")
my_id = Hunter.verify_credentials(conn) |> Map.get(:id)
# iterate over statuses
# accumulator is next_id + collected statuses in a proplist
statuses = Hunter.statuses(conn, my_id)
last_id = statuses |> Enum.at(-1) |> Map.get(:id)
posts =
Stream.unfold(
%{next: last_id, posts: statuses},
fn
%{next: nil, posts: collection} ->
nil
%{next: max_id, posts: collection} ->
statuses = Hunter.statuses(conn, my_id, max_id: max_id)
last_id = statuses |> Enum.at(-1) |> Map.get(:id)
all = collection ++ statuses
# IO.puts(:stdio, all |> Enum.map(&Map.get(&1, :id)) |> Enum.join(" "))
IO.puts(:stdio, "Fetched #{Enum.count(all)} so far...")
{last_id, %{next: last_id, posts: all}}
end
Enum.uniq(l -- Enum.uniq(l))
|> Enum.map(
&Akkoma2org.Repo.all(
from a in Akkoma2org.ActivityStream.Action, where: a.external_id == ^&1
)
posts |> Enum.count() |> IO.inspect()
)
|> Enum.map(&(its = Enum.slice(&1, 1..-1)))
|> List.flatten()
|> Enum.map(&Akkoma2org.ActivityStream.delete_action(&1))
end
end

View File

@ -34,12 +34,13 @@ defmodule Akkoma2org.ActivityStream.Action do
:action_type,
:external_id
])
|> unique_constraint(:external_id)
|> validate_required([
:actor,
:dest,
:published,
:type,
:action_type,
# :type,
# :action_type,
:external_id
])
end

View File

@ -0,0 +1,150 @@
defmodule Akkoma2org.ActivityStream.APIBackfill do
alias Akkoma2org.ActivityStream
alias Akkoma2org.ActivityStream.Action
# https://stackoverflow.com/questions/47354778/elixir-map-with-mixed-keys
def key_to_atom(map) do
Enum.reduce(map, %{}, fn
{key, value}, acc when is_atom(key) -> Map.put(acc, key, value)
# String.to_existing_atom saves us from overloading the VM by
# creating too many atoms. It'll always succeed because all the fields
# in the database already exist as atoms at runtime.
# lol haha yes
{key, value}, acc when is_binary(key) -> Map.put(acc, String.to_atom(key), value)
end)
end
def munge_mastodon(%Hunter.Status{reblog: status} = _orig_status) when is_map(status) do
# nested Status has these come in as dicts... same w/ akkoma+pleroma extensions i made hm.
acct_w_atoms =
status.account
|> key_to_atom()
acct = Map.merge(%Hunter.Account{}, acct_w_atoms)
%{status | account: acct} |> munge_mastodon()
end
def munge_mastodon(%Hunter.Status{reblog: nil} = status) do
{:ok, full_action} = Poison.encode(status)
post_source =
case status.akkoma["source"] do
str when is_binary(str) ->
str
%{"content" => content} ->
content
_ ->
""
end
# url
%{
external_id: status.id,
# || status.account[:url],
actor: status.account.url,
content: status.content,
post_source: post_source,
dest: status.pleroma["context"],
summary: status.spoiler_text,
published: status.created_at,
full_action: full_action
}
end
def unfold(conn, account_id, post_id, :backward),
do: unfold1(conn, account_id, post_id, :max_id)
def unfold(conn, account_id, post_id, :forward),
do: unfold1(conn, account_id, post_id, :since_id)
defp unfold1(conn, account_id, last_id, dir_key) do
# iterate over statuses
# accumulator is next_id + collected statuses in a proplist
Stream.unfold(
{last_id, 0},
fn
# # early exit
# {_, prev_count} when prev_count > 100 ->
# nil
{last_id, prev_count} ->
statuses = Hunter.statuses(conn, account_id, [{dir_key, last_id}])
case statuses do
[] ->
nil
_ ->
last_id = statuses |> Enum.at(-1) |> Map.get(:id)
# IO.puts(:stdio, all |> Enum.map(&Map.get(&1, :id)) |> Enum.join(" "))
count = Enum.count(statuses) + prev_count
IO.puts("Fetched #{count} now... up to #{last_id}")
{statuses, {last_id, count}}
end
end
)
end
def fetch_latest(user: user, password: pass) do
{_app, conn, my_id} = initialize(user, pass)
%Action{external_id: last_id} = ActivityStream.latest_action()
unfold(conn, my_id, last_id, :forward)
|> Enum.to_list()
|> List.flatten()
|> Enum.map(fn it ->
case it
|> munge_mastodon()
|> Akkoma2org.ActivityStream.create_action() do
{:ok, cs} -> cs
{:error, cs} -> IO.inspect(cs)
end
end)
end
def backfill(user: user, password: pass) do
{_app, conn, my_id} = initialize(user, pass)
statuses = Hunter.statuses(conn, my_id)
last_id = statuses |> Enum.at(-1) |> Map.get(:id)
posts =
unfold(conn, my_id, last_id, :backward)
|> Enum.to_list()
all_posts = (statuses ++ posts) |> List.flatten()
all_posts
|> Enum.map(fn it ->
case it
|> munge_mastodon()
|> Akkoma2org.ActivityStream.create_action() do
{:ok, cs} -> cs
{:error, cs} -> IO.inspect(cs)
end
end)
end
defp initialize(user, pass) do
app =
try do
Hunter.Application.load_credentials("akkoma2org")
rescue
FileError ->
Hunter.create_app("akkoma2org", "urn:ietf:wg:oauth:2.0:oob", ["read"], nil, save?: true)
end
conn = Hunter.log_in(app, user, pass, "https://notes.whatthefuck.computer")
my_id = Hunter.verify_credentials(conn) |> Map.get(:id)
{app, conn, my_id}
end
end

View File

@ -0,0 +1,53 @@
defmodule Akkoma2org.ActivityStream.FileBackfill do
def post_source(from) when is_binary(from), do: from
def post_source(from) when is_map(from), do: Map.get(from, "content")
@doc "Convert a map with a serialized AS2 Activity in it to our internal format"
def munge_as2(as2_map) do
obj = Map.get(as2_map, "object")
type = as2_map |> Map.get("type")
case type do
"Create" ->
post_src = obj |> Map.get("source") |> post_source
%{
external_id: obj |> Map.get("id"),
action_type: type,
actor: as2_map |> Map.get("actor"),
dest: obj |> Map.get("to") |> Enum.at(0),
published: obj |> Map.get("published"),
type: obj |> Map.get("type"),
summary: obj |> Map.get("summary"),
content: obj |> Map.get("content"),
post_source: post_src,
full_action: Jason.encode!(as2_map)
}
"Announce" ->
IO.warn("Announcements not supported")
%{}
end
end
def ingest_one_as2(item) do
case item
|> Akkoma2org.ActivityStream.munge_as2()
|> Akkoma2org.ActivityStream.create_action() do
{:ok, _result} -> IO.puts("inserted")
{:error, _} -> IO.warn("failed")
end
end
def ingest_file(path) do
with {:ok, handle} <- :zip.zip_open(to_charlist(path), [:memory]),
{:ok, {'outbox.json', res}} = :zip.zip_get("outbox.json", handle) do
res
|> Jason.decode!()
|> Map.get("orderedItems")
|> Enum.map(&ingest_one_as2(&1))
:zip.zip_close(handle)
end
end
end

View File

@ -1,5 +1,5 @@
defmodule Mix.Tasks.Akkoma2org.ApiIngest do
@moduledoc "The ingest mix task: `mix help ingex`"
@moduledoc "The ingest mix task: `mix help api_ingest`"
use Mix.Task
@shortdoc "Ingest from an API"
@ -29,6 +29,6 @@ defmodule Mix.Tasks.Akkoma2org.ApiIngest do
pass
end
Akkoma2org.ActivityStream.ingest_api(user: user, password: pass)
Akkoma2org.ActivityStream.APIBackfill.backfill(user: user, password: pass)
end
end

View File

@ -67,7 +67,7 @@ defmodule Akkoma2org.MixProject do
"ecto.reset": [
"ecto.drop",
"ecto.setup",
"ingest '/home/rrix/Downloads/archive-rrix-20230914T172758-Z1QD3_2KpjRtmGAKaKfKyV65HIqBMXlojmInh1QbgOY.zip'"
"akkoma2org.api_ingest"
],
test: ["ecto.create --quiet", "ecto.migrate --quiet", "test"],
"assets.setup": ["tailwind.install --if-missing", "esbuild.install --if-missing"],