Greboca  

Suport technique et veille technologique

DLFP - Dépêches  -  Pyruse 1.0 : pour remplacer Fail2ban et autres « scruteurs » de logs sur un Linux moderne

 -  12 février - 

Après plus de 10 ans de présence discrète sur LinuxFR, tout juste marquée de quelques commentaires, je me décide enfin à proposer une dépêche.
Je souhaite vous présenter la version 1.0 de Pyruse, que j’ai développé sous licence GPLv3 pour mon propre usage en auto-hébergement, car je sais que se promènent ici d’autres auto-hébergeurs qui pourront être intéressés.

Sommaire

Origine du projet

Il faut remonter à l’année dernière, avec mon ancien serveur qui arrivait au bout de ses possibilités (utilisation constante du swap, lenteurs…). J’ai alors décidé de le remplacer, et d’en profiter pour l’améliorer au niveau du logiciel.

Comme toute occasion est bonne pour apprendre, j’ai décidé de me concentrer sur l’automatisation (avec Ansible) et la sécurisation, le tout avec Archlinux (j’en reparlerai plus tard…).
Quand est venu le moment d’implémenter la supervision, je n’ai pas voulu recommencer à utiliser Epylog, ni Fail2ban:

  • Ces deux outils parcourent les logs chacun de son côté. C’est du gaspillage de ressources.
  • Aucun des deux outils n’est assez souple pour me permettre la finesse de configuration que je souhaitais obtenir.

De là est venue la décision de créer mon propre logiciel, avec Python (un apprentissage de plus…)

Que fait Pyruse ?

Pyruse est en quelque sorte abonné au journal système de systemd et se contente de pousser chaque entrée lue du journal dans un workflow constitué de filtres et d’actions. J’ai pris mon inspiration non seulement dans les deux logiciels sus-mentionnés, mais aussi dans Netfilter : la configuration décrit des chaînes d’exécution liées les unes aux autres.

Le cœur de Pyruse ne fait rien. Tout repose sur les filtres et les actions. De base, les filtres proposés sont :

  • des comparaisons : =, ≤, ≥, ∈ ;
  • les expressions rationnelles compatibles Perl (pcre), avec sauvegarde des valeurs capturées ((…), (?P…)) ;
  • le test d’existence d’un utilisateur.

Au niveau des actions, pour l’instant, j’ai juste implémenté ce dont j’ai besoin, à savoir :

  • l’augmentation et la réinitialisation de compteurs (nombre d’accès SSH en échec par exemple) ;
  • l’envoi instantané d’emails, essentiellement pour des alertes ;
  • un compte-rendu quotidien des événements des dernières 24h ;
  • sans oublier « noop », l’action qui ne fait rien… (si, si, ça sert !)

Ces filtres et actions ont accès à une « entry », une entrée de journal systemd, qui est dans les faits un dictionnaire Python (une Map, pour les adeptes de Java). Chaque filtre ou action peut agir sur cette entrée de journal, et aussi la compléter (extraction d’un nom d’utilisateur, d’une adresse IP, valeur courante d’un compteur…).

Il est trivial d’ajouter un module. À titre d’exemple, voici le code du filtre de test d’égalité :

from pyruse import base

class Filter(base.Filter):
    def __init__(self, args):
        super().__init__()
        self.field = args["field"]
        self.value = args["value"]

    def filter(self, entry):
        return entry[self.field] == self.value if self.field in entry else False

et voici celui de l’action d’envoi d’un email:

import string
from pyruse import base, email

class Action(base.Action):
    def __init__(self, args):
        super().__init__()
        self.subject = args.get("subject", "Pyruse Notification")
        self.template = args["message"]
        values = {}
        for (_void, name, _void, _void) in string.Formatter().parse(self.template):
            if name:
                values[name] = None
        self.values = values

    def act(self, entry):
        for (name, _void) in self.values.items():
            self.values[name] = entry.get(name, None)
        msg = self.template.format_map(self.values)
        email.Mail(msg).setSubject(self.subject).send()

Comme Fail2ban : bannir une IP qui abuse du port SSH

Voici un extrait de configuration qui réalise cela :

"Détecter les accès SSH en échec": [
  { "filter": "filter_equals",
    "args": { "field": "_SYSTEMD_UNIT", "value": "sshd.service" }
  },
  { "filter": "filter_pcreAny",
    "args": { "field": "MESSAGE", "re": [
      "^Failed password for (?P.*) from (?P[^ ]*) port",
      "^Invalid user (?P.*) from (?P[^ ]*) port",
      "^User (?P.*) from (?P[^ ]*) not allowed because not listed in AllowUsers$"
    ] }
  },
  { "action": "action_counterRaise",
    "args": { "counter": "sshd", "for": "adresseIP", "keepSeconds": 300, "save": "nbÉchecs" }
  },
  { "filter": "filter_greaterOrEquals",
    "args": { "field": "nbÉchecs", "value": 4 },
    "else": "… Ça ira pour cette fois, circulez !"
  },
  { "action": "action_nftBan",
    "args": {
      "IP": "adresseIP", "banSeconds": 86400,
      "nftSetIPv4": "Inet4 sshd_ban", "nftSetIPv6": "Inet6 sshd_ban"
    }
  }
],
"… Ça ira pour cette fois, circulez !": [
  { "action": "action_noop" }
]
  1. D’abord, on regarde si l’entrée de log vient bien de SSH, sinon on passe à la chaîne d’exécution suivante (non écrite dans cet exemple).
  2. Ensuite, on essaie de repérer une authentification en échec : si c’est bien ça, on passe à l’étape suivante, sinon on passe à la chaîne d’exécution suivante (non écrite dans l’exemple).
  3. Puisque l’authentification est en échec, on augmente le compteur pour cette IP, chaque détection étant mémorisée 5 minutes ; la valeur du compteur est enregistrée dans nbÉchecs.
  4. On regarde ce compteur : à partir de 4, on passe à l’étape suivante ; en dessous, on « appelle » la sous-chaîne d’exécution « … Ça ira pour cette fois, circulez ! » (qui ne fait rien).
  5. Puisque l’IP à l’origine de la ligne de log a abusé du service, elle sera bannie pour 24h.

Chaque chaîne d’exécution est introduite par une étiquette libre, et se déroule jusqu’à ce qu’un filtre soit non-passant, auquel cas on passe à la chaîne d’exécution suivante (sauf si un « else » est explicitement fourni).
Quand on arrive à la fin de la chaîne d’exécution : si c’est une action, ça s’arrête là ; si c’est un filtre, on continue avec la chaîne d’exécution suivante (les « sous-chaînes » d’exécution, appelées depuis les « then » et « else », sont ignorées).

Comme Epylog : obtenir un rapport quotidien

Je complète un peu mon exemple précédent :

"Détecter les accès SSH en échec": [
  { "filter": "filter_equals", 
  },
  { "filter": "filter_pcreAny",
    "args": { "field": "MESSAGE", "re": [  ] },
    "else": "… Détecter les accès SSH réussis"
  },
  { "action": "action_counterRaise", 
  },
  { "filter": "filter_greaterOrEquals", 
  },
  { "action": "action_nftBan", 
  },
  { "action": "action_dailyReport",
    "args": { "level": "INFO", "message": "Bannissement de {adresseIP} en accès SSH" }
  }
],
"… Ça ira pour cette fois, circulez !": [
  { "action": "action_noop" }
],
"… Détecter les accès SSH réussis": [
  { "filter": "filter_pcre",
    "args": {
      "field": "MESSAGE",
      "re": "^Accepted (?:password|publickey) for (.*) from ([^ ]*) port ", "save": [ "utilisateur", "adresseIP" ]
    }
  },
  { "action": "action_counterReset",
    "args": { "counter": "sshd", "for": "adresseIP", "graceSeconds": 432000 }
  },
  { "action": "action_dailyReport",
    "args": { "level": "INFO", "message": "Login en tant que {utilisateur}@{_HOSTNAME} par SSH" }
  }
],
"Entrée de log inconnue": [
  { "action": "action_dailyReport",
    "args": {
      "level": "OTHER",
      "message": "[{PRIORITY}/{SYSLOG_IDENTIFIER}] {_UID}:{_GID}@{_HOSTNAME}:{_CMDLINE} ({_SYSTEMD_UNIT})\n    {MESSAGE}"
    }
  }
]

Cette fois-ci, si une entrée de log n’émane pas de SSH, elle passe directement à « Entrée de log inconnue », où elle est ajoutée au (futur) rapport quotidien (section « Autre »).
Du côté de SSH :

  • Ça commence pareil, mais si le message ne correspond pas à un échec d’authentification, on tente de détecter une authentification réussie. De plus, après un bannissement, on ajoute une information au (futur) rapport quotidien.
  • Une nouvelle sous-chaîne d’exécution traite les accès SSH réussis : dans ce cas, on met l’IP concernée en liste blanche pendant 5 jours et on ajoute un message au (futur) rapport quotidien.

Passé minuit, je recevrai un email qui pourra ressembler à ça (en texte, mais il y a aussi du HTML) :

= Pyruse Report

== WARNING Messages

|===============================================================================
|Count|Message                                    |Date+time for each occurrence
|===============================================================================

== Information Messages

|===============================================================================
|Count|Message                                    |Date+time for each occurrence

|  1  |Banissement de {115.193.108.70} en accès SSH
      |2018-02-09 04:28:28.247342

|  2  |Login en tant que yves@dmz par SSH
      |2018-02-09 08:45:35.049244 +
       2018-02-09 09:36:14.220262

|===============================================================================

== Other log events

----------
2018-02-09 07:55:25.159374: [7/ldapwhoami] 994:17@dmz:None (None)
    DIGEST-MD5 common mech free
2018-02-09 08:05:00.738969: [3/php] 994:994@backend:/usr/bin/php -f /srv/nextcloud/cron.php (nc-cron.service)
    {PHP} PHP Startup: Unable to load dynamic library '/usr/lib/php/modules/imagick.so'
----------

Et davantage !

Bien sûr, le but n’était pas de copier l’existant. Voici ce qu’apporte Pyruse.

Du contexte

Pour commencer, les logiciels de ce type sont généralement conçus pour appliquer un ensemble de règles à chaque ligne de log, règle après règle, sans tenir compte de ce qui a déjà été vu.

Par contraste, notez dans l’exemple ci-dessus qu’après avoir constaté qu’une entrée de log issue de SSH n’est pas un échec d’authentification, on ne vérifie pas à nouveau que l’entrée de log est bien issue de SSH : cela a déjà été vérifié ! On regarde donc directement si le message correspond à une authentification réussie.

Optimisation de l’exécution grâce au contexte

Le fichier de configuration est prévu pour traiter les entrées de log de l’ensemble des services fonctionnant sur le serveur.
Une fois déterminé qu’une entrée de log n’est pas issue de “sshd.service” (ce qui n’est au passage qu’un test d’égalité, bien plus efficace que d’appliquer une expression rationnelle), on passe directement à la chaîne d’exécution suivante, évitant ainsi l’ensemble des expressions rationnelles relatives à SSH.

Optimisation de l’exécution grâce à systemd

Comme expliqué ci-dessus, un simple test sur la valeur de « _SYSTEMD_UNIT » dans l’entrée de log permet d’éviter d’un coup plusieurs tests non pertinents.
De plus, une simple comparaison numérique avec « PRIORITY » peut suffire à écarter d’énormes quantités de logs :

"Détecter les erreurs HTTP vers Nextcloud": [
  { "filter": "filter_equals",
    "args": { "field": "_SYSTEMD_UNIT", "value": "uwsgi@nextcloud.service" }
  },
  { "filter": "filter_pcre",
    "args": {
      "field": "MESSAGE",
      "re": "^\[[^]]+\] ([^ ]+) .*\] ([A-Z]+ /[^?]*)(?:\?.*)? => .*\(HTTP/1.1 5..\)",
      "save": [ "adresseIP", "requêteHTTP" ] },
    "else": "… Ignorer les erreurs de codage de Nextcloud"
  },
  { "action": "action_dailyReport",
    "args": {
      "level": "INFO",
      "message": "Nextcloud : échec de « {requêteHTTP} » par {adresseIP}"
    }
  }
],
"… Ignorer les erreurs de codage de Nextcloud": [
  {
    "filter": "filter_in",
    "args": { "field": "PRIORITY", "values": [ 2, 3 ] },
    "else": "… Ignorer les erreurs de binding Nextcloud–LDAP"
  },
  { "action": "action_noop" }
],
etc.

Ici, constatant que Nextcloud génère en priorité 2 et 3 des erreurs qui se rapportent au codage, j’ignore toutes les entrées de logs de ces niveaux de priorité, après avoir récupéré les quelques messages de ces niveaux qui m’intéressent.

Souplesse de configuration

Il est possible de combiner les possibilités pour effectuer des actions proportionnées ; par exemple :

  • si l’événement constaté nécessite un intervention au plus vite, envoi d’un email immédiat ;
  • si tel n’est pas le cas mais qu’une étude de la situation est nécessaire, ajout d’un avertissement dans le rapport quotidien ;
  • si l’événement n’est pas si important mais mérite quand même d’être mentionné, ajout d’une information dans le rapport quotidien.

Il est aussi possible d’élaborer des workflows complexes :

"Détecter les accès SSH en échec": [
  { "filter": "filter_equals", 
  },
  { "filter": "filter_pcreAny",
    "args": { "field": "MESSAGE", "re": [  ] },
    "else": "… Détecter les accès SSH réussis"
  },
  { "filter": "filter_userExists",
    "args": { "field": "utilisateur" },
    "else": "… Utilisateur SSH inexistant"
  },
  { "action": "action_email",
    "args": { "message": "ATTENTION: Tentative d’intrusion en tant que {utilisateur} !" }
  },
  { "action": "action_dailyReport",
    "args": { "level": "WARN", "message": "Authentification SSH en échec pour {utilisateur}" },
    "then": "… Détecter les échecs SSH répétés"
  }
],
"… Utilisateur SSH inexistant": [
  { "action": "action_dailyReport",
    "args": { "level": "INFO", "message": "Authentification SSH en échec pour {utilisateur}" },
    "then": "… Détecter les échecs SSH répétés"
  }
],
"… Détecter les accès SSH réussis": [
  { "filter": "filter_pcre", 
  },
  { "action": "action_counterReset", 
  },
  { "action": "action_dailyReport", 
  }
],
"… Détecter les échecs SSH répétés": [
  { "action": "action_counterRaise", 
  },
  { "filter": "filter_greaterOrEquals", 
  },
  { "action": "action_nftBan", 
  },
  { "action": "action_dailyReport", 
  }
],
"… Ça ira pour cette fois, circulez !": [
  { "action": "action_noop" }
],
etc.

Ici, la section complétée dans le rapport quotidien (WARN ou INFO) dépend de l’existence ou non de l’utilisateur sur le système. De plus, si l’utilisateur existe, un email d’alerte est immédiatement envoyé.

Le futur

Pour ma part, les fonctionnalités nécessaires sont présentes. Toutefois, si d’autres personnes se montrent intéressées par ce logiciel et qu’il leur manque des modules, il est facile d’en ajouter ; j’accepterai avec plaisir les contributions ;-)

Il y a tout de même une chose que je voudrais faire évoluer à court terme : le rapport quotidien. Je prévois d’utiliser un système de « template ». En effet, je conçois que tout le monde n’adhère pas forcément à mon souhait d’afficher pour chaque message la date et l’heure de chaque occurrence…

Ah, une dernière chose : c’est aussi la première fois que je communique publiquement autour d’un projet hébergé par Gitea sur mon serveur, et je n’ai pour l’instant qu’une vision interne. Si quelqu’un rencontre un quelconque problème d’interaction avec le serveur, il ne faut pas hésiter à me le signaler ! Merci :-)

Y.

Lire les commentaires

par Yves, ZeroHeure, palm123

DLFP - Dépêches

LinuxFr.org

Sortie de GCC 8.1

 -  19 mai - 

La sortie de la nouvelle version majeure du compilateur GCC du projet GNU a été annoncée le 2 mai 2018. Écrit à l’origine par Richard Stallman, le (...)


Wiseflat, un serveur conteneurisé pour vos projets personnels

 -  17 mai - 

Wiseflat est un projet permettant d’héberger ses applications Web dans des conteneurs LXC sur un (ou plusieurs) Raspberry Pi. C’est une alternative (...)


Firefox 60 et 60 ESR

 -  15 mai - 

Firefox 60 est sorti le 9 mai 2018. La version bureau comporte également, cette fois‐ci, la déclinaison ESR (laquelle succédera à la version 52 ESR (...)


ToutEnClic 5.02 pour gommer les différences

 -  7 mai - 

ToutEnClic est un logiciel pour travailler à l’écran sur un document numérisé, de la même façon que sur un cahier, pour palier le handicap moteur des (...)


Sortie de « La bataille pour Wesnoth » 1.14

 -  7 mai - 

The Battle for Wesnoth est un jeu de stratégie au tour par tour, une pièce incontournable dans le monde du jeu libre. Après un peu plus de trois ans (...)