От стандартен мониторинг на auth.log и fail2ban до облачен AI анализ, който разпознава модели, открива аномалии и генерира реални препоръки. Така превърнах fail…
⚠ Този материал описва реален сценарий под Debian, при който стандартен мониторинг на auth.log, mail.log и fail2ban
се надгражда с облачен AI анализ. Конфигурациите са примерни. Не копирайте 1 към 1 в продукция, без да ги адаптирате към вашата среда.
Идеята е проста. Debian сървърът продължава да си върши работата както досега, но към класическия monitoring добавяме облачен AI анализ, който вижда модели, аномалии и тенденции, които с просто око често се изпускат.
От там нататък логовете не са просто текстов файл, а източник на Security Intelligence, който помага за решения в реално време, а не само за forensic анализ след инцидент.
Класическият сценарий. Имате Debian сървър, auth.log, mail.log, fail2ban и може би още няколко услуги.
Всичко е наред, докато натоварването и опитите за атаки не станат толкова много, че или пропускате нещо, или гледате логове по цял ден.
Исках нещо по различно:
Архитектурата е разделена на няколко нива, за да не се превърне в чудовище, което никой не поддържа.
/var/log/auth.log, /var/log/mail.log, /var/log/syslog.auth.log и fail2banDebian → Python скрипт → MySQL / логове → AI API → оценка на риск → email / web панел / действия
Скриптът под Debian прави няколко неща едновременно. Чете логовете, вади ключовите полета, говори с fail2ban, пише в база и праща имейли.
По долу е част от опростен вариант, без всички проверки, оптимизации и защити, които ползвам в продукция.
# -*- coding: utf-8 -*-
import os
import re
import time
import smtplib
import subprocess
import mysql.connector
from datetime import datetime
from email.mime.text import MIMEText
AUTH_LOG = "/var/log/auth.log"
PROCESSED_FILE = "/var/log/processed_events.log"
HOSTNAME = os.uname()[1]
DB_CFG = {
"host": "localhost",
"user": "admin_user",
"password": "dontpass",
"database": "admin_panel_db",
}
def read_last_relevant_line():
with open(AUTH_LOG, "r") as f:
lines = f.readlines()
for line in reversed(lines):
if re.search(r"(Accepted password|Failed password|Invalid user|authentication failure;)", line):
return line.strip()
return None
def parse_line(line):
ip_match = re.search(r"from ([0-9.]+)", line)
user_match = re.search(r"for (invalid user )?([a-zA-Z0-9_-]+)", line)
ip = ip_match.group(1) if ip_match else "Unknown IP"
username = user_match.group(2) if user_match else "Unknown user"
failed = bool(re.search(r"(Failed password|Invalid user|authentication failure;)", line))
return ip, username, failed
def log_to_db(ip, username, country, status, reason):
conn = mysql.connector.connect(**DB_CFG)
cur = conn.cursor()
cur.execute(
"""
INSERT INTO failed_login_attempts (ip_address, username, country, attempt_time, reason)
VALUES (%s, %s, %s, %s, %s)
""",
(ip, username, country, datetime.now(), reason),
)
conn.commit()
cur.close()
conn.close()
# ... тук има още проверки, защити и обработка на различни типове логове
В продукция има още слоеве за защита, rate limiting, криптиране на чувствителни данни и отделно управление на конфигурацията. В статията ги пропускам нарочно.
Важното тук е, че не пращам целия лог към облака, а вече обобщена информация за събитията. Това държи трафика малък и пази чувствителни детайли на място.
{
"server": "deb-mail-01",
"time_window": "2026-02-24T10:00:00Z/2026-02-24T10:15:00Z",
"events": [
{
"ip": "203.0.113.45",
"country": "CN",
"username": "root",
"service": "sshd",
"attempts": 37,
"status": "failed",
"reason": "bruteforce"
},
{
"ip": "198.51.100.77",
"country": "BG",
"username": "office",
"service": "postfix",
"attempts": 3,
"status": "failed",
"reason": "wrong_password"
}
]
}
AI моделът връща нещо подобно на това, в зависимост от платформата, която използвате:
{
"summary": "Typical SSH bruteforce from multiple networks plus a few local user mistakes.",
"risk_score": 7,
"findings": [
{
"type": "bruteforce",
"ip": "203.0.113.45",
"severity": "high",
"recommendation": "Keep blocked for at least 30 days. Consider adding to a global deny list."
},
{
"type": "user_error",
"username": "office",
"severity": "low",
"recommendation": "Ask the user to verify password manager settings."
}
]
}
Точният формат зависи от доставчика на AI услугата. Идеята е да имате някакъв вид risk score и препоръка, а не просто още един текст.
Самият скрипт не се опитва да бъде SIEM. Той просто подготвя качествени данни за AI и ги визуализира човешки в имейли и web панел. Ето опростен пример как може да се вземе AI отговор и да се реши какво да се изпрати като аларма.
def should_alert(ai_result):
if ai_result["risk_score"] >= 8:
return "critical"
if ai_result["risk_score"] >= 5:
return "warning"
return None
def color_for_country(country):
if country == "BG":
return "green"
if country in ("RU", "CN", "BR"):
return "red"
return "orange"
def build_html_email(events, ai_result):
level = should_alert(ai_result)
if not level:
return None
rows = []
for ev in events:
color = color_for_country(ev["country"])
rows.append(
f"<tr><td>{ev['ip']}</td><td style='color:{color}'>{ev['country']}</td>"
f"<td>{ev['username']}</td><td>{ev['service']}</td><td>{ev['attempts']}</td></tr>"
)
html = f"""
<h2>Security alert on {HOSTNAME}</h2>
<p>AI оценка на риска: <b>{ai_result['risk_score']}/10</b></p>
<p>Обобщение: {ai_result['summary']}</p>
<table border='1' cellspacing='0' cellpadding='4'>
<tr><th>IP</th><th>Country</th><th>User</th><th>Service</th><th>Attempts</th></tr>
{''.join(rows)}
</table>
"""
return html
Така в имейлите и в web панела IP адреси от България излизат в зелено, а типичните „интересни“ региони могат да светят в червено. AI подсказва кое е приоритетно, а не разчита само на проста проверка „брой опити над N“.
Автоматизацията има смисъл, когато накрая някой взима решение. При мен каналите са три.
AI не блокира сам услуги, не пипа firewall и не започва да забранява всичко наред. Той подава контекст и препоръка, а хората решават кога да се действа по агресивно и кога не.
В статията пропускам няколко важни части, не защото ги няма, а защото не е добра идея да се публикуват в пълен вид.
Идеята е да дадем работещ модел, който може да надградите, а не готов dump за копиране.
Пишете на office@ntg.bg или заявете безплатна консултация.