作为服务器管理员,你是不是也经常碰到这种糟心事:半夜收到内存告警,登录上去想看看是哪个小天才在搞鬼,结果用 top 或 htop 一看,茫茫多的进程列表,根本不知道从何查起。特别是当某个用户开了几十上百个小进程,想手动把它们的内存加起来,简直是噩梦。
为了从这种重复的抓狂中解脱出来,我决定写一个"加强版"的Python监控脚本。我希望它能帮我实现下面这几个小目标:
- ✅ 按「用户 + 命令」来聚合内存,一眼就能看出是哪个用户跑的哪个程序把内存吃光了。
- ✅ 别太敏感,只有当总内存使用率连续几分钟都超过80%才告警,避免被瞬间的内存抖动疯狂打扰。
- ✅ 报告要详细,用 Bark 推送到我手机上的报告,要写清楚是哪几个"用户+命令"组合在作怪,再顺便告诉我现在的 Swap 和系统负载情况。
经过一番折腾,下面就是我的最终成果,分享给你,希望能帮你摆脱"内存刺客"。
第一步:把主角"监控脚本"放对位置
首先,我们需要把核心的Python脚本文件放到服务器上。我习惯放在 /usr/local/bin 目录下,方便管理。你可以用 nano 或者你更顺手的 vim 来创建它:
sudo nano /usr/local/bin/mem_watch_bark.py然后,把下面这段代码完整地粘贴进去。唯一需要你动手改的,就是把 BARK_KEY 换成你自己的 Bark 设备 Key,不然脚本喊破喉咙你也收不到通知。
#!/usr/bin/env python3
import os
import pwd
import json
import socket
import time
from urllib.parse import urlencode
from urllib.request import Request, urlopen
# ========= 配置区 =========
BARK_KEY = "在这里填你的 Bark 设备 key" # 必填!
BARK_SERVER = "https://api.day.app" # 自建 bark-server 就改这里
THRESHOLD = 80 # 总内存使用率阈值(百分比)
STATE_FILE = "/var/tmp/mem_watch_bark_state.json"
TOP_USERS = 5 # 推送中展示的用户 Top N
TOP_GROUPS = 5 # 用户+命令组合 Top N
TOP_PROCS_PER_GROUP = 2 # 每个组合展示几个最大进程
MIN_ALERT_INTERVAL_SEC = 600 # 两次告警之间的最小间隔(秒)
def read_meminfo():
"""从 /proc/meminfo 读取总内存、可用内存和 swap 信息,单位 kB。"""
meminfo = {}
with open("/proc/meminfo") as f:
for line in f:
parts = line.split()
key = parts[0].rstrip(":")
try:
value = int(parts[1])
except (IndexError, ValueError):
continue
meminfo[key] = value
total_kb = meminfo["MemTotal"]
avail_kb = meminfo.get(
"MemAvailable",
meminfo.get("MemFree", 0)
+ meminfo.get("Buffers", 0)
+ meminfo.get("Cached", 0),
)
used_effective_kb = total_kb - avail_kb
percent = int(used_effective_kb * 100 / total_kb)
swap_total_kb = meminfo.get("SwapTotal", 0)
swap_free_kb = meminfo.get("SwapFree", 0)
swap_used_kb = max(swap_total_kb - swap_free_kb, 0)
swap_percent = int(swap_used_kb * 100 / swap_total_kb) if swap_total_kb > 0 else 0
return (
total_kb, used_effective_kb, avail_kb, percent,
swap_total_kb, swap_used_kb, swap_percent,
)
def collect_stats():
"""遍历 /proc,按 用户 和 用户+命令 聚合 RSS。"""
users = {}
groups = {}
proc_root = "/proc"
for entry in os.listdir(proc_root):
if not entry.isdigit():
continue
pid = entry
status_path = os.path.join(proc_root, pid, "status")
try:
with open(status_path) as f:
uid = None
rss_kb = 0
name = None
for line in f:
if line.startswith("Name:"):
name = line.split(":", 1)[1].strip()
elif line.startswith("Uid:"):
parts = line.split()
if len(parts) >= 2:
uid = int(parts[1])
elif line.startswith("VmRSS:"):
parts = line.split()
if len(parts) >= 2:
rss_kb = int(parts[1])
if uid is None or rss_kb <= 0:
continue
except (FileNotFoundError, ProcessLookupError, PermissionError):
continue
try:
username = pwd.getpwuid(uid).pw_name
except KeyError:
username = f"uid{uid}"
cmd = name or ""
cmdline_path = os.path.join(proc_root, pid, "cmdline")
try:
with open(cmdline_path, "rb") as f:
raw = f.read().replace(b"\x00", b" ").strip()
if raw:
cmd = raw.decode("utf-8", "ignore")
except (FileNotFoundError, ProcessLookupError, PermissionError):
pass
base_cmd = cmd.split()[0] if cmd.split() else (name or "unknown")
info_u = users.setdefault(uid, {"username": username, "rss_kb": 0, "procs": []})
info_u["rss_kb"] += rss_kb
info_u["procs"].append((rss_kb, int(pid), cmd[:160]))
key = (uid, base_cmd)
info_g = groups.setdefault(key, {"username": username, "base_cmd": base_cmd, "rss_kb": 0, "procs": []})
info_g["rss_kb"] += rss_kb
info_g["procs"].append((rss_kb, int(pid), cmd[:160]))
return users, groups
def read_loadavg():
try:
with open("/proc/loadavg") as f:
parts = f.read().split()
if len(parts) >= 3:
return float(parts[0]), float(parts[1]), float(parts[2])
except Exception:
pass
return None, None, None
def load_state():
try:
with open(STATE_FILE) as f:
data = json.load(f)
return int(data.get("prev_percent", 0)), int(data.get("consec_high", 0)), float(data.get("last_alert_ts", 0.0))
except Exception:
return 0, 0, 0.0
def save_state(percent, consec_high, last_alert_ts):
try:
os.makedirs(os.path.dirname(STATE_FILE), exist_ok=True)
with open(STATE_FILE, "w") as f:
json.dump({"prev_percent": int(percent), "consec_high": int(consec_high), "last_alert_ts": float(last_alert_ts)}, f)
except Exception:
pass
def send_bark(title, body):
if not BARK_KEY:
return
url = f"{BARK_SERVER.rstrip('/')}/{BARK_KEY}/"
data = urlencode({"title": title, "body": body}).encode("utf-8")
req = Request(url, data=data)
try:
with urlopen(req, timeout=5) as resp:
resp.read()
except Exception:
pass
def main():
now = time.time()
(total_kb, used_kb, avail_kb, percent, swap_total_kb, swap_used_kb, swap_percent) = read_meminfo()
prev_percent, consec_high, last_alert_ts = load_state()
if percent >= THRESHOLD:
consec_high += 1
else:
consec_high = 0
should_alert = False
if percent >= THRESHOLD and consec_high >= 2:
if now - last_alert_ts >= MIN_ALERT_INTERVAL_SEC:
should_alert = True
last_alert_ts = now
save_state(percent, consec_high, last_alert_ts)
if not should_alert:
return
users, groups = collect_stats()
hostname = socket.gethostname()
total_gib = total_kb / 1024 / 1024
used_gib = used_kb / 1024 / 1024
avail_gib = avail_kb / 1024 / 1024
swap_total_gib = swap_total_kb / 1024 / 1024
swap_used_gib = swap_used_kb / 1024 / 1024
la1, la5, la15 = read_loadavg()
lines = [f"主机: {hostname}", f"有效内存使用: {used_gib:.1f} GiB / {total_gib:.1f} GiB ({percent}%)", f"剩余可用: {avail_gib:.1f} GiB"]
lines.append(f"Swap 使用: {swap_used_gib:.1f} GiB / {swap_total_gib:.1f} GiB ({swap_percent}%)" if swap_total_kb > 0 else "Swap: 未启用或为 0")
if la1 is not None:
lines.append(f"LoadAvg: {la1:.2f}, {la5:.2f}, {la15:.2f}")
lines.append("")
user_list = sorted(users.items(), key=lambda kv: kv[1]["rss_kb"], reverse=True)
lines.append("按用户内存占用 Top:")
for i, (uid, info) in enumerate(user_list[:TOP_USERS], start=1):
rss_gib = info["rss_kb"] / 1024 / 1024
user_percent = info["rss_kb"] * 100.0 / total_kb
lines.append(f"{i}. {info['username']} (UID {uid}): {rss_gib:.1f} GiB (~{user_percent:.1f}% 总内存)")
lines.append("")
group_list = sorted(groups.items(), key=lambda kv: kv[1]["rss_kb"], reverse=True)
lines.append("按 用户+命令 组合 Top:")
for i, ((uid, base_cmd), info) in enumerate(group_list[:TOP_GROUPS], start=1):
rss_gib = info["rss_kb"] / 1024 / 1024
user_percent = info["rss_kb"] * 100.0 / total_kb
lines.append(f"{i}. {info['username']} (UID {uid}), {base_cmd}: {rss_gib:.1f} GiB (~{user_percent:.1f}% 总内存)")
procs = sorted(info["procs"], key=lambda x: x[0], reverse=True)
for rss_kb, pid, cmd in procs[:TOP_PROCS_PER_GROUP]:
proc_gib = rss_kb / 1024 / 1024
lines.append(f" - PID {pid}: {proc_gib:.2f} GiB | {cmd}")
body = "\n".join(lines)
title = f"内存告警: {hostname} {percent}% (连续 {consec_high} 次 >= {THRESHOLD}%)"
send_bark(title, body)
if __name__ == "__main__":
main()你也可以通过 wget 或 curl 直接下载到这个脚本:
wget https://raw.githubusercontent.com/xuzhougeng/xuzhougeng/refs/heads/main/blog/2025/11/27/mem_watch_bark.py
# 或者
curl -o /usr/local/bin/mem_watch_bark.py https://raw.githubusercontent.com/xuzhougeng/xuzhougeng/refs/heads/main/blog/2025/11/27/mem_watch_bark.py保存退出后,别忘了给它加上执行权限,不然它跑不起来:
sudo chmod +x /usr/local/bin/mem_watch_bark.py第二步:让脚本"活"起来
脚本自己不会跑,我们得找个监工。Linux 上最好的监工莫过于 cron 了。我们用它来设置一个定时任务,每分钟"叫醒"一次我们的脚本,实现持续监控。
编辑 root 用户的 crontab:
sudo crontab -e然后在文件的最后加上这一行,告诉 cron 每分钟都去运行一次我们的脚本:
* * * * * /usr/bin/python3 /usr/local/bin/mem_watch_bark.py一个小提示:如果你的
python3不在/usr/bin/python3,可以先用which python3命令找到它的藏身之处,然后换成正确的路径。
第三步:演练一次,确保万无一失
在正式投入使用前,我强烈建议你先跟它"彩排"一次,看看它会不会按你的预期工作。
- 先把脚本里的告警阈值调得特别低,比如
THRESHOLD = 1,这样随便一跑就会超标。 - 然后,在命令行手动把它运行两次:
sudo /usr/bin/python3 /usr/local/bin/mem_watch_bark.py sudo /usr/bin/python3 /usr/local/bin/mem_watch_bark.py因为我们的逻辑是"连续2次"超标才告警,所以当你运行第二次后,你的手机就应该会"叮"一下收到 Bark 的推送了。
- 收到通知,确认一切正常后,千万记得把阈值改回
80或者一个你觉得安全的位置!
大功告成,回顾一下
现在,我们这个小哨兵就正式上岗了。回顾一下,它是不是刚好解决了我们开头提到的那些痛点?
- 精准定位:不再是面对一大堆进程干瞪眼,它直接告诉你"用户A的Python脚本"或者"用户B的R语言程序"是罪魁祸首。
- 告警不"狼来了":只有当内存连续几分钟持续爆炸时,它才会来找你,避免了因为瞬间波动带来的无效骚扰。
- 信息全面:推送来的小报告图文并茂(开玩笑的,是文字详实),内存、Swap、负载……关键信息一应俱全,让你对服务器状态心中有数。
- 不刷屏,很清净:就算服务器内存一直很高,它也很懂事地每10分钟最多提醒你一次,给你留出足够的时间去从容处理。
希望这个小工具能让你在管理服务器时,也能多一分从容和优雅。
RETURN_TO_BLOG_INDEX