Compare commits
3 Commits
02070cbb3a
...
c4b693ce5f
Author | SHA1 | Date |
---|---|---|
Ryan Rix | c4b693ce5f | |
Ryan Rix | 2364b406c1 | |
Ryan Rix | 23237068b6 |
|
@ -102,6 +102,20 @@ defmodule Akkoma2org.ActivityStream do
|
|||
Action.changeset(action, attrs)
|
||||
end
|
||||
|
||||
# Convenience wrapper: the single most recently published Action, or nil.
def latest_action, do: 1 |> latest_actions() |> List.first()
|
||||
|
||||
@doc """
|
||||
Returns the most-recent Action based on the publish date
|
||||
"""
|
||||
def latest_actions(limit) do
|
||||
query =
|
||||
from a in Akkoma2org.ActivityStream.Action,
|
||||
order_by: [desc: a.published],
|
||||
limit: ^limit
|
||||
|
||||
Repo.all(query)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Searches our actions based on a simple query.
|
||||
|
||||
|
@ -119,93 +133,17 @@ defmodule Akkoma2org.ActivityStream do
|
|||
|> Repo.all()
|
||||
end
|
||||
|
||||
def post_source(from) when is_binary(from), do: from
|
||||
def post_source(from) when is_map(from), do: Map.get(from, "content")
|
||||
def delete_dupes() do
|
||||
l = list_actions |> Enum.map(&Map.get(&1, :external_id))
|
||||
|
||||
@doc "Convert a map with a serialized AS2 Activity in it to our internal format"
|
||||
def munge_as2(as2_map) do
|
||||
obj = Map.get(as2_map, "object")
|
||||
type = as2_map |> Map.get("type")
|
||||
|
||||
case type do
|
||||
"Create" ->
|
||||
post_src = obj |> Map.get("source") |> post_source
|
||||
|
||||
%{
|
||||
external_id: obj |> Map.get("id"),
|
||||
action_type: type,
|
||||
actor: as2_map |> Map.get("actor"),
|
||||
dest: obj |> Map.get("to") |> Enum.at(0),
|
||||
published: obj |> Map.get("published"),
|
||||
type: obj |> Map.get("type"),
|
||||
summary: obj |> Map.get("summary"),
|
||||
content: obj |> Map.get("content"),
|
||||
post_source: post_src,
|
||||
full_action: Jason.encode!(as2_map)
|
||||
}
|
||||
|
||||
"Announce" ->
|
||||
IO.warn("Announcements not supported")
|
||||
%{}
|
||||
end
|
||||
end
|
||||
|
||||
def ingest_one_as2(item) do
|
||||
case item
|
||||
|> Akkoma2org.ActivityStream.munge_as2()
|
||||
|> Akkoma2org.ActivityStream.create_action() do
|
||||
{:ok, _result} -> IO.puts("inserted")
|
||||
{:error, _} -> IO.warn("failed")
|
||||
end
|
||||
end
|
||||
|
||||
def ingest_file(path) do
|
||||
with {:ok, handle} <- :zip.zip_open(to_charlist(path), [:memory]),
|
||||
{:ok, {'outbox.json', res}} = :zip.zip_get("outbox.json", handle) do
|
||||
res
|
||||
|> Jason.decode!()
|
||||
|> Map.get("orderedItems")
|
||||
|> Enum.map(&ingest_one_as2(&1))
|
||||
|
||||
:zip.zip_close(handle)
|
||||
end
|
||||
end
|
||||
|
||||
def ingest_api(user: user, password: pass) do
|
||||
app =
|
||||
try do
|
||||
Hunter.Application.load_credentials("akkoma2org")
|
||||
rescue
|
||||
FileError ->
|
||||
Hunter.create_app("akkoma2org", "urn:ietf:wg:oauth:2.0:oob", ["read"], nil, save?: true)
|
||||
end
|
||||
|
||||
conn = Hunter.log_in(app, user, pass, "https://notes.whatthefuck.computer")
|
||||
|
||||
my_id = Hunter.verify_credentials(conn) |> Map.get(:id)
|
||||
|
||||
# iterate over statuses
|
||||
# accumulator is next_id + collected statuses in a proplist
|
||||
statuses = Hunter.statuses(conn, my_id)
|
||||
last_id = statuses |> Enum.at(-1) |> Map.get(:id)
|
||||
|
||||
posts =
|
||||
Stream.unfold(
|
||||
%{next: last_id, posts: statuses},
|
||||
fn
|
||||
%{next: nil, posts: collection} ->
|
||||
nil
|
||||
|
||||
%{next: max_id, posts: collection} ->
|
||||
statuses = Hunter.statuses(conn, my_id, max_id: max_id)
|
||||
last_id = statuses |> Enum.at(-1) |> Map.get(:id)
|
||||
all = collection ++ statuses
|
||||
# IO.puts(:stdio, all |> Enum.map(&Map.get(&1, :id)) |> Enum.join(" "))
|
||||
IO.puts(:stdio, "Fetched #{Enum.count(all)} so far...")
|
||||
{last_id, %{next: last_id, posts: all}}
|
||||
end
|
||||
Enum.uniq(l -- Enum.uniq(l))
|
||||
|> Enum.map(
|
||||
&Akkoma2org.Repo.all(
|
||||
from a in Akkoma2org.ActivityStream.Action, where: a.external_id == ^&1
|
||||
)
|
||||
|
||||
posts |> Enum.count() |> IO.inspect()
|
||||
)
|
||||
|> Enum.map(&(its = Enum.slice(&1, 1..-1)))
|
||||
|> List.flatten()
|
||||
|> Enum.map(&Akkoma2org.ActivityStream.delete_action(&1))
|
||||
end
|
||||
end
|
||||
|
|
|
@ -34,12 +34,13 @@ defmodule Akkoma2org.ActivityStream.Action do
|
|||
:action_type,
|
||||
:external_id
|
||||
])
|
||||
|> unique_constraint(:external_id)
|
||||
|> validate_required([
|
||||
:actor,
|
||||
:dest,
|
||||
:published,
|
||||
:type,
|
||||
:action_type,
|
||||
# :type,
|
||||
# :action_type,
|
||||
:external_id
|
||||
])
|
||||
end
|
||||
|
|
|
@ -0,0 +1,150 @@
|
|||
defmodule Akkoma2org.ActivityStream.APIBackfill do
  @moduledoc """
  Backfills the local ActivityStream store from a Mastodon-compatible
  server via the Hunter client: forward from the newest stored Action
  (`fetch_latest/1`) or backward through the whole history (`backfill/1`).
  """

  alias Akkoma2org.ActivityStream
  alias Akkoma2org.ActivityStream.Action

  # https://stackoverflow.com/questions/47354778/elixir-map-with-mixed-keys
  @doc "Converts the string keys of `map` to atoms; atom keys pass through unchanged."
  def key_to_atom(map) do
    Enum.reduce(map, %{}, fn
      {key, value}, acc when is_atom(key) ->
        Map.put(acc, key, value)

      # String.to_existing_atom saves us from overloading the VM by
      # creating unbounded atoms from external input. It succeeds here
      # because the keys we see are struct fields that already exist as
      # atoms at runtime.
      {key, value}, acc when is_binary(key) ->
        Map.put(acc, String.to_existing_atom(key), value)
    end)
  end

  @doc """
  Converts a `Hunter.Status` into the map shape `ActivityStream.create_action/1` expects.
  """
  def munge_mastodon(%Hunter.Status{reblog: status} = _orig_status) when is_map(status) do
    # A reblogged Status nests its account as a plain string-keyed map
    # (same for the akkoma/pleroma extensions); rebuild a %Hunter.Account{}
    # before recursing on the inner status.
    acct_w_atoms = key_to_atom(status.account)
    acct = Map.merge(%Hunter.Account{}, acct_w_atoms)

    munge_mastodon(%{status | account: acct})
  end

  def munge_mastodon(%Hunter.Status{reblog: nil} = status) do
    {:ok, full_action} = Poison.encode(status)

    # Akkoma's `source` extension is either the raw markup string, a map
    # carrying it under "content", or absent entirely.
    post_source =
      case status.akkoma["source"] do
        str when is_binary(str) -> str
        %{"content" => content} -> content
        _ -> ""
      end

    %{
      external_id: status.id,
      actor: status.account.url,
      content: status.content,
      post_source: post_source,
      dest: status.pleroma["context"],
      summary: status.spoiler_text,
      published: status.created_at,
      full_action: full_action
    }
  end

  @doc "Pages backward (`:max_id`) or forward (`:since_id`) from `post_id`."
  def unfold(conn, account_id, post_id, :backward),
    do: unfold1(conn, account_id, post_id, :max_id)

  def unfold(conn, account_id, post_id, :forward),
    do: unfold1(conn, account_id, post_id, :since_id)

  # Lazily pages statuses for `account_id`, one API call per emitted page.
  # Accumulator is {last_seen_id, total_fetched}; an empty page ends the
  # stream. Each stream element is a LIST of statuses — callers flatten.
  defp unfold1(conn, account_id, last_id, dir_key) do
    Stream.unfold(
      {last_id, 0},
      fn {last_id, prev_count} ->
        case Hunter.statuses(conn, account_id, [{dir_key, last_id}]) do
          [] ->
            nil

          statuses ->
            last_id = statuses |> Enum.at(-1) |> Map.get(:id)
            count = Enum.count(statuses) + prev_count

            IO.puts("Fetched #{count} now... up to #{last_id}")

            {statuses, {last_id, count}}
        end
      end
    )
  end

  @doc "Ingests every status published since the newest stored Action."
  def fetch_latest(user: user, password: pass) do
    {_app, conn, my_id} = initialize(user, pass)

    %Action{external_id: last_id} = ActivityStream.latest_action()

    unfold(conn, my_id, last_id, :forward)
    |> Enum.to_list()
    |> List.flatten()
    |> Enum.map(&store_status/1)
  end

  @doc "Ingests the account's entire history, newest to oldest."
  def backfill(user: user, password: pass) do
    {_app, conn, my_id} = initialize(user, pass)

    # Seed with the first page; unfold/4 then pages backward from its
    # oldest id.
    statuses = Hunter.statuses(conn, my_id)
    last_id = statuses |> Enum.at(-1) |> Map.get(:id)

    posts =
      unfold(conn, my_id, last_id, :backward)
      |> Enum.to_list()

    (statuses ++ posts)
    |> List.flatten()
    |> Enum.map(&store_status/1)
  end

  # Converts one status to our internal shape and persists it, dumping the
  # changeset on failure. Shared by fetch_latest/1 and backfill/1, which
  # previously duplicated this pipeline.
  defp store_status(status) do
    case status
         |> munge_mastodon()
         |> Akkoma2org.ActivityStream.create_action() do
      {:ok, cs} -> cs
      {:error, cs} -> IO.inspect(cs)
    end
  end

  # Loads (or registers and saves) the OAuth app, logs in, and resolves
  # the authenticated account's id.
  defp initialize(user, pass) do
    app =
      try do
        Hunter.Application.load_credentials("akkoma2org")
      rescue
        # NOTE(review): `FileError` is assumed to be the exception raised
        # when no saved credentials exist — confirm the exact module name.
        FileError ->
          Hunter.create_app("akkoma2org", "urn:ietf:wg:oauth:2.0:oob", ["read"], nil, save?: true)
      end

    conn = Hunter.log_in(app, user, pass, "https://notes.whatthefuck.computer")
    my_id = Hunter.verify_credentials(conn) |> Map.get(:id)

    {app, conn, my_id}
  end
end
|
|
@ -0,0 +1,53 @@
|
|||
defmodule Akkoma2org.ActivityStream.FileBackfill do
  @moduledoc """
  Ingests an exported Akkoma/Pleroma account archive — a zip containing
  `outbox.json` — into the local ActivityStream store.
  """

  @doc "Extracts the raw post source from an AS2 `source` field."
  def post_source(from) when is_binary(from), do: from
  def post_source(from) when is_map(from), do: Map.get(from, "content")
  # Some objects carry no source at all; treat that as absent rather than
  # crashing with a FunctionClauseError.
  def post_source(nil), do: nil

  @doc "Convert a map with a serialized AS2 Activity in it to our internal format"
  def munge_as2(as2_map) do
    obj = Map.get(as2_map, "object")
    type = Map.get(as2_map, "type")

    case type do
      "Create" ->
        post_src = obj |> Map.get("source") |> post_source()

        %{
          external_id: Map.get(obj, "id"),
          action_type: type,
          actor: Map.get(as2_map, "actor"),
          dest: obj |> Map.get("to") |> Enum.at(0),
          published: Map.get(obj, "published"),
          type: Map.get(obj, "type"),
          summary: Map.get(obj, "summary"),
          content: Map.get(obj, "content"),
          post_source: post_src,
          full_action: Jason.encode!(as2_map)
        }

      "Announce" ->
        IO.warn("Announcements not supported")
        %{}
    end
  end

  @doc "Munges one AS2 item and persists it, logging success or failure."
  def ingest_one_as2(item) do
    # munge_as2/1 now lives in this module (it was moved out of
    # Akkoma2org.ActivityStream), so it must be called locally.
    case item
         |> munge_as2()
         |> Akkoma2org.ActivityStream.create_action() do
      {:ok, _result} -> IO.puts("inserted")
      {:error, _} -> IO.warn("failed")
    end
  end

  @doc "Ingests every item of `outbox.json` inside the archive at `path`."
  def ingest_file(path) do
    # `<-` (not `=`) so a missing outbox.json returns the error tuple
    # instead of raising a MatchError inside `with`.
    # NOTE(review): on that error path the zip handle is not closed —
    # consider try/after if this runs in a long-lived process.
    with {:ok, handle} <- :zip.zip_open(to_charlist(path), [:memory]),
         {:ok, {_name, res}} <- :zip.zip_get("outbox.json", handle) do
      res
      |> Jason.decode!()
      |> Map.get("orderedItems")
      |> Enum.map(&ingest_one_as2/1)

      :zip.zip_close(handle)
    end
  end
end
|
|
@ -1,5 +1,5 @@
|
|||
defmodule Mix.Tasks.Akkoma2org.ApiIngest do
|
||||
@moduledoc "The ingest mix task: `mix help ingex`"
|
||||
@moduledoc "The ingest mix task: `mix help api_ingest`"
|
||||
use Mix.Task
|
||||
|
||||
@shortdoc "Ingest from an API"
|
||||
|
@ -29,6 +29,6 @@ defmodule Mix.Tasks.Akkoma2org.ApiIngest do
|
|||
pass
|
||||
end
|
||||
|
||||
Akkoma2org.ActivityStream.ingest_api(user: user, password: pass)
|
||||
Akkoma2org.ActivityStream.APIBackfill.backfill(user: user, password: pass)
|
||||
end
|
||||
end
|
||||
|
|
2
mix.exs
2
mix.exs
|
@ -67,7 +67,7 @@ defmodule Akkoma2org.MixProject do
|
|||
"ecto.reset": [
|
||||
"ecto.drop",
|
||||
"ecto.setup",
|
||||
"ingest '/home/rrix/Downloads/archive-rrix-20230914T172758-Z1QD3_2KpjRtmGAKaKfKyV65HIqBMXlojmInh1QbgOY.zip'"
|
||||
"akkoma2org.api_ingest"
|
||||
],
|
||||
test: ["ecto.create --quiet", "ecto.migrate --quiet", "test"],
|
||||
"assets.setup": ["tailwind.install --if-missing", "esbuild.install --if-missing"],
|
||||
|
|
Loading…
Reference in New Issue