Module jumpscale.sals.nettools
This module contains a collection of functions that help manage network connections and interfaces.
General note on Python socket operations: if you use a hostname in the host portion of an IPv4/v6 socket address, the program may show nondeterministic behavior, as Python uses the first address returned from the DNS resolution. The socket address will be resolved differently into an actual IPv4/v6 address, depending on the results from DNS resolution and/or the host configuration. For deterministic behavior use a numeric address in the host portion. https://docs.python.org/3/library/socket.html
Expand source code
"""This module contains a collection of functions that help manage network connections and interfaces.
General Note on python socket operations:
If you use a hostname in the host portion of an IPv4/v6 socket address, the program may show nondeterministic behavior,
as Python uses the first address returned from the DNS resolution. The socket address will be resolved differently
into an actual IPv4/v6 address, depending on the results from DNS resolution and/or the host configuration.
For deterministic behavior use a numeric address in host portion.
https://docs.python.org/3/library/socket.html
"""
import time
import socket
import ipaddress
import re
import ssl
import json
import subprocess
import shutil
from os.path import basename
from typing import Optional
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
from urllib.parse import urlparse
from urllib.request import build_opener, HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler, install_opener
from pathlib import Path
from collections import namedtuple
from jumpscale.loader import j
from jumpscale.core.exceptions import Value, Runtime
import jumpscale.tools.http
import jumpscale.data.platform
import jumpscale.sals.fs
import jumpscale.core.executors
def tcp_connection_test(ipaddr: str, port: int, timeout: Optional[int] = None) -> bool:
    """Test whether a TCP connection can be opened to the given address and port.

    Compatible with both IPv4 and IPv6. The connection is opened with
    ``socket.create_connection`` and closed again immediately.

    Note:
        Every ``OSError`` is reported as ``False``. ``socket.gaierror``,
        ``socket.herror`` and ``socket.timeout`` are subclasses of ``OSError``,
        so name-resolution failures and timeouts also yield ``False`` instead of
        being raised.

    Args:
        ipaddr (str): ip address or hostname
        port (int): port number
        timeout (int, optional): seconds before the connection attempt fails.
            Defaults to None, which is passed through to ``socket.create_connection``.

    Returns:
        bool: True if the connection was established, False otherwise
    """
    try:
        with socket.create_connection((ipaddr, port), timeout):
            return True
    except OSError:
        return False
def udp_connection_test(ipaddr: str, port: int, timeout: Optional[int] = 1, message: Optional[bytes] = b"") -> bool:
    """Test a UDP endpoint by sending a datagram and waiting for any reply.

    The test only succeeds when the peer answers: ``message`` is sent to
    ``(ipaddr, port)`` and the function then waits to receive at least one byte
    back on the same socket.

    Args:
        ipaddr (str): numeric ip address (IPv4 or IPv6); hostnames are rejected
        port (int): port number
        timeout (int, optional): seconds to wait for the reply. A falsy value
            (None or 0) leaves the socket without a timeout. Defaults to 1.
        message (bytes, optional): payload to send. Defaults to b"".

    Raises:
        ValueError: raises if invalid ip address was used

    Returns:
        bool: True if a reply was received, False on timeout or any other OSError
    """
    family = socket.AF_INET if ipaddress.ip_address(ipaddr).version == 4 else socket.AF_INET6
    with socket.socket(family, socket.SOCK_DGRAM) as sock:
        if timeout:
            sock.settimeout(timeout)
        try:
            sock.sendto(message, (ipaddr, port))
            # receiving at least one byte is taken as proof that the peer is there
            sock.recvfrom(1)
        except OSError:
            # socket.timeout is an OSError subclass, so timeouts land here too
            return False
        return True
def wait_connection_test(ipaddr: str, port: int, timeout: Optional[int] = 6) -> bool:
    """Wait until a TCP port accepts connections or `timeout` seconds have elapsed.

    The port is probed repeatedly with `tcp_connection_test`. Each probe uses a
    connect timeout of `interval` seconds, where `interval` is 1 when
    `timeout` <= 2 and 2 otherwise.

    Args:
        ipaddr (str): ip address, or hostname
        port (int): port number
        timeout (int, optional): how long to wait for the connection. For values > 2
            an even number fits the 2 second probe interval best. Defaults to 6.

    Returns:
        bool: True if the test succeeds, False otherwise
    """
    retry_interval = 2 if timeout > 2 else 1
    give_up_at = time.time() + timeout
    while time.time() < give_up_at:
        attempt_started = time.time()
        if tcp_connection_test(ipaddr, port, timeout=retry_interval):
            return True
        # a refused connection (errno 111) comes back instantly: pause before retrying
        elapsed = time.time() - attempt_started
        if elapsed < retry_interval:
            time.sleep(1)
    return False
def wait_http_test(
    url: str, timeout: Optional[int] = 60, verify: Optional[bool] = True, interval_time: Optional[int] = 2
) -> bool:
    """Poll a url until it becomes reachable or `timeout` seconds have elapsed.

    Args:
        url (str): url
        timeout (int, optional): how long to keep trying to reach the url. Defaults to 60.
        verify (bool, optional): whether to verify the server's TLS certificate.
        interval_time (int, optional): how long each request may wait for a response
            before a new request is sent. Defaults to 2.

    Raises:
        ValueError: raises if not correct url

    Returns:
        bool: True if the url became reachable in time, False otherwise
    """
    give_up_at = time.time() + timeout
    while time.time() < give_up_at:
        attempt_started = time.time()
        reachable = check_url_reachable(url, interval_time, verify)
        if reachable:
            return True
        # go easy on system resources when the check above returned immediately (edge cases)
        elapsed = time.time() - attempt_started
        if elapsed < interval_time:
            time.sleep(1)
    return False
def check_url_reachable(
    url: str, timeout: Optional[int] = 5, verify: Optional[bool] = True, fake_user_agent: Optional[bool] = True
) -> bool:
    """Check that the given url is reachable by issuing a single GET request.

    Args:
        url (str): url to test
        timeout (int, optional): timeout of the request in seconds. Defaults to 5.
        verify (bool, optional): whether to verify the server's TLS certificate.
        fake_user_agent (bool, optional): whether to send a browser-like User-Agent
            instead of urllib's default one.

    Raises:
        ValueError: raises if not correct url

    Returns:
        bool: True if the request succeeded, False if it failed with an HTTP error,
            a URL error, a timeout or any other OSError
    """
    headers = {}
    if fake_user_agent:
        # Some services block urllib's default "Python-urllib/x.y" user-agent
        # (ex: www.amazon.com) and would look unreachable, so act like a normal browser.
        headers["User-Agent"] = (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_4) AppleWebKit/603.1.30 "
            "(KHTML, like Gecko) Version/10.1 Safari/603.1.30"
        )

    context = None
    if not verify:
        # opt out of certificate verification for this single request only;
        # check_hostname has to be disabled before verify_mode can be set to CERT_NONE
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

    request = Request(url, headers=headers, method="GET")
    try:
        # the timeout is passed per request; the process-wide socket default is left untouched
        with urlopen(request, timeout=timeout, context=context):
            return True
    except OSError:
        # HTTPError, URLError and socket.timeout are all OSError subclasses; errors raised
        # while the response is being read (e.g. a reset connection) are covered as well
        return False
def get_nic_names() -> list:
    """Return the names of the network interfaces on this machine.

    Returns:
        list: the "name" field of every entry returned by `get_network_info()`
    """
    interfaces = get_network_info()
    names = [entry["name"] for entry in interfaces]
    return names
def get_nic_type(interface: str) -> str:
    """Classify a network interface.

    On Linux the sysfs type file, the /proc vlan table and the driver reported by
    ``ethtool -i`` are consulted; on macOS the output of ``ifconfig`` and the
    interface name are used. On any other platform nothing is returned.

    Args:
        interface (str): interface name

    Raises:
        Runtime: if ethtool not installed on the system
        Value: if interface given is invalid

    Returns:
        str: one of "INFINIBAND", "VLAN", "VIRTUAL" or "ETHERNET_GB"
    """
    run_local = jumpscale.core.executors.run_local

    if jumpscale.data.platform.is_linux():
        type_code = ""
        if jumpscale.sals.fs.exists(f"/sys/class/net/{interface}"):
            type_code = jumpscale.sals.fs.read_file(f"/sys/class/net/{interface}/type")
        if type_code.strip() == "32":
            return "INFINIBAND"
        if jumpscale.sals.fs.exists("/proc/net/vlan/%s" % (interface)):
            return "VLAN"

        exitcode, _, _ = run_local("which ethtool", hide=True, warn=True)
        if exitcode != 0:
            raise Runtime("Ethtool is not installed on this system!")
        exitcode, ethtool_output, _ = run_local(f"ethtool -i {interface}", hide=True, warn=True)
        if exitcode != 0:
            return "VIRTUAL"

        driver_match = re.search(r"^driver:\s+(?P<driver>\w+)\s*$", ethtool_output, re.MULTILINE)
        driver = driver_match.group("driver") if driver_match else None
        if driver == "tun":
            return "VIRTUAL"
        if driver == "bridge":
            return "VLAN"
        return "ETHERNET_GB"

    if jumpscale.data.platform.is_osx():
        command = f"ifconfig {interface}"
        exitcode, ifconfig_output, _ = run_local(command, hide=True, warn=True)
        if exitcode != 0:
            # temporary plumb the interface to lookup its mac
            run_local(f"{command} plumb", hide=True)
            exitcode, ifconfig_output, _ = run_local(command, hide=True)
            run_local(f"{command} unplumb", hide=True)
        if "ipib" in ifconfig_output:
            return "INFINIBAND"

        # work with interfaces which are subnetted on vlans eq e1000g5000:1
        interfacepieces = interface.split(":")
        interface = interfacepieces[0]
        id_match = re.search(r"^\w+?(?P<interfaceid>\d+)$", interface, re.MULTILINE)
        if not id_match:
            raise Value(f"Invalid interface {interface}")
        if len(id_match.group("interfaceid")) >= 4:
            return "VLAN"
        if len(interfacepieces) > 1:
            return "VIRTUAL"
        return "ETHERNET_GB"

    return None
def get_reachable_ip_address(ip: str, port: Optional[int] = 0) -> str:
    """Figure out which source address would be used to send traffic to the given ip.

    A UDP socket is connected to ``(ip, port)`` and its local address is read
    back with ``getsockname()``. Compatible with both IPv4 and IPv6.

    Args:
        ip (str): ip address
        port (int, optional): port number. does not matter much. No traffic is actually sent. Defaults to 0.

    Raises:
        ValueError: if address does not represent a valid IPv4 or IPv6 address, or port is invalid.
        RuntimeError: if can't connect

    Returns:
        str: ip that can connect to the specified ip
    """
    family = socket.AF_INET if ipaddress.ip_address(ip).version == 4 else socket.AF_INET6
    with socket.socket(family, socket.SOCK_DGRAM) as sock:
        try:
            sock.connect((ip, port))
        except OSError as e:
            # (ConnectionRefusedError, socket.timeout, socket.herror, socket.gaierror)
            reason = e.strerror or repr(e)
            raise RuntimeError(f"Cannot connect to {ip}:{port} because of this error: {reason}") from e
        except (ValueError, TypeError, OverflowError) as e:
            # incorrect port number or type
            raise ValueError(repr(e)) from e
        return sock.getsockname()[0]
def get_default_ip_config(ip: Optional[str] = "8.8.8.8") -> tuple:
    """Find the nic and local address used to reach `ip` (by default, the one exposed to internet).

    The source address is obtained from `get_reachable_ip_address` and then
    looked up among the addresses reported by `get_network_info`.

    Args:
        ip (str): ip address. default to '8.8.8.8'

    Raises:
        ValueError: if address does not represent a valid IPv4 or IPv6 address.
        RuntimeError: if can't connect

    Returns:
        tuple: default nic name and its ip address, or None when no nic owns the source address
    """
    family_key = "ip" if ipaddress.ip_address(ip).version == 4 else "ip6"
    source_addr = get_reachable_ip_address(ip)
    for nic in get_network_info():
        if any(candidate_ip == source_addr for candidate_ip, _ in nic[family_key]):
            return (nic["name"], source_addr)
    return None
def get_network_info(device: Optional[str] = None) -> list:
    """Get network info by parsing the JSON output of ``ip -j addr show``.

    Args:
        device (str, optional): device name. Defaults to None.

    Raises:
        RuntimeError: if it could not find the specified device
        NotImplementedError: if the function runs on unsupported OS

    Returns:
        A list of dicts when no device is given, or a single dict for the requested device:
        [{'ip': [('127.0.0.1', 8)], 'ip6': [('::1', 128)], 'mac': '00:00:00:00:00:00', 'name': 'lo'},
        {'ip': [('192.168.1.6', 24)],
        'ip6': [('fdb4:f58e:3c34:300:91f0:9c76:e1fb:d060', 64), ...],
        'mac': 'd8:9c:67:2a:f2:53',
        'name': 'wlp3s0'}, ...]
    """

    def _addresses(nic_info: dict, family: str) -> list:
        # (address, prefix length) pairs of one address family
        return [
            (addr["local"], addr["prefixlen"])
            for addr in nic_info.get("addr_info", [])
            if addr.get("family") == family
        ]

    def _clean(nic_info: dict) -> dict:
        return {
            "ip": _addresses(nic_info, "inet"),
            "ip6": _addresses(nic_info, "inet6"),
            # NOTE(review): presumably some interfaces report no hardware address;
            # fall back to "" instead of raising KeyError - confirm against real output
            "mac": nic_info.get("address", ""),
            "name": nic_info["ifname"],
        }

    def _get_info():
        # An argument list (no shell) keeps the device name from being interpreted by a shell.
        command = ["ip", "-j", "addr", "show"]
        if device:
            command.append(device)
        try:
            output = subprocess.check_output(command)
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            if not device:
                raise
            # the process returns a non-zero exit status.
            # This probably happened because specified interface name does not exists
            j.logger.exception(f"cmd: {command} returns a non-zero exit status.", exception=e)
            raise RuntimeError(f"could not find this interface: {device}") from e
        for nic_info in json.loads(output):
            # when use ip command with -j option and specified interface. it returns on ubuntu < 20
            # a list contains a requested info alongside other partially empty dicts like this -> {'addr_info': [{}, {}]}
            # so we need to filter those dicts to get consistent behavior at all supported ubuntu versions.
            if len(nic_info) > 1:
                yield _clean(nic_info)
            else:
                j.logger.debug(f"Discarded this improper json\n{nic_info}")

    if not jumpscale.data.platform.is_linux():
        # TODO: make it OSX Compatible
        raise NotImplementedError("this function supports only linux at the moment.")

    if not device:
        return list(_get_info())

    nic = next(_get_info(), None)
    if nic is None:
        # nothing usable came back: report it like a missing device instead of leaking StopIteration
        raise RuntimeError(f"could not find this interface: {device}")
    return nic
def get_mac_address(interface: str) -> str:
    """Return the MAC address of the given interface.

    Args:
        interface (str): interface name

    Returns:
        str: the "mac" field reported by `get_network_info` for this interface
    """
    nic = get_network_info(interface)
    return nic["mac"]
def get_host_name() -> str:  # pragma: no cover - we're just proxying
    """Return the hostname of this machine as reported by ``socket.gethostname()``.

    Returns:
        str: host name
    """
    hostname = socket.gethostname()
    return hostname
def is_nic_connected(interface: str) -> bool:
    """Check if an interface is connected.

    On Linux the interface's sysfs ``carrier`` file is read; on macOS the
    interface is looked up in the list of "up" interfaces printed by ``ifconfig -u``.

    Args:
        interface (str): interface name

    Returns:
        bool: whether it is connected or not. False is also returned when the carrier
            file cannot be read or parsed, and on platforms other than Linux and macOS.
    """
    if jumpscale.data.platform.is_linux():
        carrierfile = f"/sys/class/net/{interface}/carrier"
        try:
            return int(jumpscale.sals.fs.read_file(carrierfile)) != 0
        except (OSError, ValueError) as e:
            # OSError: the file could not be read; ValueError: its content is not a number
            j.logger.exception(getattr(e, "strerror", None) or repr(e), exception=e)
            return False
    if jumpscale.data.platform.is_osx():
        # superuser.com/questions/203272/
        # keep only the interface names: cut each line at the first space/colon, drop empty lines
        command = "ifconfig -u | sed -E 's/[[:space:]:].*//;/^$/d'"
        stdout = subprocess.check_output(command, shell=True)
        up_interfaces = stdout.decode("utf-8").split()
        return interface in up_interfaces
    return False
def get_host_by_name(dnsHostname: str) -> str:  # pragma: no cover - we're just proxying
    """Resolve a host name to an address using ``socket.gethostbyname()``.

    Args:
        dnsHostname (str): host name

    Returns:
        str: host address
    """
    address = socket.gethostbyname(dnsHostname)
    return address
def ping_machine(ip: str, timeout: Optional[int] = 60, allowhostname: Optional[bool] = True) -> bool:
    """Ping a machine to check if it's up/running and accessible.

    Note: Any well-behaved device on an LAN or WAN is free to ignore nearly any traffic,
    so PINGs, port scans, and the like are all unreliable.

    Args:
        ip (str): Machine Ip Address (or hostname when `allowhostname` is True)
        timeout (int, optional): time in sec after which ip will be declared as unreachable. Defaults to 60.
        allowhostname (bool, optional): allow pinging on hostname. Defaults to True.

    Raises:
        ValueError: if ip is Invalid ip address, or starts with "-" and would be
            read by ping as a command-line option
        NotImplementedError: if the function runs on unsupported system

    Returns:
        bool: True if machine is pingable, False otherwise
    """
    if not allowhostname:
        ipaddress.ip_address(ip)
    if str(ip).startswith("-"):
        raise ValueError(f"Invalid host: {ip!r}")

    # The command is an argument list run without a shell, so the host string
    # can never be interpreted as shell syntax.
    if jumpscale.data.platform.is_linux():
        command = ["ping", "-c", "1", "-w", str(timeout), ip]
    elif jumpscale.data.platform.is_osx():
        command = ["ping", "-o", "-t", str(timeout), ip]
    else:  # unsupported platform
        raise NotImplementedError("Not Implemented for this os")

    try:
        completed = subprocess.run(command, stdout=subprocess.DEVNULL)
    except OSError:
        # the ping executable could not be started; report the host as not pingable
        return False
    return completed.returncode == 0
def download(
    url: str,
    localpath: Optional[str] = "",
    username: Optional[str] = None,
    passwd: Optional[str] = None,
    overwrite: Optional[bool] = True,
    append_to_home: Optional[bool] = False,
    name_from_url: Optional[bool] = True,
):
    """Download a url to a file or a directory, supported protocols: http, https, ftp, file

    Args:
        url (str): URL to download from
        localpath (str): filename or directory to download the url to. pass None to return the data. Defaults to ''.
        username (str, optional): username for the url if it requires authentication. Defaults to None.
        passwd (str, optional): password for the url if it requires authentication. Defaults to None.
        overwrite (bool, optional): if the file exists, it will be truncated. Defaults to True.
        append_to_home (bool, optional): if set to True, a relative localpath is resolved against the
            user home directory; if set to False it is resolved against the current working directory.
            It has no effect on an absolute localpath. Defaults to False.
        name_from_url (bool, optional): if set to True, localpath is treated as a directory and the file
            name is taken from the last component of the url path (no fallback name is generated yet).
            Defaults to True.

    Raises:
        PermissionError: if the target directory cannot be created
        FileNotFoundError: if the target directory cannot be created
        FileExistsError: if the target file exists and overwrite is False
        ValueError: if the url cannot be parsed, or localpath is empty while name_from_url is False
        URLError: if the request fails

    Returns:
        namedtuple: namedtuple('DownloadResult', ['localpath', 'content', content_length])
            - localpath (pathlib.Path)
            - content (bytes): only if localpath is None else it will be None always
            - content_length: the content length as returned from the response headers

    Todo:
        - better performance with Multi-Threaded
        - support resume

    Examples:
        # use default values for args will download the url to cwd and get the name from the url,
        # if the file already exists, it will overwritten.
        >>> nettools.download('https://www.7-zip.org/a/7z1900-extra.7z')
        DownloadResult(localpath=PosixPath('/home/sameh/projects/js-ng/7z1900-extra.7z'), content=None, content_length='929117')
    """
    from urllib.parse import quote

    DownloadResult = namedtuple("DownloadResult", ["localpath", "content", "content_length"])
    file_name = ""
    try:
        parsed_url = urlparse(url)
    except ValueError as e:
        j.logger.exception(repr(e), exception=e)
        raise

    if name_from_url:
        file_name = basename(parsed_url.path)  # TODO: insure safe file name and fall back if can't get the file name
    elif localpath == "":
        j.logger.error(f"Improper args used.\nname_from_url: {name_from_url}\nlocalpath: {localpath}")
        raise ValueError("localpath can't be empty when name_from_url is False")

    opener = None
    if username and passwd:
        if parsed_url.scheme == "ftp":
            # percent-encode the credentials so characters such as "@", ":" or "/" cannot break the url
            credentials = quote(username, safe="") + ":" + quote(passwd, safe="")
            url = "ftp://" + credentials + "@" + url.split("://", 1)[1]
        elif parsed_url.scheme in ("http", "https"):
            password_mgr = HTTPPasswordMgrWithDefaultRealm()
            # If we knew the realm, we could use it instead of None.
            password_mgr.add_password(None, parsed_url.netloc, username, passwd)
            # The opener is used for this request only; it is not installed globally,
            # so the credentials do not leak into later urlopen() calls.
            opener = build_opener(HTTPBasicAuthHandler(password_mgr))

    req = Request(url)
    req.add_header(
        "User-agent",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36",
    )
    try:
        response = opener.open(req) if opener is not None else urlopen(req)
    except URLError as e:
        # HTTPError (a URLError subclass) carries a status code; a plain URLError only a reason
        if hasattr(e, "code"):
            msg = f"The server couldn't fulfill the request. Error code: {e.code}"
        else:
            msg = f"We failed to reach a server. Reason: {e.reason}"
        j.logger.exception(msg, exception=e)
        raise

    # the response is closed on every path, including the early return and errors
    with response:
        content_length = response.headers.get("Content-Length")
        if localpath is None:
            return DownloadResult(localpath=None, content=response.read(), content_length=content_length)

        file_path = Path(localpath) / file_name
        if not file_path.is_absolute() and append_to_home:
            file_path = Path.home() / file_path
        try:
            file_path.parent.mkdir(parents=True, exist_ok=True)
        except (PermissionError, FileNotFoundError) as e:
            j.logger.exception(e.strerror or repr(e), exception=e)
            raise

        # "wb" truncates an existing file, "xb" raises FileExistsError instead
        file_mode = "wb" if overwrite else "xb"
        with file_path.open(file_mode) as f_handler:
            shutil.copyfileobj(response, f_handler, length=8 * 1024 * 1024)
        return DownloadResult(localpath=file_path.resolve(), content=None, content_length=content_length)
def _netobject_get(device: str) -> ipaddress.IPv4Network:
    """Build the IPv4 network object of the first IPv4 address configured on a device.

    Args:
        device (str): device name

    Returns:
        ipaddress.IPv4Network: network derived from the address and its prefix length
    """
    nic = get_network_info(device)
    address, prefixlen = nic["ip"][0]
    # strict=False: the interface address has host bits set, which a strict network would reject
    return ipaddress.IPv4Network(f"{address}/{prefixlen}", strict=False)
def netrange_get(device: str, skip_begin: Optional[int] = 10, skip_end: Optional[int] = 10) -> tuple:
    """Get (from_ip, to_ip) of the range attached to a device, skipping addresses at both ends.

    Args:
        device (str): device name
        skip_begin (Optional[int], optional): ips to skip from the beginning of the range, Defaults to 10.
        skip_end (Optional[int], optional): ips to skip from the end of the range, Defaults to 10.

    Returns:
        tuple: first and last ip of the range for this device, as strings
    """
    network = _netobject_get(device)
    first_ip = network[0] + skip_begin
    last_ip = network[-1] - skip_end
    return (str(first_ip), str(last_ip))
def get_free_port(ipv6: Optional[bool] = False, udp: Optional[bool] = False, return_socket: Optional[bool] = False):
    """Bind an ipv4 or ipv6 socket to port 0 so that the OS picks a free, available port.

    You can optionally keep the bound socket by setting return_socket to True (preferred,
    as it prevents race conditions), but then it is your responsibility to close that socket
    after you finish with it to free the selected port. When return_socket is False the
    probe socket is closed before the port number is returned.
    By default you get a tcp port; setting udp to True sets the socket type to UDP.

    Args:
        ipv6 (bool, optional): whether to bind the free port to ::1 instead of 127.0.0.1. Defaults to False.
        udp (bool, optional): set socket type to udp instead of tcp. Defaults to False.
        return_socket (bool, optional): return the socket alongside the port to reuse it. Defaults to False.

    Returns:
        int: the free port selected by the OS.
        Optional[socket]: in the case of return_socket set to True, socket will be returned alongside the port in a tuple.

    Example:
        # get a free TCP port that currently not binded to 127.0.0.1
        >>> port = get_free_port()
        # get a free UDP port that currently not binded to 127.0.0.1
        >>> port = get_free_port(udp=True)
        # get a free TCP port that currently not binded to ::1
        >>> port = get_free_port(ipv6=True)
        # get a free TCP port that currently not binded to 127.0.0.1
        # and reuse the socket instead of creating a socket and bind it to selected port
        >>> port, sock = get_free_port(return_socket=True)
        >>> sock.listen()
        ..
        >>> sock.close()
    """
    socket_type = socket.SOCK_DGRAM if udp else socket.SOCK_STREAM
    family, host = (socket.AF_INET6, "::1") if ipv6 else (socket.AF_INET, "127.0.0.1")

    sock = socket.socket(family, socket_type)
    keep_open = False
    try:
        # Picking a random port is not a good idea - let the OS pick one for you.
        sock.bind((host, 0))
        # retrieve the selected port with getsockname() right after bind()
        port = sock.getsockname()[1]
        keep_open = bool(return_socket)
    finally:
        # close the socket unless the caller asked for it; this also covers a failed bind()
        if not keep_open:
            sock.close()

    # returns port or (port, socket) depend on the bool value of return_socket
    return (port, sock) if return_socket else port
Functions
def check_url_reachable(url: str, timeout: Optional[int] = 5, verify: Optional[bool] = True, fake_user_agent: Optional[bool] = True) ‑> bool
-
Check that given url is reachable
Args
url
:str
- url to test
timeout
:int
, optional- timeout of test. Defaults to 5.
verify
:bool
, optional- boolean indication of whether to verify the server's TLS certificate.
fake_user_agent
:bool
, optional- boolean indication to fake the user-agent and act like normal browser or not.
Raises
ValueError
- raises if not correct url
Returns
bool
- True if the test succeeds, False otherwise
Expand source code
def check_url_reachable(
    url: str, timeout: Optional[int] = 5, verify: Optional[bool] = True, fake_user_agent: Optional[bool] = True
) -> bool:
    """Check that the given url is reachable by issuing a single GET request.

    Args:
        url (str): url to test
        timeout (int, optional): timeout of the request in seconds. Defaults to 5.
        verify (bool, optional): whether to verify the server's TLS certificate.
        fake_user_agent (bool, optional): whether to send a browser-like User-Agent
            instead of urllib's default one.

    Raises:
        ValueError: raises if not correct url

    Returns:
        bool: True if the request succeeded, False if it failed with an HTTP error,
            a URL error, a timeout or any other OSError
    """
    headers = {}
    if fake_user_agent:
        # Some services block urllib's default "Python-urllib/x.y" user-agent
        # (ex: www.amazon.com) and would look unreachable, so act like a normal browser.
        headers["User-Agent"] = (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_4) AppleWebKit/603.1.30 "
            "(KHTML, like Gecko) Version/10.1 Safari/603.1.30"
        )

    context = None
    if not verify:
        # opt out of certificate verification for this single request only;
        # check_hostname has to be disabled before verify_mode can be set to CERT_NONE
        context = ssl.create_default_context()
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

    request = Request(url, headers=headers, method="GET")
    try:
        # the timeout is passed per request; the process-wide socket default is left untouched
        with urlopen(request, timeout=timeout, context=context):
            return True
    except OSError:
        # HTTPError, URLError and socket.timeout are all OSError subclasses; errors raised
        # while the response is being read (e.g. a reset connection) are covered as well
        return False
def download(url: str, localpath: Optional[str] = '', username: Optional[str] = None, passwd: Optional[str] = None, overwrite: Optional[bool] = True, append_to_home: Optional[bool] = False, name_from_url: Optional[bool] = True)
-
Download a url to a file or a directory, supported protocols: http, https, ftp, file
Args
url
:str
- URL to download from
localpath
:str
- filename or directory to download the url to. pass None to return the data. Defaults to ''.
username
:str
, optional- username for the url if it requires authentication. Defaults to None.
passwd
:str
, optional- password for the url if it requires authentication. Defaults to None.
overwrite
:bool
, optional- if the file exists, it will be truncated. Defaults to True.
append_to_home
:bool
, optional- if set to True, any relative path given in localpath is resolved against the user's home directory (which is expected to be writable). If set to False, a relative localpath is resolved against the current working directory. If an absolute localpath (starting with /) is given, append_to_home has no effect. Defaults to False.
name_from_url
:bool
, optional- if set to True, localpath will be treated as a directory, and the function will try to get the file name from the url or fall back to an auto-generated name. Defaults to True.
Raises
PermissionError
- [description]
FileNotFoundError
- [description]
FileExistsError
- [description]
ValueError
- [description]
URLError
- [description]
Returns
namedtuple
- namedtuple('DownloadResult', ['localpath', 'content', content_length]) - localpath (pathlib.Path) - content (bytes): only if localpath is None else it will be None always - content_length: the content length as returned from the response headers
Todo
- better performance with Multi-Threaded
- support resume
Examples
Using the default values for the args downloads the url to the cwd and takes the file name from the url;
if the file already exists, it will be overwritten.
>>> nettools.download('https://www.7-zip.org/a/7z1900-extra.7z') DownloadResult(localpath=PosixPath('/home/sameh/projects/js-ng/7z1900-extra.7z'), content=None, content_length='929117')
Expand source code
def download(
    url: str,
    localpath: Optional[str] = "",
    username: Optional[str] = None,
    passwd: Optional[str] = None,
    overwrite: Optional[bool] = True,
    append_to_home: Optional[bool] = False,
    name_from_url: Optional[bool] = True,
):
    """Download a url to a file or a directory, supported protocols: http, https, ftp, file

    Args:
        url (str): URL to download from
        localpath (str): filename or directory to download the url to. pass None to return the data. Defaults to ''.
        username (str, optional): username for the url if it requires authentication. Defaults to None.
        passwd (str, optional): password for the url if it requires authentication. Defaults to None.
        overwrite (bool, optional): if the file exists, it will be truncated. Defaults to True.
        append_to_home (bool, optional): if set to True, a relative localpath is resolved against the
            user home directory; if set to False it is resolved against the current working directory.
            It has no effect on an absolute localpath. Defaults to False.
        name_from_url (bool, optional): if set to True, localpath is treated as a directory and the file
            name is taken from the last component of the url path (no fallback name is generated yet).
            Defaults to True.

    Raises:
        PermissionError: if the target directory cannot be created
        FileNotFoundError: if the target directory cannot be created
        FileExistsError: if the target file exists and overwrite is False
        ValueError: if the url cannot be parsed, or localpath is empty while name_from_url is False
        URLError: if the request fails

    Returns:
        namedtuple: namedtuple('DownloadResult', ['localpath', 'content', content_length])
            - localpath (pathlib.Path)
            - content (bytes): only if localpath is None else it will be None always
            - content_length: the content length as returned from the response headers

    Todo:
        - better performance with Multi-Threaded
        - support resume

    Examples:
        # use default values for args will download the url to cwd and get the name from the url,
        # if the file already exists, it will overwritten.
        >>> nettools.download('https://www.7-zip.org/a/7z1900-extra.7z')
        DownloadResult(localpath=PosixPath('/home/sameh/projects/js-ng/7z1900-extra.7z'), content=None, content_length='929117')
    """
    from urllib.parse import quote

    DownloadResult = namedtuple("DownloadResult", ["localpath", "content", "content_length"])
    file_name = ""
    try:
        parsed_url = urlparse(url)
    except ValueError as e:
        j.logger.exception(repr(e), exception=e)
        raise

    if name_from_url:
        file_name = basename(parsed_url.path)  # TODO: insure safe file name and fall back if can't get the file name
    elif localpath == "":
        j.logger.error(f"Improper args used.\nname_from_url: {name_from_url}\nlocalpath: {localpath}")
        raise ValueError("localpath can't be empty when name_from_url is False")

    opener = None
    if username and passwd:
        if parsed_url.scheme == "ftp":
            # percent-encode the credentials so characters such as "@", ":" or "/" cannot break the url
            credentials = quote(username, safe="") + ":" + quote(passwd, safe="")
            url = "ftp://" + credentials + "@" + url.split("://", 1)[1]
        elif parsed_url.scheme in ("http", "https"):
            password_mgr = HTTPPasswordMgrWithDefaultRealm()
            # If we knew the realm, we could use it instead of None.
            password_mgr.add_password(None, parsed_url.netloc, username, passwd)
            # The opener is used for this request only; it is not installed globally,
            # so the credentials do not leak into later urlopen() calls.
            opener = build_opener(HTTPBasicAuthHandler(password_mgr))

    req = Request(url)
    req.add_header(
        "User-agent",
        "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36",
    )
    try:
        response = opener.open(req) if opener is not None else urlopen(req)
    except URLError as e:
        # HTTPError (a URLError subclass) carries a status code; a plain URLError only a reason
        if hasattr(e, "code"):
            msg = f"The server couldn't fulfill the request. Error code: {e.code}"
        else:
            msg = f"We failed to reach a server. Reason: {e.reason}"
        j.logger.exception(msg, exception=e)
        raise

    # the response is closed on every path, including the early return and errors
    with response:
        content_length = response.headers.get("Content-Length")
        if localpath is None:
            return DownloadResult(localpath=None, content=response.read(), content_length=content_length)

        file_path = Path(localpath) / file_name
        if not file_path.is_absolute() and append_to_home:
            file_path = Path.home() / file_path
        try:
            file_path.parent.mkdir(parents=True, exist_ok=True)
        except (PermissionError, FileNotFoundError) as e:
            j.logger.exception(e.strerror or repr(e), exception=e)
            raise

        # "wb" truncates an existing file, "xb" raises FileExistsError instead
        file_mode = "wb" if overwrite else "xb"
        with file_path.open(file_mode) as f_handler:
            shutil.copyfileobj(response, f_handler, length=8 * 1024 * 1024)
        return DownloadResult(localpath=file_path.resolve(), content=None, content_length=content_length)
def get_default_ip_config(ip: Optional[str] = '8.8.8.8') ‑> tuple
-
get default nic and address, by default, the one exposed to internet
Args
ip
:str
- ip address. default to '8.8.8.8'
Raises
ValueError
- if address does not represent a valid IPv4 or IPv6 address.
RuntimeError
- if can't connect
Returns
tuple
- default nic name and its ip address
Expand source code
def get_default_ip_config(ip: Optional[str] = "8.8.8.8") -> tuple:
    """Find the nic, and its address, that would carry traffic towards `ip`.

    With the default target this is the interface exposed to the internet.

    Args:
        ip (str): target ip address. Defaults to '8.8.8.8'.

    Raises:
        ValueError: if `ip` does not represent a valid IPv4 or IPv6 address.
        RuntimeError: if can't connect

    Returns:
        tuple: default nic name and its ip address, or None when no nic
            reported by get_network_info() owns the source address.
    """
    # get_network_info() lists IPv4 addresses under "ip" and IPv6 ones under "ip6"
    family_key = "ip" if ipaddress.ip_address(ip).version == 4 else "ip6"
    outgoing_addr = get_reachable_ip_address(ip)
    for nic in get_network_info():
        for address, _prefixlen in nic[family_key]:
            if address == outgoing_addr:
                return (nic["name"], outgoing_addr)
    return None
def get_free_port(ipv6: Optional[bool] = False, udp: Optional[bool] = False, return_socket: Optional[bool] = False)
-
Bind an ipv4 or ipv6 socket to port 0 to make OS pick a random, free and available port from 1024 to 65535.
You can optionally reuse the socket by setting return_socket to True (preferred, to prevent race conditions), but then it is your responsibility to close that socket by calling its close method once you are done with it, which frees the selected port. By default you get a TCP port; setting udp to True switches the socket type to UDP.
Args
ipv6
:bool
, optional- whether to bind the free port to ::1 (IPv6) instead of 127.0.0.1 (IPv4). Defaults to False.
udp
:bool
, optional- set socket type to udp instead of tcp. Defaults to False.
return_socket
:bool
, optional- return the socket alongside the port to reuse it. Defaults to False.
Returns
int
- returns a random free port from 1024 to 65535 range.
Optional[socket]
- in the case of return_socket set to True, socket will be returned alongside the port in a tuple.
Example
get a free TCP port that currently not binded to 127.0.0.1
>>> port = get_free_port() # get a free UDP port that currently not binded to 127.0.0.1 >>> port = get_free_port(udp=True) # get a free TCP port that currently not binded to ::1 >>> port = get_free_port(ipv6=True) # get a free TCP port that currently not binded to 127.0.0.1 # and reuse the socket instead of creating a socket and bind it to selected port >>> port, sock = get_free_port(return_socket=True) >>> sock.listen() .. >>> sock.close()
Expand source code
def get_free_port(ipv6: Optional[bool] = False, udp: Optional[bool] = False, return_socket: Optional[bool] = False):
    """Bind an ipv4 or ipv6 loopback socket to port 0 so the OS picks a free port.

    Picking a random port yourself is not a good idea - the OS knows which ports
    are available. You can optionally reuse the bound socket by setting
    `return_socket` to True (preferred, to prevent race conditions), but then it
    is your responsibility to close that socket once you are done with it, which
    frees the selected port. When the socket is not returned it is closed here,
    before returning, so no file descriptor is leaked.

    Args:
        ipv6 (bool, optional): bind to ::1 instead of 127.0.0.1. Defaults to False.
        udp (bool, optional): set socket type to udp instead of tcp. Defaults to False.
        return_socket (bool, optional): return the socket alongside the port to reuse it. Defaults to False.

    Raises:
        OSError: if the socket can't be created or bound.

    Returns:
        int: a free port selected by the OS.
        Optional[socket]: in the case of return_socket set to True, socket will be returned
            alongside the port in a tuple.

    Example:
        # get a free TCP port that currently not bound on 127.0.0.1
        >>> port = get_free_port()
        # get a free UDP port that currently not bound on 127.0.0.1
        >>> port = get_free_port(udp=True)
        # get a free TCP port that currently not bound on ::1
        >>> port = get_free_port(ipv6=True)
        # get a free TCP port and keep the socket that holds it
        >>> port, sock = get_free_port(return_socket=True)
        >>> sock.listen()
        ..
        >>> sock.close()
    """
    socket_type = socket.SOCK_DGRAM if udp else socket.SOCK_STREAM
    if ipv6:
        family, loopback = socket.AF_INET6, "::1"
    else:
        family, loopback = socket.AF_INET, "127.0.0.1"

    sock = socket.socket(family, socket_type)
    try:
        sock.bind((loopback, 0))
        # retrieve the selected port with getsockname() right after bind()
        port = sock.getsockname()[1]
    except OSError:
        # don't leak the descriptor when binding fails
        sock.close()
        raise

    if return_socket:
        return (port, sock)
    # the caller only wants the number: release the socket explicitly
    sock.close()
    return port
def get_host_by_name(dnsHostname: str) ‑> str
-
get host address by its name
Args
dnsHostname
:str
- host name
Returns
str
- host address
Expand source code
def get_host_by_name(dnsHostname: str) -> str:  # pragma: no cover - we're just proxying
    """Resolve a host name to its address.

    Thin proxy around socket.gethostbyname().

    Args:
        dnsHostname (str): host name

    Returns:
        str: host address
    """
    host_address = socket.gethostbyname(dnsHostname)
    return host_address
def get_host_name() ‑> str
-
Get hostname of the machine
Returns
str
- host name
Expand source code
def get_host_name() -> str:  # pragma: no cover - we're just proxying
    """Return the hostname of this machine.

    Thin proxy around socket.gethostname().

    Returns:
        str: host name
    """
    machine_name = socket.gethostname()
    return machine_name
def get_mac_address(interface: str) ‑> str
-
Return the MAC address of this interface
Args
interface
:str
- interface name
Returns
str
- mac of the interface
Expand source code
def get_mac_address(interface: str) -> str:
    """Look up the MAC address of a network interface.

    Args:
        interface (str): interface name

    Returns:
        str: mac of the interface
    """
    # with a device name get_network_info() returns a single info dict
    nic_info = get_network_info(interface)
    return nic_info["mac"]
def get_network_info(device: Optional[str] = None) ‑> list
-
Get network info
Args
device
:str
, optional- device name. Defaults to None.
Raises
RuntimeError
- if it could not find the specified device
NotImplementedError
- if the function runs on unsupported OS
Returns
list
of dicts, or a single
dict if the device arg is used
- network info
[{'ip': [('127.0.0.1', 8)], 'ip6': [('::1', 128)], 'mac': '00:00:00:00:00:00', 'name': 'lo'}, {'ip': [('192.168.1.6', 24)], 'ip6': [('fdb4:f58e:3c34:300:91f0:9c76:e1fb:d060', 64), …], 'mac': 'd8:9c:67:2a:f2:53', 'name': 'wlp3s0'}, …]
Expand source code
def get_network_info(device: Optional[str] = None) -> list:
    """Get network info by parsing the json output of `ip -j addr show`.

    Args:
        device (str, optional): device name. Defaults to None.

    Raises:
        RuntimeError: if it could not find the specified device
        NotImplementedError: if the function runs on unsupported OS

    Returns:
        list of dicts, or a single dict if device arg used: network info

        [{'ip': [('127.0.0.1', 8)], 'ip6': [('::1', 128)], 'mac': '00:00:00:00:00:00', 'name': 'lo'},
         {'ip': [('192.168.1.6', 24)], 'ip6': [('fdb4:f58e:3c34:300:91f0:9c76:e1fb:d060', 64), ...],
          'mac': 'd8:9c:67:2a:f2:53', 'name': 'wlp3s0'}, ...]
    """
    # local import: shlex is not among the imports visible at the top of this module
    import shlex

    def _clean(nic_info: dict) -> dict:
        """Reduce one raw `ip -j` entry to the ip/ip6/mac/name summary."""
        addresses = nic_info["addr_info"]
        return {
            "ip": [(addr["local"], addr["prefixlen"]) for addr in addresses if addr["family"] == "inet"],
            "ip6": [(addr["local"], addr["prefixlen"]) for addr in addresses if addr["family"] == "inet6"],
            "mac": nic_info["address"],
            "name": nic_info["ifname"],
        }

    def _get_info():
        """Run the `ip` command and yield one cleaned dict per usable entry."""
        if device:
            # quote the device name so it can never be interpreted by the shell
            command = f"ip -j addr show {shlex.quote(str(device))}"
            try:
                output = subprocess.check_output(command, shell=True)
            except subprocess.CalledProcessError as e:
                # the process returns a non-zero exit status.
                # This probably happened because specified interface name does not exists
                j.logger.exception(f"cmd: {e.cmd} returns a non-zero exit status.", exception=e)
                raise RuntimeError(f"could not find this interface: {device}") from e
        else:
            output = subprocess.check_output("ip -j addr show", shell=True)

        for nic_info in json.loads(output):
            # when use ip command with -j option and specified interface. it returns on ubuntu < 20
            # a list contains a requested info alongside other partially empty dicts like this -> {'addr_info': [{}, {}]}
            # so we need to filter those dicts to get consistent behavior at all supported ubuntu versions.
            if len(nic_info) > 1:
                yield _clean(nic_info)
            else:
                j.logger.debug(f"Discarded this improper json\n{nic_info}")

    if not jumpscale.data.platform.is_linux():
        # TODO: make it OSX Compatible
        raise NotImplementedError("this function supports only linux at the moment.")

    if not device:
        return list(_get_info())

    nic = next(_get_info(), None)
    if nic is None:
        # every entry was discarded: report it instead of leaking StopIteration
        raise RuntimeError(f"could not find this interface: {device}")
    return nic
def get_nic_names() ‑> list
-
Get Nics on this machine
Returns
list
- list of all available nics
Expand source code
def get_nic_names() -> list:
    """List the names of the nics on this machine.

    Returns:
        list: list of all available nics
    """
    all_nics = get_network_info()
    return [nic_info["name"] for nic_info in all_nics]
def get_nic_type(interface: str) ‑> str
-
Get Nic Type on a certain interface
Args
interface
:str
- interface name
Raises
Runtime
- if ethtool not installed on the system
Value
- if interface given is invalid
Returns
str
- type of the interface
Expand source code
def get_nic_type(interface: str) -> str:
    """Get Nic Type on a certain interface

    The result is one of "INFINIBAND", "VLAN", "VIRTUAL" or "ETHERNET_GB".
    There is no branch for platforms other than linux and osx, so the function
    falls through and returns None there.

    Args:
        interface (str): interface name

    Raises:
        Runtime: if ethtool not installed on the system (linux branch)
        Value: if interface given is invalid (only checked in the osx branch)

    Returns:
        str: type of the interface
    """
    # NOTE(review): `interface` is interpolated into the command strings below;
    # confirm how run_local executes them before passing untrusted names.
    output = ""
    if jumpscale.data.platform.is_linux():
        # the sysfs "type" file is only read when the interface directory exists;
        # otherwise output stays "" and the checks below still run
        if jumpscale.sals.fs.exists(f"/sys/class/net/{interface}"):
            output = jumpscale.sals.fs.read_file(f"/sys/class/net/{interface}/type")
        # a type value of 32 is reported as infiniband
        if output.strip() == "32":
            return "INFINIBAND"
        else:
            # an entry under /proc/net/vlan marks the interface as a vlan
            if jumpscale.sals.fs.exists("/proc/net/vlan/%s" % (interface)):
                return "VLAN"
            exitcode, _, _ = jumpscale.core.executors.run_local("which ethtool", hide=True, warn=True)
            if exitcode != 0:
                raise Runtime("Ethtool is not installed on this system!")
            exitcode, output, _ = jumpscale.core.executors.run_local(f"ethtool -i {interface}", hide=True, warn=True)
            # ethtool failing on the interface is treated as a virtual nic
            if exitcode != 0:
                return "VIRTUAL"
            # classify by the driver name printed by `ethtool -i`
            match = re.search(r"^driver:\s+(?P<driver>\w+)\s*$", output, re.MULTILINE)
            if match and match.group("driver") == "tun":
                return "VIRTUAL"
            if match and match.group("driver") == "bridge":
                return "VLAN"
            # any other driver, or no driver line at all
            return "ETHERNET_GB"
    elif jumpscale.data.platform.is_osx():
        command = f"ifconfig {interface}"
        exitcode, output, err = jumpscale.core.executors.run_local(command, hide=True, warn=True)
        if exitcode != 0:
            # temporary plumb the interface to lookup its mac
            jumpscale.core.executors.run_local(f"{command} plumb", hide=True)
            exitcode, output, err = jumpscale.core.executors.run_local(command, hide=True)
            jumpscale.core.executors.run_local(f"{command} unplumb", hide=True)
        # "ipib" anywhere in the ifconfig output is reported as infiniband
        if output.find("ipib") >= 0:
            return "INFINIBAND"
        else:
            # work with interfaces which are subnetted on vlans eq e1000g5000:1
            interfacepieces = interface.split(":")
            interface = interfacepieces[0]
            # the trailing digits of the base name form the interface id
            match = re.search(r"^\w+?(?P<interfaceid>\d+)$", interface, re.MULTILINE)
            if not match:
                raise Value(f"Invalid interface {interface}")
            # an id of four or more digits is reported as a vlan
            if len(match.group("interfaceid")) >= 4:
                return "VLAN"
            else:
                # a ":<n>" suffix on the original name is reported as virtual
                if len(interfacepieces) > 1:
                    return "VIRTUAL"
                else:
                    return "ETHERNET_GB"
def get_reachable_ip_address(ip: str, port: Optional[int] = 0) ‑> str
-
figures out what source address would be used if some traffic were to be sent out to specified ip. compatible with both IPv4 and IPv6.
Args
ip
:str
- ip address
port
:int
, optional- port number. does not matter much. No traffic is actually sent. Defaults to 0.
Raises
ValueError
- if address does not represent a valid IPv4 or IPv6 address, or port is invalid.
RuntimeError
- if can't connect
Returns
str
- ip that can connect to the specified ip
Expand source code
def get_reachable_ip_address(ip: str, port: Optional[int] = 0) -> str:
    """Figure out what source address would be used if traffic were sent to `ip`.

    Compatible with both IPv4 and IPv6. A UDP socket is connected to the target,
    which sends no traffic, and the local address chosen for the socket is returned.

    Args:
        ip (str): ip address
        port (int, optional): port number. does not matter much. No traffic is actually sent. Defaults to 0.

    Raises:
        ValueError: if address does not represent a valid IPv4 or IPv6 address, or port is invalid.
        RuntimeError: if can't connect

    Returns:
        str: ip that can connect to the specified ip
    """
    ipaddr = ipaddress.ip_address(ip)
    family = socket.AF_INET if ipaddr.version == 4 else socket.AF_INET6

    with socket.socket(family, socket.SOCK_DGRAM) as sock:
        try:
            sock.connect((ip, port))
        except OSError as e:
            # OSError carries its description in `strerror` (there is no `error`
            # attribute); fall back to repr() when the description is missing
            reason = e.strerror or repr(e)
            raise RuntimeError(f"Cannot connect to {ip}:{port} because of this error: {reason}") from e
        except (ValueError, TypeError, OverflowError) as e:
            # incorrect port number or type
            raise ValueError(repr(e)) from e
        return sock.getsockname()[0]
def is_nic_connected(interface: str) ‑> bool
-
check if interface is connected
Args
interface
:str
- interface name
Returns
bool
- whether it is connected or not
Expand source code
def is_nic_connected(interface: str) -> bool:
    """Check if interface is connected.

    On linux the interface's sysfs `carrier` file is read; on osx the interface
    name is looked up among the interfaces printed by `ifconfig -u`. On any other
    platform no branch applies and None is returned.

    Args:
        interface (str): interface name

    Returns:
        bool: whether it is connected or not
    """
    if jumpscale.data.platform.is_linux():
        carrierfile = f"/sys/class/net/{interface}/carrier"
        try:
            return int(jumpscale.sals.fs.read_file(carrierfile)) != 0
        except IOError as e:
            # an unreadable carrier file is logged and reported as not connected
            j.logger.exception(e.strerror or repr(e), exception=e)
            return False
    elif jumpscale.data.platform.is_osx():
        # superuser.com/questions/203272/
        # the sed expression keeps only the interface name of each line; its closing
        # quote was missing, which made the shell reject the whole command
        command = "ifconfig -u | sed -E 's/[[:space:]:].*//;/^$/d'"
        stdout = subprocess.check_output(command, shell=True)
        up_interfaces = stdout.decode("utf-8").split()
        return interface in up_interfaces
def netrange_get(device: str, skip_begin: Optional[int] = 10, skip_end: Optional[int] = 10) ‑> tuple
-
Get ($fromip,$topip) from range attached to device, skip the mentioned ip addresses.
Args
device
:str
- device name
skip_begin
:Optional[int]
, optional- ips to skip from the beginning of the range, Defaults to 10.
skip_end
:Optional[int]
, optional- ips to skip from the end of the range, Defaults to 10.
Returns
tuple
- ip range for this device
Expand source code
def netrange_get(device: str, skip_begin: Optional[int] = 10, skip_end: Optional[int] = 10) -> tuple:
    """Get ($fromip,$topip) from range attached to device, skip the mentioned ip addresses.

    Args:
        device (str): device name
        skip_begin (Optional[int], optional): ips to skip from the beginning of the range, Defaults to 10.
        skip_end (Optional[int], optional): ips to skip from the end of the range, Defaults to 10.

    Returns:
        tuple: ip range for this device
    """
    # presumably an indexable network object - _netobject_get is defined elsewhere in the module
    network = _netobject_get(device)
    first_ip = network[0] + skip_begin
    last_ip = network[-1] - skip_end
    return (str(first_ip), str(last_ip))
def ping_machine(ip: str, timeout: Optional[int] = 60, allowhostname: Optional[bool] = True) ‑> bool
-
Ping a machine to check if it's up/running and accessible Note: Any well-behaved device on an LAN or WAN is free to ignore nearly any traffic, so PINGs, port scans, and the like are all unreliable.
Args
ip
:str
- Machine Ip Address
timeout
:int
, optional- time in sec after which ip will be declared as unreachable. Defaults to 60.
allowhostname
:bool
, optional- allow pinging on hostname. Defaults to True.
Raises
ValueError
- if ip is Invalid ip address
NotImplementedError
- if the function runs on unsupported system
Returns
bool
- True if machine is pingable, False otherwise
Expand source code
def ping_machine(ip: str, timeout: Optional[int] = 60, allowhostname: Optional[bool] = True) -> bool:
    """Ping a machine to check if it's up/running and accessible

    Note: Any well-behaved device on an LAN or WAN is free to ignore nearly any traffic,
    so PINGs, port scans, and the like are all unreliable.

    Args:
        ip (str): Machine Ip Address
        timeout (int, optional): time in sec after which ip will be declared as unreachable. Defaults to 60.
        allowhostname (bool, optional): allow pinging on hostname. Defaults to True.

    Raises:
        ValueError: if allowhostname is False and ip is not a valid ip address
        NotImplementedError: if the function runs on unsupported system

    Returns:
        bool: True if machine is pingable, False otherwise
    """
    # local import: shlex is not among the imports visible at the top of this module
    import shlex

    if not allowhostname:
        # raises ValueError for anything that is not a literal ip address
        ipaddress.ip_address(ip)

    # both values end up in a shell command line; quote them so that a crafted
    # host name or timeout can't inject extra shell syntax
    target = shlex.quote(str(ip))
    deadline = shlex.quote(str(timeout))

    if jumpscale.data.platform.is_linux():
        command = f"ping -c 1 -w {deadline} {target}"
    elif jumpscale.data.platform.is_osx():
        command = f"ping -o -t {deadline} {target}"
    else:  # unsupported platform
        raise NotImplementedError("Not Implemented for this os")

    try:
        subprocess.check_output(command, shell=True)
    except subprocess.CalledProcessError:
        # a non-zero exit status means the ping did not succeed
        return False
    return True
def tcp_connection_test(ipaddr: str, port: int, timeout: Optional[int] = None) ‑> bool
-
tests tcp connection on specified port, compatible with both IPv4 and IPv6. ensures that each side of the connection is reachable in the network.
Raises
socket.gaierror
- raised for address-related errors.
socket.herror
- raised for address-related errors.
Args
ipaddr
:str
- ip address or hostname
port
:int
- port number
timeout
:int
, optional- time before the connection test fails. Defaults to None.
Returns
bool
- True if the test succeeds, False otherwise
Expand source code
def tcp_connection_test(ipaddr: str, port: int, timeout: Optional[int] = None) -> bool:
    """Test that a TCP connection can be opened to the specified address and port.

    Compatible with both IPv4 and IPv6. Every connection failure is reported
    through the return value: refused connections, timeouts and address-related
    errors (socket.gaierror, socket.herror) are all OSError subclasses and are
    caught here, so none of them is raised to the caller.

    Args:
        ipaddr (str): ip address or hostname
        port (int): port number
        timeout (int, optional): time before the connection test fails. Defaults to None.

    Returns:
        bool: True if the test succeeds, False otherwise
    """
    try:
        # the connection only has to be established; the context manager closes it again
        with socket.create_connection((ipaddr, port), timeout):
            return True
    except OSError:
        return False
def udp_connection_test(ipaddr: str, port: int, timeout: Optional[int] = 1, message: Optional[bytes] = b'') ‑> bool
-
tests udp connection on specified port by sending specified message and expecting to receive at least one byte from the socket as an indicator of connection success
Args
ipaddr
:str
- ip address
port
:int
- port number
timeout
:int
, optional- time before the connection test fails. Defaults to 1.
message
:bytes
, optional- message to send. Defaults to b"" (an empty datagram)
Raises
ValueError
- raises if invalid ip address was used
Returns
bool
- True if the test succeeds, False otherwise
Expand source code
def udp_connection_test(ipaddr: str, port: int, timeout: Optional[int] = 1, message: Optional[bytes] = b"") -> bool:
    """Test a UDP endpoint by sending `message` and waiting for any reply.

    Receiving at least one byte back from the socket is taken as the indicator of
    connection success. When `timeout` is falsy no timeout is set on the socket,
    so the wait for a reply is not bounded by this function.

    Args:
        ipaddr (str): ip address
        port (int): port number
        timeout (int, optional): time before the connection test fails. Defaults to 1.
        message (bytes, optional): message to send. Defaults to b"" (an empty datagram).

    Raises:
        ValueError: raises if invalid ip address was used

    Returns:
        bool: True if the test succeeds, False otherwise
    """
    family = socket.AF_INET if ipaddress.ip_address(ipaddr).version == 4 else socket.AF_INET6

    with socket.socket(family, socket.SOCK_DGRAM) as sock:
        if timeout:
            sock.settimeout(timeout)
        try:
            sock.sendto(message, (ipaddr, port))
            # expecting to receive at least one byte from the socket as indication to succeed connection
            sock.recvfrom(1)
        except OSError:
            # socket.timeout is an OSError subclass, so timeouts land here as well
            return False
        return True
def wait_connection_test(ipaddr: str, port: int, timeout: Optional[int] = 6) ‑> bool
-
Will wait until port listens on the specified address or
timeout
sec have elapsed. Under the hood the function will try to connect every
interval
sec; if the waiting time
timeout
is set to a value <= 2,
interval
is 1 sec, otherwise 2.
Args
ipaddr
:str
- ip address, or hostname
port
:int
- port number
timeout
:int
, optional- how long to wait for the connection. if the timeout set to value > 2, due to the way the function works, it makes sense to choose an even number. Defaults to 6.
Returns
bool
- True if the test succeeds, False otherwise
Expand source code
def wait_connection_test(ipaddr: str, port: int, timeout: Optional[int] = 6) -> bool:
    """Wait until a port listens on the specified address or `timeout` sec have elapsed.

    Under the hood the function tries to connect every `interval` sec; when
    `timeout` is set to a value <= 2, `interval` is 1 sec, otherwise 2.

    Args:
        ipaddr (str): ip address, or hostname
        port (int): port number
        timeout (int, optional): how long to wait for the connection. if the timeout set to value > 2,
            due to the way the function works, it makes sense to choose an even number. Defaults to 6.

    Returns:
        bool: True if the test succeeds, False otherwise
    """
    if timeout <= 2:
        interval = 1
    else:
        interval = 2

    deadline = time.time() + timeout
    while time.time() < deadline:
        attempt_started = time.time()
        connected = tcp_connection_test(ipaddr, port, timeout=interval)
        if connected:
            return True
        # if return immediately (err111) take a break before retry
        attempt_duration = time.time() - attempt_started
        if attempt_duration < interval:
            time.sleep(1)
    return False
def wait_http_test(url: str, timeout: Optional[int] = 60, verify: Optional[bool] = True, interval_time: Optional[int] = 2) ‑> bool
-
Will keep trying to reach the specified url every {interval_time} sec until the url becomes reachable or {timeout} sec have elapsed
Args
url
:str
- url
timeout
:int
, optional- how long to keep trying to reach specified url. Defaults to 60.
verify
:bool
, optional- boolean indication to verify the servers TLS certificate or not.
interval_time
:int
, optional- how long to wait for a response before sending a new request. Defaults to 2.
Raises
ValueError
- raises if not correct url
Returns
bool
- true if the test succeeds
Expand source code
def wait_http_test(
    url: str, timeout: Optional[int] = 60, verify: Optional[bool] = True, interval_time: Optional[int] = 2
) -> bool:
    """Keep trying to reach `url` until it becomes reachable or `timeout` sec have elapsed.

    Each attempt is delegated to check_url_reachable(), which is handed
    `interval_time` and `verify`.

    Args:
        url (str): url
        timeout (int, optional): how long to keep trying to reach specified url. Defaults to 60.
        verify (bool, optional): boolean indication to verify the servers TLS certificate or not.
        interval_time (int, optional): how long to wait for a response before sending a new request. Defaults to 2.

    Raises:
        ValueError: raises if not correct url

    Returns:
        bool: true if the test succeeds
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        attempt_started = time.time()
        reachable = check_url_reachable(url, interval_time, verify)
        if reachable:
            return True
        # be gentle on system resource in case the above call to check_url_reachable()
        # returned immediately (edge cases)
        attempt_duration = time.time() - attempt_started
        if attempt_duration < interval_time:
            time.sleep(1)
    return False