Source code for platypush.plugins.system

import socket

from datetime import datetime
from typing import Union, List, Optional, Dict

from platypush.message.response.system import CpuInfoResponse, CpuTimesResponse, CpuResponseList, CpuStatsResponse, \
    CpuFrequencyResponse, VirtualMemoryUsageResponse, SwapMemoryUsageResponse, DiskResponseList, \
    DiskPartitionResponse, DiskUsageResponse, DiskIoCountersResponse, NetworkIoCountersResponse, NetworkResponseList, \
    NetworkConnectionResponse, NetworkAddressResponse, NetworkInterfaceStatsResponse, SensorTemperatureResponse, \
    SensorResponseList, SensorFanResponse, SensorBatteryResponse, ConnectedUserResponseList, ConnectUserResponse, \
    ProcessResponseList, ProcessResponse

from platypush.plugins import Plugin, action

[docs]class SystemPlugin(Plugin): """ Plugin to get system info. Requires: - **py-cpuinfo** (``pip install py-cpuinfo``) for CPU model and info. - **psutil** (``pip install psutil``) for CPU load and stats. """
[docs] @action def cpu_info(self) -> CpuInfoResponse: """ Get CPU info. :return: :class:`platypush.message.response.system.CpuInfoResponse` """ from cpuinfo import get_cpu_info info = get_cpu_info() return CpuInfoResponse( arch=info.get('raw_arch_string'), bits=info.get('bits'), count=info.get('count'), vendor_id=info.get('vendor_id'), brand=info.get('brand'), hz_advertised=info.get('hz_advertised_raw')[0], hz_actual=info.get('hz_actual_raw')[0], stepping=info.get('stepping'), model=info.get('model'), family=info.get('family'), flags=info.get('flags'), l1_instruction_cache_size=info.get('l1_instruction_cache_size'), l1_data_cache_size=info.get('l1_data_cache_size'), l2_cache_size=info.get('l2_cache_size'), l3_cache_size=info.get('l3_cache_size'), )
[docs] @action def cpu_times(self, per_cpu=False, percent=False) -> Union[CpuTimesResponse, CpuResponseList]: """ Get the CPU times stats. :param per_cpu: Get per-CPU stats (default: False). :param percent: Get the stats in percentage (default: False). :return: :class:`platypush.message.response.system.CpuTimesResponse` """ import psutil times = psutil.cpu_times_percent(percpu=per_cpu) if percent else \ psutil.cpu_times(percpu=per_cpu) if per_cpu: return CpuResponseList([ CpuTimesResponse( user=t.user, nice=t.nice, system=t.system, idle=t.idle, iowait=t.iowait, irq=t.irq, softirq=t.softirq, steal=t.steal, guest=t.guest, guest_nice=t.guest_nice, ) for t in times ]) return CpuTimesResponse( user=times.user, nice=times.nice, system=times.system, idle=times.idle, iowait=times.iowait, irq=times.irq, softirq=times.softirq, steal=times.steal, guest=times.guest, guest_nice=times.guest_nice, )
[docs] @action def cpu_percent(self, per_cpu: bool = False, interval: Optional[float] = None) -> Union[float, List[float]]: """ Get the CPU load percentage. :param per_cpu: Get per-CPU stats (default: False). :param interval: When *interval* is 0.0 or None compares system CPU times elapsed since last call or module import, returning immediately (non blocking). That means the first time this is called it will return a meaningless 0.0 value which you should ignore. In this case is recommended for accuracy that this function be called with at least 0.1 seconds between calls. :return: float if ``per_cpu=False``, ``list[float]`` otherwise. """ import psutil percent = psutil.cpu_percent(percpu=per_cpu, interval=interval) if per_cpu: return [p for p in percent] return percent
[docs] @action def cpu_stats(self) -> CpuStatsResponse: """ Get CPU stats. :return: :class:`platypush.message.response.system.CpuStatsResponse` """ import psutil stats = psutil.cpu_stats() return CpuStatsResponse( ctx_switches=stats.ctx_switches, interrupts=stats.interrupts, soft_interrupts=stats.soft_interrupts, syscalls=stats.syscalls, )
[docs] @action def cpu_frequency(self, per_cpu: bool = False) -> Union[CpuFrequencyResponse, CpuResponseList]: """ Get CPU stats. :param per_cpu: Get per-CPU stats (default: False). :return: :class:`platypush.message.response.system.CpuFrequencyResponse` """ import psutil freq = psutil.cpu_freq(percpu=per_cpu) if per_cpu: return CpuResponseList([ CpuFrequencyResponse( min=f.min, max=f.max, current=f.current, ) for f in freq ]) return CpuFrequencyResponse( min=freq.min, max=freq.max, current=freq.current, )
[docs] @action def load_avg(self) -> List[float]: """ Get the average load as a vector that represents the load within the last 1, 5 and 15 minutes. """ import psutil return psutil.getloadavg()
[docs] @action def mem_virtual(self) -> VirtualMemoryUsageResponse: """ Get the current virtual memory usage stats. :return: list of :class:`platypush.message.response.system.VirtualMemoryUsageResponse` """ import psutil mem = psutil.virtual_memory() return VirtualMemoryUsageResponse(, available=mem.available, percent=mem.percent, used=mem.used,,, inactive=mem.inactive, buffers=mem.buffers, cached=mem.cached, shared=mem.shared, )
[docs] @action def mem_swap(self) -> SwapMemoryUsageResponse: """ Get the current virtual memory usage stats. :return: list of :class:`platypush.message.response.system.SwapMemoryUsageResponse` """ import psutil mem = psutil.swap_memory() return SwapMemoryUsageResponse(, percent=mem.percent, used=mem.used,, sin=mem.sin, sout=mem.sout, )
[docs] @action def disk_partitions(self) -> DiskResponseList: """ Get the list of partitions mounted on the system. :return: list of :class:`platypush.message.response.system.DiskPartitionResponse` """ import psutil parts = psutil.disk_partitions() return DiskResponseList([ DiskPartitionResponse( device=p.device, mount_point=p.mountpoint, fstype=p.fstype, opts=p.opts, ) for p in parts ])
[docs] @action def disk_usage(self, path: Optional[str] = None) -> Union[DiskUsageResponse, DiskResponseList]: """ Get the usage of a mounted disk. :param path: Path where the device is mounted (default: get stats for all mounted devices). :return: :class:`platypush.message.response.system.DiskUsageResponse` or list of :class:`platypush.message.response.system.DiskUsageResponse`. """ import psutil if path: usage = psutil.disk_usage(path) return DiskUsageResponse( path=path,, used=usage.used,, percent=usage.percent, ) else: disks = {p.mountpoint: psutil.disk_usage(p.mountpoint) for p in psutil.disk_partitions()} return DiskResponseList([ DiskUsageResponse( path=path,, used=disk.used,, percent=disk.percent, ) for path, disk in disks.items() ])
[docs] @action def disk_io_counters(self, disk: Optional[str] = None, per_disk: bool = False) -> \ Union[DiskIoCountersResponse, DiskResponseList]: """ Get the I/O counter stats for the mounted disks. :param disk: Select the stats for a specific disk (e.g. 'sda1'). Default: get stats for all mounted disks. :param per_disk: Return the stats per disk (default: False). :return: :class:`platypush.message.response.system.DiskIoCountersResponse` or list of :class:`platypush.message.response.system.DiskIoCountersResponse`. """ import psutil def _expand_response(_disk, _stats): return DiskIoCountersResponse( read_count=_stats.read_count, write_count=_stats.write_count, read_bytes=_stats.read_bytes, write_bytes=_stats.write_bytes, read_time=_stats.read_time, write_time=_stats.write_time, read_merged_count=_stats.read_merged_count, write_merged_count=_stats.write_merged_count, busy_time=_stats.busy_time, disk=_disk, ) if disk: per_disk = True io = psutil.disk_io_counters(perdisk=per_disk) if disk: stats = [d for name, d in io.items() if name == disk] assert stats, 'No such disk: {}'.format(disk) return _expand_response(disk, stats[0]) if not per_disk: return _expand_response(None, io) return DiskResponseList([ _expand_response(disk, stats) for disk, stats in io.items() ])
[docs] @action def net_io_counters(self, nic: Optional[str] = None, per_nic: bool = False) -> \ Union[NetworkIoCountersResponse, NetworkResponseList]: """ Get the I/O counters stats for the network interfaces. :param nic: Select the stats for a specific network device (e.g. 'eth0'). Default: get stats for all NICs. :param per_nic: Return the stats broken down per interface (default: False). :return: :class:`platypush.message.response.system.NetIoCountersResponse` or list of :class:`platypush.message.response.system.NetIoCountersResponse`. """ import psutil def _expand_response(_nic, _stats): return NetworkIoCountersResponse( bytes_sent=_stats.bytes_sent, bytes_recv=_stats.bytes_recv, packets_sent=_stats.packets_sent, packets_recv=_stats.packets_recv, errin=_stats.errin, errout=_stats.errout, dropin=_stats.dropin, dropout=_stats.dropout, nic=_nic, ) if nic: per_nic = True io = psutil.net_io_counters(pernic=per_nic) if nic: stats = [d for name, d in io.items() if name == nic] assert stats, 'No such network interface: {}'.format(nic) return _expand_response(nic, stats[0]) if not per_nic: return _expand_response(nic, io) return NetworkResponseList([ _expand_response(nic, stats) for nic, stats in io.items() ])
# noinspection PyShadowingBuiltins
[docs] @action def net_connections(self, type: Optional[str] = None) -> Union[NetworkConnectionResponse, NetworkResponseList]: """ Get the list of active network connections. On macOS this function requires root privileges. :param type: Connection type to filter. Supported types: +------------+----------------------------------------------------+ | Kind Value | Connections using | +------------+----------------------------------------------------+ | inet | IPv4 and IPv6 | | inet4 | IPv4 | | inet6 | IPv6 | | tcp | TCP | | tcp4 | TCP over IPv4 | | tcp6 | TCP over IPv6 | | udp | UDP | | udp4 | UDP over IPv4 | | udp6 | UDP over IPv6 | | unix | UNIX socket (both UDP and TCP protocols) | | all | the sum of all the possible families and protocols | +------------+----------------------------------------------------+ :return: List of :class:`platypush.message.response.system.NetworkConnectionResponse`. """ import psutil conns = psutil.net_connections(kind=type) return NetworkResponseList([ NetworkConnectionResponse( fd=conn.fd,,, local_address=conn.laddr[0] if conn.laddr else None, local_port=conn.laddr[1] if len(conn.laddr) > 1 else None, remote_address=conn.raddr[0] if conn.raddr else None, remote_port=conn.raddr[1] if len(conn.raddr) > 1 else None, status=conn.status,, ) for conn in conns ])
[docs] @action def net_addresses(self, nic: Optional[str] = None) -> Union[NetworkAddressResponse, NetworkResponseList]: """ Get address info associated to the network interfaces. :param nic: Select the stats for a specific network device (e.g. 'eth0'). Default: get stats for all NICs. :return: :class:`platypush.message.response.system.NetworkAddressResponse` or list of :class:`platypush.message.response.system.NetworkAddressResponse`. """ import psutil addrs = psutil.net_if_addrs() def _expand_addresses(_nic, _addrs): args = {'nic': _nic} for addr in _addrs: if == socket.AddressFamily.AF_INET: args.update({ 'ipv4_address': addr.address, 'ipv4_netmask': addr.netmask, 'ipv4_broadcast': addr.broadcast, }) elif == socket.AddressFamily.AF_INET6: args.update({ 'ipv6_address': addr.address, 'ipv6_netmask': addr.netmask, 'ipv6_broadcast': addr.broadcast, }) elif == socket.AddressFamily.AF_PACKET: args.update({ 'mac_address': addr.address, 'mac_broadcast': addr.broadcast, }) if addr.ptp and not args.get('ptp'): args['ptp'] = addr.ptp return NetworkAddressResponse(**args) if nic: addrs = [addr for name, addr in addrs.items() if name == nic] assert addrs, 'No such network interface: {}'.format(nic) addr = addrs[0] return _expand_addresses(nic, addr) return NetworkResponseList([ _expand_addresses(nic, addr) for nic, addr in addrs.items() ])
[docs] @action def net_stats(self, nic: Optional[str] = None) -> Union[NetworkInterfaceStatsResponse, NetworkResponseList]: """ Get stats about the network interfaces. :param nic: Select the stats for a specific network device (e.g. 'eth0'). Default: get stats for all NICs. :return: :class:`platypush.message.response.system.NetworkInterfaceStatsResponse` or list of :class:`platypush.message.response.system.NetworkInterfaceStatsResponse`. """ import psutil stats = psutil.net_if_stats() def _expand_stats(_nic, _stats): return NetworkInterfaceStatsResponse( nic=_nic, is_up=_stats.isup,, speed=_stats.speed, mtu=_stats.mtu, ) if nic: stats = [addr for name, addr in stats.items() if name == nic] assert stats, 'No such network interface: {}'.format(nic) return _expand_stats(nic, stats[0]) return NetworkResponseList([ _expand_stats(nic, addr) for nic, addr in stats.items() ])
# noinspection DuplicatedCode
[docs] @action def sensors_temperature(self, sensor: Optional[str] = None, fahrenheit: bool = False) \ -> Union[SensorTemperatureResponse, List[SensorTemperatureResponse], Dict[str, Union[SensorTemperatureResponse, List[SensorTemperatureResponse]]]]: """ Get stats from the temperature sensors. :param sensor: Select the sensor name. :param fahrenheit: Return the temperature in Fahrenheit (default: Celsius). """ import psutil stats = psutil.sensors_temperatures(fahrenheit=fahrenheit) if sensor: stats = [addr for name, addr in stats.items() if name == sensor] assert stats, 'No such sensor name: {}'.format(sensor) if len(stats) == 1: return SensorTemperatureResponse( name=sensor, current=stats[0].current, high=stats[0].high, critical=stats[0].critical, label=stats[0].label, ) return [ SensorTemperatureResponse( name=sensor, current=s.current, high=s.high, critical=s.critical, label=s.label, ) for s in stats ] ret = {} for name, data in stats.items(): for stat in data: resp = SensorTemperatureResponse( name=sensor, current=stat.current, high=stat.high, critical=stat.critical, label=stat.label, ).output if name not in ret: ret[name] = resp else: if isinstance(ret[name], list): ret[name].append(resp) else: ret[name] = [ret[name], resp] return ret
# noinspection DuplicatedCode
[docs] @action def sensors_fan(self, sensor: Optional[str] = None) -> SensorResponseList: """ Get stats from the fan sensors. :param sensor: Select the sensor name. :return: List of :class:`platypush.message.response.system.SensorFanResponse`. """ import psutil stats = psutil.sensors_fans() def _expand_stats(name, _stats): return SensorResponseList([ SensorFanResponse( name=name, current=s.current, label=s.label, ) for s in _stats ]) if sensor: stats = [addr for name, addr in stats.items() if name == sensor] assert stats, 'No such sensor name: {}'.format(sensor) return _expand_stats(sensor, stats[0]) return SensorResponseList([ _expand_stats(name, stat) for name, stat in stats.items() ])
[docs] @action def sensors_battery(self) -> SensorBatteryResponse: """ Get stats from the battery sensor. :return: List of :class:`platypush.message.response.system.SensorFanResponse`. """ import psutil stats = psutil.sensors_battery() return SensorBatteryResponse( percent=stats.percent, secs_left=stats.secsleft, power_plugged=stats.power_plugged, )
[docs] @action def connected_users(self) -> ConnectedUserResponseList: """ Get the list of connected users. :return: List of :class:`platypush.message.response.system.ConnectUserResponse`. """ import psutil users = psutil.users() return ConnectedUserResponseList([ ConnectUserResponse(, terminal=u.terminal,, started=datetime.fromtimestamp(u.started),, ) for u in users ])
# noinspection PyShadowingBuiltins
[docs] @action def processes(self, filter: Optional[str] = '') -> ProcessResponseList: """ Get the list of running processes. :param filter: Filter the list by name. :return: List of :class:`platypush.message.response.system.ProcessResponse`. """ import psutil processes = [psutil.Process(pid) for pid in psutil.pids()] p_list = [] for p in processes: if filter and filter not in continue args = {} try: mem = p.memory_info() times = p.cpu_times() args.update(,, started=datetime.fromtimestamp(p.create_time()), ppid=p.ppid(), children=[ for pp in p.children()], status=p.status(), username=p.username(), terminal=p.terminal(), cpu_user_time=times.user, cpu_system_time=times.system, cpu_children_user_time=times.children_user, cpu_children_system_time=times.children_system, mem_rss=mem.rss, mem_vms=mem.vms, mem_shared=mem.shared, mem_text=mem.text,, mem_lib=mem.lib, mem_dirty=mem.dirty, mem_percent=p.memory_percent(), ) except psutil.Error: continue try: args.update( exe=p.exe(), ) except psutil.Error: pass p_list.append(ProcessResponse(**args)) return ProcessResponseList(p_list)
@staticmethod def _get_process(pid: int): import psutil return psutil.Process(pid)
[docs] @action def pid_exists(self, pid: int) -> bool: """ :param pid: Process PID. :return: ``True`` if the process exists, ``False`` otherwise. """ import psutil return psutil.pid_exists(pid)
[docs] @action def suspend(self, pid: int): """ Suspend a process. :param pid: Process PID. """ self._get_process(pid).suspend()
[docs] @action def resume(self, pid: int): """ Resume a process. :param pid: Process PID. """ self._get_process(pid).resume()
[docs] @action def terminate(self, pid: int): """ Terminate a process. :param pid: Process PID. """ self._get_process(pid).terminate()
[docs] @action def kill(self, pid: int): """ Kill a process. :param pid: Process PID. """ self._get_process(pid).kill()
[docs] @action def wait(self, pid: int, timeout: int = None): """ Wait for a process to terminate. :param pid: Process PID. :param timeout: Timeout in seconds (default: ``None``). """ self._get_process(pid).wait(timeout)
# vim:sw=4:ts=4:et: