Wenn sie wieder weg sind: Un-Following tracken

Ja, es ist das Drama schlechthin im Social Media. Will man es wissen wer einem entfolgt? Wenn ja, geht das auch mit Python.

Einleitung

Im letzten Artikel behandelte ich das Synchronisieren von Accounts, denen ich folge und die mir folgen. Allerdings gibt es mit der Implementation das Problem, dass die Accounts nur hinzugefügt oder aktualisiert werden.

Den Fall, dass wir jemanden nicht mehr folgen oder wir selbst entfolgt werden, wird mit dem letzten Script nicht erkannt.

Aus psychologischer Sicht kann man sich die Frage stellen: Will ich das überhaupt wissen? Viele Social-Media-Plattformen halten sich bei dieser Frage auch bedeckt. Man wird informiert, dass man jemanden hat, der sich für die eigenen Beiträge interessiert, aber das Entfolgen passiert still. Auch Mastodon, was als Medium versucht möglichst positive Vibes zu vermitteln, verfährt so.

In der Synchronisation der Accounts spiegelt sich das auch wider, aber auf einer reinen technischen Ebene. Wie erkennt man aus einer REST Schnittstelle, die darauf basiert nur existierende Daten zu lieferen, welche eben nicht mehr da sind?

Mastodon hat eine Notification-API, um über Veränderung informiert zu werden. Aber die ist UI-getrieben (und dort will man Un-Follows nicht kommunizieren). Das fällt also flach. Man könnte schauen, ob sich die Anzahl verändert hat (die bekommt man aus den Account-Daten als Statistik mit). Aber das ist ungenau, denn es könnte ein Follow und Un-Follow geben und damit hat sich die absolute Anzahl nicht geändert. Bleibt also nur Brute-Force. Es müssen alle Accounts verglichen werden. D.h. man schaut, welche Account-IDs von der Mastodon-Instanz geliefert werden und vergleicht sie mit den IDs in OpenSearch. Da gibt es dann eine Differenz, die man dann als Un-Follow interpretieren kann (wenn in OpenSearch die ID existiert, aber Mastodon diese nicht mehr listet).

Diese Full-Diff Vergleiche bieten eine sehr stabile Aussage über Bewegungen zwischen zwei Zeitpunkten der Abfragen, allerdings nicht, wann es genau passiert ist. Damit können wir leben. Man muss nur im Fehlerfall aufpassen, dass eine leere Antwort der Mastodon-Instanz nicht als kompletter Un-Follow aller Accounts interpretiert wird.

Deswegen lösche ich auch ungern bei solchen Abgleichen Daten, die vermeintlich auf einer Remote-Instanz nicht mehr existieren. Das kann dann auf einmal zu kompletten lokalen Datenverlust führen.

Mehr Indexe

Da ich die followers und following als aktuellen Zustand der letzten Abfrage beibehalten möchte, verschiebe ich einfach die Accounts, die meine Mastodon-Instanz nicht mehr als Verbindung listet. Dafür brauche ich zwei weitere Indexe: un-followers und un-following. Das Tolle ist, dass das Script zum Anlegen der Account-Indexe schon so generisch ist. Da muss ich nur die zwei Namen der neuen Indexe hinzufügen:

...

# Namen der Indexe
index_names = ['followers', 'following', 'un-followers', 'un-following']
...

Und ja, ich wusste, dass ich das mal machen werde, deswegen hat die Schleife schon den Test eingebaut:

...
for index_name in index_names:
    if not client.indices.exists(index_name):
        ...

Wenn man das Script osCreateFollowersIndexes.py, mit den zwei zusätzlichen Index-Namen wieder aufruft, werden nur die beiden (noch nicht vorhandenen) Indexe angelegt.

Nun haben wir vier Indexe, die Accounts aufnehmen können.

Differenzen

Nein, persönliche Differenzen gab es bei den Un-Follows hoffentlich nicht, aber wir müssen die irgendwie ermittlen.

Erinnern wir uns an die Methode, die unsere Accounts lädt:

def load_to_index(client, index_name, initial_load, next_load):
    print(f'Import {index_name}')
    page = None
    created = 0
    updated = 0
    while True:
        page = initial_load(id=me['id'], limit=80) if page is None else next_load(page)
        if page:
            for account in page:
                response = client.index(index_name, id=account['id'], body=account)
                if response['result'] == "created":
                    print(f"   New in {index_name}: {account['acct']}")
                    created += 1
                elif response['result'] == "updated":
                    updated += 1
        else:
            break

    client.indices.refresh(index=index_name)
    print(f'Finished "{index_name}". Created {created} accounts and updated {updated}')

Dort müssen wir uns jetzt alle IDs merken, die wir von Mastodon erhalten. Ich mache es mir einfach und schreibe die alle in eine Liste (Set wäre besser und schneller). Da ich wohl nie Millionen von Followern haben werde, sehe ich keine Speicherprobleme alle IDs auf diese Weise zu merken.

Die neue Variante:

def load_to_index(client, index_name, initial_load, next_load):
    print(f'Import {index_name}')
    page = None
    created = 0
    updated = 0
    extracted = []
    while True:
        page = initial_load(id=me['id'], limit=80) if page is None else next_load(page)
        if page:
            for account in page:
                id = account['id']
                extracted.append (id)
                response = client.index(index_name, id=id, body=account)
                if response['result'] == "created":
                    print(f"   New in {index_name}: {account['acct']}")
                    created += 1
                elif response['result'] == "updated":
                    updated += 1
        else:
            break

    client.indices.refresh(index=index_name)
    print(f'Finished "{index_name}". Created {created} accounts and updated {updated}')

Ich bin kein Fan davon, Funktionen unnötig aufzublähen. Deswegen gibt es einen Aufruf zu einer neuen Methode, die sich dann um diese Liste der IDs auf den spezifischen Index (followers oder following) kümmert.

Also noch diese Zeilen anfügen:

    # Now we have in extracted the current active existing accounts assigned to the Mastodon-Account
    # In the OS index we have maybe more. We must move the account to un-{index_name}
    move_unlisted(client, index_name, extracted)

Verschiebebahnhof

move_unlisted gibt es noch nicht, also gleich mal anlegen. Da kommt auch sofort die Suche nach den Dokumenten (Accounts) rein. Das hatten wir so ähnlich schon mal in älteren Artikeln ausprobiert, allerdings machen wir es jetzt mit der Scroll API wie in dem neuen Blog-Artikel für Scroll und PIT Abfragen beschrieben:

def move_unlisted(client, index_name, extracted):
    docs_listed=0
    docs_moved=0
    search_all={
        "_source": False,
        "size": 100,
        "query": {
            "match_all": {}
        }
    }
    resp=client.search(index=index_name, body=search_all, scroll='5s')
    scroll_id = resp['_scroll_id']
    while len(resp['hits']['hits']):
        for doc in resp['hits']['hits']:
            docs_listed+=1
            if int(doc['_id']) not in extracted:
                # the account doc in OS is not any more listed in the Mastodon instance
                move_document(client, id=doc['_id'], index_from=index_name, index_to="un-" + index_name)
                docs_moved+=1

        # make a request using the Scroll API
        resp = client.scroll(
            scroll_id = scroll_id,
            scroll = '5s' # length of time to keep search context
        )
        scroll_id = resp['_scroll_id']

    client.clear_scroll(scroll_id=scroll_id)

    print(f'In "{index_name}" {docs_listed} found, '
          f'{docs_moved} to un-{index_name}. '
          f'{docs_listed-docs_moved} remaining')

Ok, das ist schon harter Tobak. Vor der While-Schleife ist die Abfrage, bei der wir zugleich OpenSearch Anweisen uns eine Scroll-ID zu geben. So weit, so klar. Die While-Bedingung ist eigentlich ein: Mach so lange, wie wir Treffer bekommen. Die For-Schleife iteriert durch unsere Dokumente, die wir angefragt haben.

Die einzige spannende Logik ist wirklich die Prüfung, ob die doc['id'] sich nicht in der Liste der extracted befindet. Wenn das so ist, dann haben wir den Fall, dass das Dokument (also der Account) im OpenSearch Index abgelegt wurde, aber in Mastodon keine Verbindung mehr zu meinem Account hat (die id ist nicht in extracted vorhanden). Damit hätten wir einen Treffer eines “Un-Follower” oder “Un-Following”.

Da mit dieses verschachtelte Monstrum an Methode nicht noch komplexer wird, erfolgt dann eben wieder ein Unteraufruf der nächsten Funktion (die es noch nicht gibt): move_document (...).

Der Rest der obigen Methode ist nur noch das Behandeln der Scroll-IDs und ein wenig Statistik und Infos an uns Entwickler*innen.

Ein Dokument verschieben

Also rein in die move_document, die wirklich nicht mehr viel macht:

def move_document(client: OpenSearch, id, index_from, index_to):
    doc = client.get(index_from, id)
    source = doc['_source']
    client.index(index_to, id=id, body=source)
    client.delete(index_from, id)
    client.indices.refresh(index=index_from)
    client.indices.refresh(index=index_to)
    print(f"   Moved {source['acct']} to {index_to} ")

Sie ist sehr generisch geschrieben und kann daher mit beliebigen Dokumenten auf beliebigen Indexen ausgeführt werden. Der Aufrufer muss nur die passenden Parameter setzen. Das refresh ist etwas ineffizient, wenn wir das aus einer Schleife aufrufen, aber wir hoffen sowieso nur auf wenige Verschiebungen.

Das move_document ist eine Kombination aus: Nimm das Dokument, indiziere es im Zielindex, lösche es im Quellindex, aktualisiere beide Indexe und schreib was Nettes in die Konsole. Fehlerbehandlung ist mal wieder ausgeklammert.

Im Prinzip war das keine wirklich aufwendige Erweiterung. Etwas Komplexität kam durch den Einsatz der Scroll API. Aber es ist sinnvoll, sich Ergebnisse aus einem Datenpool grundsätzlich nur in Häppchen zu geben und ohne Scroll API wäre bei über 10.000 Dokumenten Schluss gewesen. Ob wir wirklich mal auf als 10.000 Follower haben werden… na ja, aber wir wissen nun wie es geht.


Noch ein paar Anmerkungen, bevor ich das Script mal komplett am Ende aufliste. Das Ausgeben der Informationen auf die Konsole ist eine tolle Sache, aber lebt nur diesen kurzen Augenblick und ist meistens schnell verpufft. Suchen wir explizit nach Fehlern, hilft uns das als Entwickler*innen. Tauchen aber Fehler erst im produktiven Betrieb auf, haben wir nichts in der Hand die Probleme nachzuvollziehen.

Dafür gibt es überlicherweise Logging-Frameworks. Logging bedeutet, wir übergeben eine Meldung an ein Framework, dass daraus eine strukturierte Information macht und diese üblicherweise in Textdateien ablegt (für Menschen lesbar). Manche Logging-Frameworks bieten Connectoren, diese Log-Daten in Datenbanken zu schrieben, damit die durchsuchbar werden. Meistens handelt es sich dann wieder um ElasticSearch oder OpenSearch.

Das könnten wir einfach verkürzen. Wir könnten einen Index für eine Arbneits-Historie anlegen und einen Index für Ausnahmen und Fehler. Ich baue das jetzt nicht ein, sondern werde dafür einen extra Artikel machen. Das ist nämlich eine ungeheuer sinnvolle Sache und man sollte das mal gemacht haben, um zu sehen, wie einfach das geht und welche Vorteile das bringt.


Das Script loadFollowersFollowingtoOS.py

Nun aber das komplette Script, das auch Un-Follows und Un-Following versteht:

import json
from typing import Any

from mastodon import Mastodon
from opensearchpy import OpenSearch


def login_os(host, port):
    # Client zu dem Dev Cluster (ohne SSL, ohne Anmeldung)
    return OpenSearch(
        hosts=[{'host': host, 'port': port}],
        http_compress=True,  # enables gzip compression for request bodies
        use_ssl=False
    )


def login_mastodon(url, user, password):
    mastodon = Mastodon(
        client_id='ep*****************************************Xo', # Eure ID
        client_secret='AY******************************************0s', # das Geheimnis zur ID
        api_base_url=url
    )

    mastodon.access_token = mastodon.log_in(
        username=user,
        password=password,
        scopes=['read', 'write']
    )

    return mastodon


def move_document(client: OpenSearch, id, index_from, index_to):
    doc = client.get(index_from, id)
    source = doc['_source']
    client.index(index_to, id=id, body=source)
    client.delete(index_from, id)
    client.indices.refresh(index=index_from)
    client.indices.refresh(index=index_to)
    print(f"   Moved {source['acct']} to {index_to} ")


def move_unlisted(client, index_name, extracted):
    docs_listed=0
    docs_moved=0
    search_all={
        "_source": False,
        "size": 100,
        "query": {
            "match_all": {}
        }
    }
    resp=client.search(index=index_name, body=search_all, scroll='5s')
    scroll_id = resp['_scroll_id']
    while len(resp['hits']['hits']):
        for doc in resp['hits']['hits']:
            docs_listed+=1
            if int(doc['_id']) not in extracted:
                # the account doc in OS is not any more listed in the Mastodon instance
                move_document(client, id=doc['_id'], index_from=index_name, index_to="un-" + index_name)
                docs_moved+=1

        # make a request using the Scroll API
        resp = client.scroll(
            scroll_id = scroll_id,
            scroll = '5s' # length of time to keep search context
        )
        scroll_id = resp['_scroll_id']

    client.clear_scroll(scroll_id=scroll_id)

    print(f'In "{index_name}" {docs_listed} found, '
          f'{docs_moved} to un-{index_name}. '
          f'{docs_listed-docs_moved} remaining')


def load_to_index(client, index_name, initial_load, next_load):
    print(f'Import {index_name}')
    page = None
    created = 0
    updated = 0
    extracted = []
    while True:
        page = initial_load(id=me['id'], limit=80) if page is None else next_load(page)
        if page:
            for account in page:
                id = account['id']
                extracted.append (id)
                response = client.index(index_name, id=id, body=account)
                if response['result'] == "created":
                    print(f"   New in {index_name}: {account['acct']}")
                    created += 1
                elif response['result'] == "updated":
                    updated += 1
        else:
            break

    client.indices.refresh(index=index_name)
    print(f'Finished "{index_name}". Created {created} accounts and updated {updated}')

    # Now we have in extracted the current active existing accounts assigned to the Mastodon-Account
    # In the OS index we have maybe more. We must move the account to un-{index_name}
    move_unlisted(client, index_name, extracted)


client = login_os('localhost', 9200)
mastodon = login_mastodon('https://social.tchncs.de', '<email-adresse>, '<kennwort des accounts>')

# It's me:
me = mastodon.me()
print (f"Me: {me['id']} = {me['username']}")

# Followers von Mastodon nach OpenSearch kopieren:
load_to_index(client, 'followers', mastodon.account_followers, mastodon.fetch_next)
load_to_index(client, 'following', mastodon.account_following, mastodon.fetch_next)

Eine Ausgabe, wie es mal bei mir ausgesehen hat (die Sternchen sind natürlich reale Daten aus Mastodon):

Me: ****** = beandev
Import followers
   New in followers: **********@*****.****
   New in followers: ******@*****.**
   New in followers: *************@*****.*****
   New in followers: *******@*****.**
   New in followers: *********@*****.**
Finished "followers". Created 5 accounts and updated 196
   Moved ********** to un-followed
In "followers" 201 found, 1 to un-followers. 200 remaining
Import following
   New in following: *************@*****.*****
   New in following: ************@*****.*******
   New in following: ********@*****.*****
Finished "following". Created 3 accounts and updated 139
In "following" 142 found, 0 to un-following. 142 remaining

Ja, also ein Account hat mich wohl verlassen… Aber jeder Abschied hat auch was Gutes. Hier war es möglich zu testen, ob die neue Logik funktioniert.

Wer war das wohl?

http://localhost:9200/un-followers/_search?pretty=true

Egal…

Viel Spaß mit dem Script! Interessiert sich Eure Bubble auch für #Python, #Mastodon und #OpenSearch? Dann boosted den Artikel!