fork download
#
# Proxmox Balance script.
#
# Author: Skylar Kelty
#
  6.  
  7. import argparse
  8. import datetime
  9. import locket
  10. import operator
  11. import os
  12. import sys
  13. import time
  14. import yaml
  15. from proxmoxer import ProxmoxAPI
  16.  
  17.  
  18. class ProxmoxBalancer:
  19. vm_list = []
  20. config = {}
  21. node_list = {}
  22. dry = False
  23. proxmox = False
  24.  
  25. def __init__(self):
  26. # Read args.
  27. parser = argparse.ArgumentParser()
  28. parser.add_argument("-d", "--dry", action="store_true")
  29. parser.add_argument(
  30. "-c",
  31. "--config",
  32. default=os.path.dirname(os.path.abspath(__file__)) + "/../config.yaml",
  33. )
  34. args = parser.parse_args()
  35. self.dry = args.dry
  36.  
  37. if not os.path.exists(args.config):
  38. sys.stderr.write("Cannot find config file\n")
  39. sys.exit(1)
  40.  
  41. # Read config, sanitize, fire up the API.
  42. with open(args.config, "r") as stream:
  43. try:
  44. config = yaml.safe_load(stream)
  45. if "method" not in config:
  46. config["method"] = "current"
  47. if "allowed_disparity" not in config:
  48. config["allowed_disparity"] = 20
  49. if "rules" not in config:
  50. config["rules"] = {}
  51. if "async" not in config:
  52. config["async"] = True
  53. if "separate" not in config["rules"]:
  54. config["rules"]["separate"] = {}
  55. if "port" not in config:
  56. config["port"] = 8006
  57. except yaml.YAMLError as exc:
  58. print(exc)
  59. sys.exit(1)
  60.  
  61. self.config = config
  62.  
  63. if "token_name" in config and "token_secret" in config:
  64. self.proxmox = ProxmoxAPI(
  65. config["host"],
  66. port=config["port"],
  67. user=config["user"],
  68. token_name=config["token_name"],
  69. token_value=config["token_secret"],
  70. backend="https",
  71. verify_ssl=False,
  72. )
  73. else:
  74. self.proxmox = ProxmoxAPI(
  75. config["host"],
  76. port=config["port"],
  77. user=config["user"],
  78. password=config["password"],
  79. backend="https",
  80. verify_ssl=False,
  81. )
  82.  
  83. # Get various useful sum.
  84. def get_totals(self):
  85. total_disparity = 0
  86. total_nodes = len(self.node_list)
  87. total_points = sum([self.node_list[node]["points"] for node in self.node_list])
  88. total_used_points = sum(
  89. [self.node_list[node]["used_points"] for node in self.node_list]
  90. )
  91. avg_points = (total_used_points / total_nodes) + 0.0
  92. return (
  93. total_disparity,
  94. total_nodes,
  95. total_points,
  96. total_used_points,
  97. avg_points,
  98. )
  99.  
  100. # Calculate the overall imbalance in the cluster, this can be useful for
  101. # determining if we should even run Balance.
  102. def calculate_imbalance(self):
  103. # Work out total imbalance as a percentage
  104. (
  105. total_disparity,
  106. total_nodes,
  107. total_points,
  108. total_used_points,
  109. avg_points,
  110. ) = self.get_totals()
  111. for node in self.node_list:
  112. points = self.node_list[node]["used_points"]
  113. total_disparity += abs(avg_points - points)
  114. disparity = abs(100 - ((points / avg_points) * 100))
  115. if disparity > 30:
  116. print("Found imbalance in node %s (%i" % (node, disparity) + "%)")
  117.  
  118. return total_disparity
  119.  
  120. # Work out the best host for a given VM.
  121. def calculate_best_host(self, current_node, vm_name):
  122. # List of vms to keep separate.
  123. rules = self.config["rules"]
  124. separate = [rule.split(",") for rule in rules["separate"]]
  125. unite = [rule.split(",") for rule in rules["unite"]]
  126.  
  127. # Get points.
  128. vm = self.node_list[current_node]["vms"][vm_name]
  129. points = vm["points"]
  130.  
  131. # Begin calculations.
  132. (
  133. total_disparity,
  134. total_nodes,
  135. total_points,
  136. total_used_points,
  137. avg_points,
  138. ) = self.get_totals()
  139. new_host = False
  140. new_host_points = 0
  141. for node_name in self.node_list:
  142. if node_name == current_node:
  143. continue
  144.  
  145. # Make sure we abide by the rules.
  146. skip = False
  147. for rule in separate:
  148. if vm_name in rule:
  149. for vm in rule:
  150. if vm != vm_name and vm in self.node_list[node_name]["vms"]:
  151. skip = True
  152. for rule in unite:
  153. if vm_name in rule:
  154. for vm in rule:
  155. if vm != vm_name and vm not in self.node_list[node_name]["vms"]:
  156. skip = True
  157. if skip:
  158. continue
  159.  
  160. # This is not particularly forward-thinking but it will do for now.
  161. new_points = self.node_list[node_name]["used_points"] + points
  162. if new_points < self.node_list[current_node]["used_points"] and (
  163. new_points < new_host_points or new_host_points == 0
  164. ):
  165. new_host = node_name
  166. new_host_points = new_points
  167. return new_host
  168.  
  169. def get_rule(self, vm_name):
  170. rules = self.config["rules"]
  171.  
  172. # First, check if we are pinned to a host.
  173. if "pin" in rules:
  174. pinned = [rule.split(":") for rule in rules["pin"]]
  175. for rule in pinned:
  176. if vm_name == rule[0]:
  177. return {"type": "pinned", "node": rule[1]}
  178.  
  179. # Now, see if we are separated from other VMs.
  180. if "separate" in rules:
  181. separate = [rule.split(",") for rule in rules["separate"]]
  182. for rule in separate:
  183. for vm in rule:
  184. if vm == vm_name:
  185. return {"type": "separate", "rule": rule}
  186.  
  187. # Should we unite with another vm?
  188. if "unite" in rules:
  189. unite = [rule.split(",") for rule in rules["unite"]]
  190. for rule in unite:
  191. for vm in rule:
  192. if vm == vm_name:
  193. return {"type": "unite", "rule": rule}
  194.  
  195. return {}
  196.  
  197. # Is this host pinned?
  198. def is_pinned(self, vm_name):
  199. rule = self.get_rule(vm_name)
  200. return "type" in rule and rule["type"] == "pinned"
  201.  
  202. # Should we separate this VM out from its current host?
  203. def should_separate(self, rule, vm_name, node_vms):
  204. other_vms = [x for x in rule if x != vm_name]
  205. return any(item in other_vms for item in node_vms)
  206.  
  207. # Should we unite this VM with friends?
  208. def should_unite(self, rule, vm_name, node_vms):
  209. other_vms = [x for x in rule if x != vm_name]
  210. return not all(item in node_vms for item in other_vms)
  211.  
  212. # Given a list of candiate hosts, pick the one with the lowest score.
  213. def get_lowest_candidate(self, candidates):
  214. lowest_point_score = 0
  215. candidate_host = 0
  216.  
  217. # Pick the candidate with the lowest point score.
  218. for candidate in candidates:
  219. if candidate_host == 0:
  220. candidate_host = candidate
  221. lowest_point_score = self.node_list[candidate]["points"]
  222. if self.node_list[candidate]["points"] > lowest_point_score:
  223. candidate_host = candidate
  224. lowest_point_score = self.node_list[candidate]["points"]
  225.  
  226. return candidate_host
  227.  
  228. # Keep united VMs together at all costs.
  229. def unite(self, rule, vm_name):
  230. rule_vms = [x for x in rule]
  231. candidates = [
  232. x
  233. for x in self.node_list
  234. if any(item in rule_vms for item in self.node_list[x]["vms"])
  235. ]
  236. return self.get_lowest_candidate(candidates)
  237.  
  238. # Keep separated VMs apart at all costs.
  239. def separate(self, rule, vm_name):
  240. other_vms = [x for x in rule if x != vm_name]
  241. candidates = [
  242. x
  243. for x in self.node_list
  244. if not any(item in other_vms for item in self.node_list[x]["vms"])
  245. ]
  246. if len(candidates) <= 0:
  247. print(
  248. "No suitable candidate host found for %s, perhaps you need more hosts."
  249. % vm_name
  250. )
  251. return self.get_lowest_candidate(candidates)
  252.  
  253. # Runs a balance pass over the node list.
  254. def rule_pass(self):
  255. operations = []
  256.  
  257. # Loop through every VM, check for rule violations.
  258. for node_name in self.node_list:
  259. for vm_name in self.node_list[node_name]["vms"]:
  260. # First, check we're abiding by the rules.
  261. rule = self.get_rule(vm_name)
  262. if "type" not in rule:
  263. continue
  264.  
  265. target = False
  266.  
  267. # Deal with unite rules.
  268. if rule["type"] == "unite" and self.should_unite(
  269. rule["rule"], vm_name, self.node_list[node_name]["vms"]
  270. ):
  271. print("Rule violation detected for '%s': Unite violation" % vm_name)
  272. target = self.unite(rule["rule"], vm_name)
  273.  
  274. # Deal with separation rules.
  275. if rule["type"] == "separate" and self.should_separate(
  276. rule["rule"], vm_name, self.node_list[node_name]["vms"]
  277. ):
  278. print(
  279. "Rule violation detected for '%s': Separation violation"
  280. % vm_name
  281. )
  282. target = self.separate(rule["rule"], vm_name)
  283.  
  284. # Deal with pinning rules.
  285. if rule["type"] == "pinned" and rule["node"] != node_name:
  286. print(
  287. "Rule violation detected for '%s': supposed to be pinned to host '%s'."
  288. % (vm_name, rule["node"])
  289. )
  290. if rule["node"] in self.node_list:
  291. target = rule["node"]
  292. else:
  293. print(" - Cannot enforce rule: node not in list")
  294.  
  295. # If we have to move, do.
  296. if target and target != node_name:
  297. operations.append(
  298. {"vm_name": vm_name, "host": node_name, "target": target}
  299. )
  300.  
  301. self.node_list[target]["vms"][vm_name] = self.node_list[node_name][
  302. "vms"
  303. ][vm_name]
  304.  
  305. return operations
  306.  
  307. # Runs a balance pass over the node list.
  308. def balance_pass(self):
  309. operations = []
  310.  
  311. # Loop through every VM, if we find one that we can migrate to another host without
  312. # making that hosts' total points greater than our own, do that.
  313. for node_name in self.node_list:
  314. for vm_name in self.node_list[node_name]["vms"]:
  315. vm = self.node_list[node_name]["vms"][vm_name]
  316.  
  317. # Can we action this host?
  318. if vm["status"] == "stopped" or self.is_pinned(vm_name):
  319. continue
  320.  
  321. points = vm["points"]
  322. target = self.calculate_best_host(node_name, vm_name)
  323.  
  324. if target:
  325. operations.append(
  326. {"vm_name": vm_name, "host": node_name, "target": target}
  327. )
  328.  
  329. self.node_list[node_name]["used_points"] -= points
  330. self.node_list[target]["used_points"] += points
  331.  
  332. return operations
  333.  
  334. # Return the status of a given task.
  335. def task_status(self, host, taskid):
  336. task = self.proxmox.nodes(host).tasks(taskid).status.get()
  337. if task and "status" in task:
  338. return task["status"]
  339. return "Unknown Task"
  340.  
  341. # Wait for a given to task to complete (or fail).
  342. def wait_for_task(self, host, taskid):
  343. while self.task_status(host, taskid) == "running":
  344. time.sleep(1)
  345.  
  346. # Actually migrate a VM.
  347. def run_migrate(self, operation, wait=False):
  348. vm_name = operation["vm_name"]
  349. host = operation["host"]
  350. target = operation["target"]
  351. vmid = self.node_list[host]["vms"][vm_name]["vmid"]
  352. data = {
  353. "target": target,
  354. "online": 1,
  355. }
  356. if not self.dry:
  357. print("Moving %s from %s to %s" % (vm_name, host, target))
  358. taskid = self.proxmox.nodes(host).qemu(vmid).migrate.post(**data)
  359. if wait:
  360. self.wait_for_task(host, taskid)
  361. else:
  362. print("Would move %s from %s to %s" % (vm_name, host, target))
  363.  
  364. # Pretty print the points used.
  365. def pretty_print_points(self):
  366. for name in self.node_list:
  367. node = self.node_list[name]
  368. print(
  369. "Found host %s with %i points (%i used)."
  370. % (name, node["points"], node["used_points"])
  371. )
  372.  
  373. # Calculate points for a given VM.
  374. # We're going to assign points to each server and VM based on CPU/RAM requirements.
  375. # Each CPU core is worth 5 points, each GB ram is 1 point.
  376. def calculate_vm_points(self, vm):
  377. if self.config["method"] == "max":
  378. return (vm["maxcpu"] * 5) + ((vm["maxmem"] / 1024 / 1024 / 1024) * 1)
  379. return (vm["cpu"] * 5) + ((vm["mem"] / 1024 / 1024 / 1024) * 1)
  380.  
  381. # Generate node_list and vm_list.
  382. def regenerate_lists(self):
  383. for node in self.proxmox.nodes.get():
  384. node_name = node["node"]
  385.  
  386. self.node_list[node_name] = node
  387. self.node_list[node_name]["vms"] = {}
  388.  
  389. # Calculate points.
  390. points = (node["maxcpu"] * 5) + ((node["maxmem"] / 1024 / 1024 / 1024) * 1)
  391. self.node_list[node_name]["points"] = points
  392. self.node_list[node_name]["used_points"] = 0
  393.  
  394. for vm in self.proxmox.nodes(node_name).qemu.get():
  395. vm_name = vm["name"]
  396. if vm["status"] == "running":
  397. points = self.calculate_vm_points(vm)
  398. self.node_list[node_name]["vms"][vm_name] = vm
  399. self.node_list[node_name]["vms"][vm_name]["points"] = points
  400. self.node_list[node_name]["used_points"] += points
  401. self.vm_list.append(
  402. {
  403. "obj": vm,
  404. "node": node_name,
  405. "points": points,
  406. }
  407. )
  408.  
  409. # Order vm_list.
  410. self.vm_list.sort(key=operator.itemgetter("points"))
  411. self.vm_list.reverse()
  412.  
  413. def balance(self):
  414. with locket.lock_file(self.config["infra_lock_file"], timeout=120):
  415. # First get the current list of hosts and VMs.
  416. self.regenerate_lists()
  417.  
  418. print("Running rule checks%s..." % (" (dry mode)" if self.dry else ""))
  419.  
  420. # Fix rule violations, then balance.
  421. operations = self.rule_pass()
  422. for operation in operations:
  423. self.run_migrate(operation, wait=True)
  424.  
  425. # Get a new list of hosts and VMs.
  426. self.regenerate_lists()
  427.  
  428. # Okay, work out the imbalance here and run migrations.
  429. total_disparity = self.calculate_imbalance()
  430. if total_disparity > (
  431. len(self.node_list) * self.config["allowed_disparity"]
  432. ):
  433. print("Running balance%s..." % (" (dry mode)" if self.dry else ""))
  434.  
  435. # Now, we need to spread the load.
  436. # We're going to work out how to best spread out with the minimal number of migrations.
  437. self.pretty_print_points()
  438.  
  439. # Okay, this is not optimal. When we get more than the hour I've given myself for this we
  440. # can use some fancy balancing graph, but for now, we will just move a few things to try and balance it.
  441. operations = self.balance_pass()
  442. for operation in operations:
  443. self.run_migrate(operation, wait=(not self.config["async"]))
  444.  
  445. # Now, we need to spread the load.
  446. # We're going to work out how to best spread out with the minimal number of migrations.
  447. self.pretty_print_points()
  448. else:
  449. print("Acceptable overall imbalance, not running balance.")# your code goes here
Runtime error #stdin #stdout #stderr 0.15s 26188KB
stdin
Standard input is empty
stdout
Standard output is empty
stderr
Traceback (most recent call last):
  File "./prog.py", line 9, in <module>
ModuleNotFoundError: No module named 'locket'