mirror of
https://github.com/facebook/proxygen.git
synced 2025-08-01 10:26:51 +03:00
Summary: Pull Request resolved: https://github.com/facebookincubator/zstrong/pull/1024 Differential Revision: D64977670 fbshipit-source-id: 505d9709aa088f7811e0f4d6d7ba26b07ffdbc94
291 lines
9.0 KiB
Python
291 lines
9.0 KiB
Python
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
#
|
|
# This source code is licensed under the MIT license found in the
|
|
# LICENSE file in the root directory of this source tree.
|
|
|
|
# pyre-unsafe
|
|
|
|
import os
|
|
|
|
import platform
|
|
import re
|
|
import shlex
|
|
import sys
|
|
from typing import Optional, Tuple
|
|
|
|
|
|
def is_windows() -> bool:
|
|
"""Returns true if the system we are currently running on
|
|
is a Windows system"""
|
|
return sys.platform.startswith("win")
|
|
|
|
|
|
def get_linux_type() -> Tuple[Optional[str], Optional[str], Optional[str]]:
|
|
try:
|
|
with open("/etc/os-release") as f:
|
|
data = f.read()
|
|
except EnvironmentError:
|
|
return (None, None, None)
|
|
|
|
os_vars = {}
|
|
for line in data.splitlines():
|
|
parts = line.split("=", 1)
|
|
if len(parts) != 2:
|
|
continue
|
|
key = parts[0].strip()
|
|
value_parts = shlex.split(parts[1].strip())
|
|
if not value_parts:
|
|
value = ""
|
|
else:
|
|
value = value_parts[0]
|
|
os_vars[key] = value
|
|
|
|
name = os_vars.get("NAME")
|
|
if name:
|
|
name = name.lower()
|
|
name = re.sub("linux", "", name)
|
|
name = name.strip().replace(" ", "_")
|
|
|
|
version_id = os_vars.get("VERSION_ID")
|
|
if version_id:
|
|
version_id = version_id.lower()
|
|
|
|
return "linux", name, version_id
|
|
|
|
|
|
# Ideally we'd use a common library like `psutil` to read system information,
|
|
# but getdeps can't take third-party dependencies.
|
|
|
|
|
|
def _get_available_ram_linux() -> int:
|
|
# TODO: Ideally, this function would inspect the current cgroup for any
|
|
# limits, rather than solely relying on system RAM.
|
|
|
|
meminfo_path = "/proc/meminfo"
|
|
try:
|
|
with open(meminfo_path) as f:
|
|
for line in f:
|
|
try:
|
|
key, value = line.split(":", 1)
|
|
except ValueError:
|
|
continue
|
|
suffix = " kB\n"
|
|
if key == "MemAvailable" and value.endswith(suffix):
|
|
value = value[: -len(suffix)]
|
|
try:
|
|
return int(value) // 1024
|
|
except ValueError:
|
|
continue
|
|
except OSError:
|
|
print("error opening {}".format(meminfo_path), end="", file=sys.stderr)
|
|
else:
|
|
print(
|
|
"{} had no valid MemAvailable".format(meminfo_path), end="", file=sys.stderr
|
|
)
|
|
|
|
guess = 8
|
|
print(", guessing {} GiB".format(guess), file=sys.stderr)
|
|
return guess * 1024
|
|
|
|
|
|
def _get_available_ram_macos() -> int:
|
|
import ctypes.util
|
|
|
|
libc = ctypes.CDLL(ctypes.util.find_library("libc"), use_errno=True)
|
|
sysctlbyname = libc.sysctlbyname
|
|
sysctlbyname.restype = ctypes.c_int
|
|
sysctlbyname.argtypes = [
|
|
ctypes.c_char_p,
|
|
ctypes.c_void_p,
|
|
ctypes.POINTER(ctypes.c_size_t),
|
|
ctypes.c_void_p,
|
|
ctypes.c_size_t,
|
|
]
|
|
# TODO: There may be some way to approximate an availability
|
|
# metric, but just use total RAM for now.
|
|
memsize = ctypes.c_int64()
|
|
memsizesize = ctypes.c_size_t(8)
|
|
res = sysctlbyname(
|
|
b"hw.memsize", ctypes.byref(memsize), ctypes.byref(memsizesize), None, 0
|
|
)
|
|
if res != 0:
|
|
raise NotImplementedError(
|
|
f"failed to retrieve hw.memsize sysctl: {ctypes.get_errno()}"
|
|
)
|
|
return memsize.value // (1024 * 1024)
|
|
|
|
|
|
def _get_available_ram_windows() -> int:
|
|
import ctypes
|
|
|
|
DWORD = ctypes.c_uint32
|
|
QWORD = ctypes.c_uint64
|
|
|
|
class MEMORYSTATUSEX(ctypes.Structure):
|
|
_fields_ = [
|
|
("dwLength", DWORD),
|
|
("dwMemoryLoad", DWORD),
|
|
("ullTotalPhys", QWORD),
|
|
("ullAvailPhys", QWORD),
|
|
("ullTotalPageFile", QWORD),
|
|
("ullAvailPageFile", QWORD),
|
|
("ullTotalVirtual", QWORD),
|
|
("ullAvailVirtual", QWORD),
|
|
("ullExtendedVirtual", QWORD),
|
|
]
|
|
|
|
ms = MEMORYSTATUSEX()
|
|
ms.dwLength = ctypes.sizeof(ms)
|
|
# pyre-ignore[16]
|
|
res = ctypes.windll.kernel32.GlobalMemoryStatusEx(ctypes.byref(ms))
|
|
if res == 0:
|
|
raise NotImplementedError("error calling GlobalMemoryStatusEx")
|
|
|
|
# This is fuzzy, but AvailPhys is too conservative, and AvailTotal is too
|
|
# aggressive, so average the two. It's okay for builds to use some swap.
|
|
return (ms.ullAvailPhys + ms.ullTotalPhys) // (2 * 1024 * 1024)
|
|
|
|
|
|
def _get_available_ram_freebsd() -> int:
|
|
import ctypes.util
|
|
|
|
libc = ctypes.CDLL(ctypes.util.find_library("libc"), use_errno=True)
|
|
sysctlbyname = libc.sysctlbyname
|
|
sysctlbyname.restype = ctypes.c_int
|
|
sysctlbyname.argtypes = [
|
|
ctypes.c_char_p,
|
|
ctypes.c_void_p,
|
|
ctypes.POINTER(ctypes.c_size_t),
|
|
ctypes.c_void_p,
|
|
ctypes.c_size_t,
|
|
]
|
|
# hw.usermem is pretty close to what we want.
|
|
memsize = ctypes.c_int64()
|
|
memsizesize = ctypes.c_size_t(8)
|
|
res = sysctlbyname(
|
|
b"hw.usermem", ctypes.byref(memsize), ctypes.byref(memsizesize), None, 0
|
|
)
|
|
if res != 0:
|
|
raise NotImplementedError(
|
|
f"failed to retrieve hw.memsize sysctl: {ctypes.get_errno()}"
|
|
)
|
|
return memsize.value // (1024 * 1024)
|
|
|
|
|
|
def get_available_ram() -> int:
|
|
"""
|
|
Returns a platform-appropriate available RAM metric in MiB.
|
|
"""
|
|
if sys.platform == "linux":
|
|
return _get_available_ram_linux()
|
|
elif sys.platform == "darwin":
|
|
return _get_available_ram_macos()
|
|
elif sys.platform == "win32":
|
|
return _get_available_ram_windows()
|
|
elif sys.platform.startswith("freebsd"):
|
|
return _get_available_ram_freebsd()
|
|
else:
|
|
raise NotImplementedError(
|
|
f"platform {sys.platform} does not have an implementation of get_available_ram"
|
|
)
|
|
|
|
|
|
def is_current_host_arm() -> bool:
|
|
if sys.platform.startswith("darwin"):
|
|
# platform.machine() can be fooled by rosetta for python < 3.9.2
|
|
return "ARM64" in os.uname().version
|
|
else:
|
|
machine = platform.machine().lower()
|
|
return "arm" in machine or "aarch" in machine
|
|
|
|
|
|
class HostType(object):
|
|
def __init__(self, ostype=None, distro=None, distrovers=None) -> None:
|
|
# Maybe we should allow callers to indicate whether this machine uses
|
|
# an ARM architecture, but we need to change HostType serialization
|
|
# and deserialization in that case and hunt down anywhere that is
|
|
# persisting that serialized data.
|
|
isarm = False
|
|
|
|
if ostype is None:
|
|
distro = None
|
|
distrovers = None
|
|
if sys.platform.startswith("linux"):
|
|
ostype, distro, distrovers = get_linux_type()
|
|
elif sys.platform.startswith("darwin"):
|
|
ostype = "darwin"
|
|
elif is_windows():
|
|
ostype = "windows"
|
|
distrovers = str(sys.getwindowsversion().major)
|
|
elif sys.platform.startswith("freebsd"):
|
|
ostype = "freebsd"
|
|
else:
|
|
ostype = sys.platform
|
|
|
|
isarm = is_current_host_arm()
|
|
|
|
# The operating system type
|
|
self.ostype = ostype
|
|
# The distribution, if applicable
|
|
self.distro = distro
|
|
# The OS/distro version if known
|
|
self.distrovers = distrovers
|
|
# Does the CPU use an ARM architecture? ARM includes Apple Silicon
|
|
# Macs as well as other ARM systems that might be running Linux or
|
|
# something.
|
|
self.isarm = isarm
|
|
|
|
def is_windows(self):
|
|
return self.ostype == "windows"
|
|
|
|
# is_arm is kinda half implemented at the moment. This method is only
|
|
# intended to be used when HostType represents information about the
|
|
# current machine we are running on.
|
|
# When HostType is being used to enumerate platform types (represent
|
|
# information about machine types that we may or may not be running on)
|
|
# the result could be nonsense (under the current implementation its always
|
|
# false.)
|
|
def is_arm(self):
|
|
return self.isarm
|
|
|
|
def is_darwin(self):
|
|
return self.ostype == "darwin"
|
|
|
|
def is_linux(self):
|
|
return self.ostype == "linux"
|
|
|
|
def is_freebsd(self):
|
|
return self.ostype == "freebsd"
|
|
|
|
def as_tuple_string(self) -> str:
|
|
return "%s-%s-%s" % (
|
|
self.ostype,
|
|
self.distro or "none",
|
|
self.distrovers or "none",
|
|
)
|
|
|
|
def get_package_manager(self):
|
|
if not self.is_linux() and not self.is_darwin():
|
|
return None
|
|
if self.is_darwin():
|
|
return "homebrew"
|
|
if self.distro in ("fedora", "centos", "centos_stream", "rocky"):
|
|
return "rpm"
|
|
if self.distro.startswith(("debian", "ubuntu", "pop!_os", "mint")):
|
|
return "deb"
|
|
if self.distro == "arch":
|
|
return "pacman-package"
|
|
return None
|
|
|
|
@staticmethod
|
|
def from_tuple_string(s) -> "HostType":
|
|
ostype, distro, distrovers = s.split("-")
|
|
return HostType(ostype=ostype, distro=distro, distrovers=distrovers)
|
|
|
|
def __eq__(self, b):
|
|
return (
|
|
self.ostype == b.ostype
|
|
and self.distro == b.distro
|
|
and self.distrovers == b.distrovers
|
|
)
|