[ARTICLE] 加强版Python监控脚本,聚合用户内存使用并支持Bark推送

// 2025-11-27

作为服务器管理员,你是不是也经常碰到这种糟心事:半夜收到内存告警,登录上去想看看是哪个小天才在搞鬼,结果用 top 或 htop 一看,茫茫多的进程列表,根本不知道从何查起。特别是当某个用户开了几十上百个小进程,想手动把它们的内存加起来,简直是噩梦。

为了从这种重复的抓狂中解脱出来,我决定写一个"加强版"的Python监控脚本。我希望它能帮我实现下面这几个小目标:当总内存使用率连续超过阈值时,自动按"用户"和"用户+命令"两个维度聚合内存占用,找出真正的"内存大户";把结果通过 Bark 推送到手机;并且用"连续两次超标才告警 + 最小告警间隔"的机制,避免半夜被消息轰炸。

经过一番折腾,下面就是我的最终成果,分享给你,希望能帮你摆脱"内存刺客"。

第一步:把主角"监控脚本"放对位置

首先,我们需要把核心的Python脚本文件放到服务器上。我习惯放在 /usr/local/bin 目录下,方便管理。你可以用 nano 或者你更顺手的 vim 来创建它:

sudo nano /usr/local/bin/mem_watch_bark.py

然后,把下面这段代码完整地粘贴进去。唯一需要你动手改的,就是把 BARK_KEY 换成你自己的 Bark 设备 Key,不然脚本喊破喉咙你也收不到通知。

#!/usr/bin/env python3
import os
import pwd
import json
import socket
import time
from urllib.parse import urlencode
from urllib.request import Request, urlopen

# ========= Configuration =========
BARK_KEY = "在这里填你的 Bark 设备 key"  # Required: your Bark device key
BARK_SERVER = "https://api.day.app"      # Change this if you run a self-hosted bark-server
THRESHOLD = 80                           # Total memory usage alert threshold (percent)
STATE_FILE = "/var/tmp/mem_watch_bark_state.json"  # Persists state between cron runs

TOP_USERS = 5                            # Top N users shown in the push message
TOP_GROUPS = 5                           # Top N (user, command) groups shown
TOP_PROCS_PER_GROUP = 2                  # Largest processes listed per group
MIN_ALERT_INTERVAL_SEC = 600             # Minimum interval between two alerts (seconds)

def read_meminfo():
    """Parse /proc/meminfo and report RAM and swap usage figures.

    Returns a 7-tuple, all sizes in kB:
    (total, used_effective, available, used_percent,
     swap_total, swap_used, swap_percent)
    """
    fields = {}
    with open("/proc/meminfo") as fh:
        for raw in fh:
            tokens = raw.split()
            key = tokens[0].rstrip(":")
            try:
                fields[key] = int(tokens[1])
            except (IndexError, ValueError):
                # Skip lines without a numeric value.
                continue

    total_kb = fields["MemTotal"]
    # Prefer the kernel's MemAvailable estimate; fall back to the classic
    # free + buffers + cached approximation on old kernels.
    fallback_kb = (
        fields.get("MemFree", 0)
        + fields.get("Buffers", 0)
        + fields.get("Cached", 0)
    )
    avail_kb = fields.get("MemAvailable", fallback_kb)
    used_effective_kb = total_kb - avail_kb
    percent = int(used_effective_kb * 100 / total_kb)

    swap_total_kb = fields.get("SwapTotal", 0)
    swap_used_kb = max(swap_total_kb - fields.get("SwapFree", 0), 0)
    swap_percent = int(swap_used_kb * 100 / swap_total_kb) if swap_total_kb > 0 else 0

    return (
        total_kb, used_effective_kb, avail_kb, percent,
        swap_total_kb, swap_used_kb, swap_percent,
    )

def collect_stats():
    """Walk /proc and aggregate resident memory (RSS).

    Returns (users, groups):
      users  -- {uid: {"username", "rss_kb", "procs"}}
      groups -- {(uid, base_cmd): {"username", "base_cmd", "rss_kb", "procs"}}
    where "procs" is a list of (rss_kb, pid, truncated_cmdline) tuples.
    """
    per_user = {}
    per_group = {}

    for entry_name in os.listdir("/proc"):
        # Only all-digit entries are process directories.
        if not entry_name.isdigit():
            continue
        pid_str = entry_name

        uid = None
        rss_kb = 0
        comm = None
        try:
            with open(os.path.join("/proc", pid_str, "status")) as fh:
                for row in fh:
                    if row.startswith("Name:"):
                        comm = row.split(":", 1)[1].strip()
                    elif row.startswith("Uid:"):
                        cols = row.split()
                        if len(cols) >= 2:
                            uid = int(cols[1])  # real UID
                    elif row.startswith("VmRSS:"):
                        cols = row.split()
                        if len(cols) >= 2:
                            rss_kb = int(cols[1])
                # Kernel threads report no VmRSS; nothing to aggregate.
                if uid is None or rss_kb <= 0:
                    continue
        except (FileNotFoundError, ProcessLookupError, PermissionError):
            # Process exited between listdir() and open(), or is unreadable.
            continue

        try:
            username = pwd.getpwuid(uid).pw_name
        except KeyError:
            username = f"uid{uid}"

        # Prefer the full command line; fall back to the short comm name.
        cmd = comm or ""
        try:
            with open(os.path.join("/proc", pid_str, "cmdline"), "rb") as fh:
                raw_cmd = fh.read().replace(b"\x00", b" ").strip()
                if raw_cmd:
                    cmd = raw_cmd.decode("utf-8", "ignore")
        except (FileNotFoundError, ProcessLookupError, PermissionError):
            pass

        words = cmd.split()
        base_cmd = words[0] if words else (comm or "unknown")

        proc_entry = (rss_kb, int(pid_str), cmd[:160])

        bucket = per_user.setdefault(
            uid, {"username": username, "rss_kb": 0, "procs": []}
        )
        bucket["rss_kb"] += rss_kb
        bucket["procs"].append(proc_entry)

        bucket = per_group.setdefault(
            (uid, base_cmd),
            {"username": username, "base_cmd": base_cmd, "rss_kb": 0, "procs": []},
        )
        bucket["rss_kb"] += rss_kb
        bucket["procs"].append(proc_entry)

    return per_user, per_group

def read_loadavg():
    """Return the 1/5/15-minute load averages, or (None, None, None) on any error."""
    result = (None, None, None)
    try:
        with open("/proc/loadavg") as fh:
            fields = fh.read().split()
        if len(fields) >= 3:
            result = (float(fields[0]), float(fields[1]), float(fields[2]))
    except Exception:
        # Best effort: load average is purely informational.
        pass
    return result

def load_state():
    """Load (prev_percent, consec_high, last_alert_ts) from STATE_FILE.

    Falls back to (0, 0, 0.0) when the file is missing, unreadable,
    or contains malformed data.
    """
    try:
        with open(STATE_FILE) as fh:
            stored = json.load(fh)
        return (
            int(stored.get("prev_percent", 0)),
            int(stored.get("consec_high", 0)),
            float(stored.get("last_alert_ts", 0.0)),
        )
    except Exception:
        return 0, 0, 0.0

def save_state(percent, consec_high, last_alert_ts):
    """Persist monitoring state to STATE_FILE; failures are silently ignored."""
    try:
        payload = {
            "prev_percent": int(percent),
            "consec_high": int(consec_high),
            "last_alert_ts": float(last_alert_ts),
        }
        os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
        with open(STATE_FILE, "w") as fh:
            json.dump(payload, fh)
    except Exception:
        # Best effort: losing one state update only delays an alert.
        pass

def send_bark(title, body):
    """POST a push notification to the Bark server; network errors are swallowed."""
    if not BARK_KEY:
        # Not configured: silently do nothing.
        return
    endpoint = f"{BARK_SERVER.rstrip('/')}/{BARK_KEY}/"
    payload = urlencode({"title": title, "body": body}).encode("utf-8")
    request = Request(endpoint, data=payload)
    try:
        with urlopen(request, timeout=5) as response:
            response.read()
    except Exception:
        # Best effort: a failed push must not crash the monitor.
        pass

def main():
    """One monitoring pass: read memory usage, update state, maybe push an alert.

    Intended to be run once per minute from cron.  An alert is sent only
    when usage has been at/above THRESHOLD on two consecutive runs AND at
    least MIN_ALERT_INTERVAL_SEC has passed since the previous alert.
    """
    now = time.time()
    (total_kb, used_kb, avail_kb, percent, swap_total_kb, swap_used_kb, swap_percent) = read_meminfo()
    # NOTE(review): prev_percent is round-tripped through the state file but
    # never used in the alert decision below.
    prev_percent, consec_high, last_alert_ts = load_state()

    # Count consecutive runs at or above the threshold (debounce counter).
    if percent >= THRESHOLD:
        consec_high += 1
    else:
        consec_high = 0

    # Alert only after >= 2 consecutive high readings, and rate-limit by
    # MIN_ALERT_INTERVAL_SEC so cron's per-minute cadence can't spam.
    should_alert = False
    if percent >= THRESHOLD and consec_high >= 2:
        if now - last_alert_ts >= MIN_ALERT_INTERVAL_SEC:
            should_alert = True
            last_alert_ts = now

    # Save state even when not alerting so the streak counter persists.
    save_state(percent, consec_high, last_alert_ts)

    if not should_alert:
        return

    # Only do the (relatively expensive) full /proc walk when alerting.
    users, groups = collect_stats()
    hostname = socket.gethostname()

    # kB -> GiB for human-readable output.
    total_gib = total_kb / 1024 / 1024
    used_gib = used_kb / 1024 / 1024
    avail_gib = avail_kb / 1024 / 1024
    swap_total_gib = swap_total_kb / 1024 / 1024
    swap_used_gib = swap_used_kb / 1024 / 1024

    la1, la5, la15 = read_loadavg()

    # Build the push body line by line: host summary first.
    lines = [f"主机: {hostname}", f"有效内存使用: {used_gib:.1f} GiB / {total_gib:.1f} GiB ({percent}%)", f"剩余可用: {avail_gib:.1f} GiB"]
    lines.append(f"Swap 使用: {swap_used_gib:.1f} GiB / {swap_total_gib:.1f} GiB ({swap_percent}%)" if swap_total_kb > 0 else "Swap: 未启用或为 0")
    if la1 is not None:
        lines.append(f"LoadAvg: {la1:.2f}, {la5:.2f}, {la15:.2f}")
    lines.append("")

    # Top N users by aggregated RSS.
    user_list = sorted(users.items(), key=lambda kv: kv[1]["rss_kb"], reverse=True)
    lines.append("按用户内存占用 Top:")
    for i, (uid, info) in enumerate(user_list[:TOP_USERS], start=1):
        rss_gib = info["rss_kb"] / 1024 / 1024
        user_percent = info["rss_kb"] * 100.0 / total_kb
        lines.append(f"{i}. {info['username']} (UID {uid}): {rss_gib:.1f} GiB (~{user_percent:.1f}% 总内存)")

    lines.append("")
    # Top N (user, command) groups, each with its largest processes.
    group_list = sorted(groups.items(), key=lambda kv: kv[1]["rss_kb"], reverse=True)
    lines.append("按 用户+命令 组合 Top:")
    for i, ((uid, base_cmd), info) in enumerate(group_list[:TOP_GROUPS], start=1):
        rss_gib = info["rss_kb"] / 1024 / 1024
        user_percent = info["rss_kb"] * 100.0 / total_kb
        lines.append(f"{i}. {info['username']} (UID {uid}), {base_cmd}: {rss_gib:.1f} GiB (~{user_percent:.1f}% 总内存)")
        procs = sorted(info["procs"], key=lambda x: x[0], reverse=True)
        for rss_kb, pid, cmd in procs[:TOP_PROCS_PER_GROUP]:
            proc_gib = rss_kb / 1024 / 1024
            lines.append(f"   - PID {pid}: {proc_gib:.2f} GiB | {cmd}")

    body = "\n".join(lines)
    title = f"内存告警: {hostname} {percent}% (连续 {consec_high} 次 >= {THRESHOLD}%)"
    send_bark(title, body)

# Run one pass per invocation; cron provides the periodic scheduling.
if __name__ == "__main__":
    main()

你也可以通过 wget 或 curl 直接下载到这个脚本:

wget -O /usr/local/bin/mem_watch_bark.py https://raw.githubusercontent.com/xuzhougeng/xuzhougeng/refs/heads/main/blog/2025/11/27/mem_watch_bark.py
# 或者
curl -o /usr/local/bin/mem_watch_bark.py https://raw.githubusercontent.com/xuzhougeng/xuzhougeng/refs/heads/main/blog/2025/11/27/mem_watch_bark.py

保存退出后,别忘了给它加上执行权限,不然它跑不起来:

sudo chmod +x /usr/local/bin/mem_watch_bark.py

第二步:让脚本"活"起来

脚本自己不会跑,我们得找个监工。Linux 上最好的监工莫过于 cron 了。我们用它来设置一个定时任务,每分钟"叫醒"一次我们的脚本,实现持续监控。

编辑 root 用户的 crontab:

sudo crontab -e

然后在文件的最后加上这一行,告诉 cron 每分钟都去运行一次我们的脚本:

* * * * * /usr/bin/python3 /usr/local/bin/mem_watch_bark.py

一个小提示:如果你的 python3 不在 /usr/bin/python3,可以先用 which python3 命令找到它的藏身之处,然后换成正确的路径。

第三步:演练一次,确保万无一失

在正式投入使用前,我强烈建议你先跟它"彩排"一次,看看它会不会按你的预期工作。

  1. 先把脚本里的告警阈值调得特别低,比如 THRESHOLD = 1,这样随便一跑就会超标。
  2. 然后,在命令行手动把它运行两次:
    sudo /usr/bin/python3 /usr/local/bin/mem_watch_bark.py
    sudo /usr/bin/python3 /usr/local/bin/mem_watch_bark.py

    因为我们的逻辑是"连续2次"超标才告警,所以当你运行第二次后,你的手机就应该会"叮"一下收到 Bark 的推送了。

  3. 收到通知,确认一切正常后,千万记得把阈值改回 80 或者一个你觉得安全的位置!

大功告成,回顾一下

现在,我们这个小哨兵就正式上岗了。回顾一下,它是不是刚好解决了我们开头提到的那些痛点?

希望这个小工具能让你在管理服务器时,也能多一分从容和优雅。

RETURN_TO_BLOG_INDEX