""" Example Usage:
>>> import pingstats
>>> pings = pingstats.Pings('google.ca')
>>> for ping in pings:
... pingstats.plot_pings(pings.realtime_data)
# HIPSTERPLOT OUTPUT
... print(ping)
# The most recent return time in milliseconds
"""
from __future__ import print_function, with_statement
# STANDARD LIBRARY PACKAGES
from subprocess import Popen, PIPE
from argparse import ArgumentParser
from os import name
from shutil import which
import sys
# NON STANDARD LIBRARY PACKAGES
try:
if which('gnuplot'):
import asciiplotlib as apl
else:
raise RuntimeError('cannot import asciiplotlib without gnuplot')
except RuntimeError:
from hipsterplot import plot
from .config import get_lines, append_argv, parse_lines
# RUN CONFIG
if sys.argv[0].count('pingstats') and not sys.argv[0].count('.py'):
lines = get_lines()
if lines is not None:
try:
append_argv(parse_lines(lines))
except AssertionError as e:
print(str(e))
exit(1)
__version__ = "1.0.0"
PROG_NAME = 'pingstats'
# PARSER CONFIG
parser = ArgumentParser(prog=PROG_NAME)
parser.add_argument('address', help='The address to ping. This could be either '
'a web address (i.e, "google.ca") or an IP address.')
parser.add_argument('-V', '--version', action='version',
version='%(prog)s {}'.format(__version__))
X_COLUMN_SCALE = 11 if name != 'nt' else 13
Y_ROW_SCALE = 0
# __all__ = ['Pings', 'plot_pings']
[docs]class Pings:
""" Main logic object for obtaining ping data.
Example Usage:
>>> from pingstats import Pings
>>> pings = Pings('127.0.0.1')
>>> for this_time in pings:
... print(pings.realtime_data)
... print(pings.average_data)
... print(pings.current_line)
... print(this_time) # current ping's realtime data in milliseconds
"""
[docs] def __init__(self, address, realtime_data=[], average_data=[], realtime_data_length=20, average_data_length=200):
""" Instantiates data used during the ping retrieval process.
:address: The address to send ICMP ECHO requests to.
:realtime_data: A python list object to continue to append data to.
:average_data: A python list object to continue to append data to.
:realtime_data_length: The maximum size of the realtime_data list
:average_data_length: The maximum size of the average_data list
"""
# NOTE: Must create data copy of argument lists to ensure new instances
# have new lists.
self.address = address
self.realtime_data, self.average_data = realtime_data[:], average_data[:]
self._realtime_data_length, self._average_data_length = realtime_data_length, average_data_length
self.current_line = ''
def _decode_line(self, line):
""" Decodes a line from internal ping call, returning the connection time.
..note: This function also correctly handles switching between POSIX
standard ``ping.c`` and Windows NT ``ping.c`` formats.
:line: A line returned from `get_ping.process`.
:returns: Either the return time in milliseconds (as float), -1 on timeout,
or None.
"""
if line.lower().count('ttl'):
if name != "nt":
return float(line.split('time=')[1].split(' ')[0])
else:
if line.count('time<'):
return float(line.split('time<')[1].split(' ')[0].strip('<ms'))
else:
return float(line.split('time=')[1].split(' ')[0].strip('ms'))
elif line.lower().count('0 received' if name != 'nt' else '100% loss'):
return -1
def _run_subprocess(self, address):
""" Runs a ping subprocess, passing the user specified address to the command.
..note: This function also correctly handles switching between POSIX standard ``ping.c``
and Windows NT ``ping.c`` formats.
:address: The address to request ``ICMP ECHO`` requests to (i.e 'google.ca')
:returns: The output of the sub process (POSIX standard or Windows NT ping output)
"""
if name != 'nt':
process = Popen(['ping', '-c 1', address], stdout=PIPE)
else:
process = Popen(['ping', '-n', '1', address], stdout=PIPE)
process.wait()
stdout, stderr = process.communicate()
return stdout.decode('UTF-8').splitlines()
def __iter__(self):
""" Iteratively spawns ``ICMP`` requests for ``self.address``.
:yields: The return time in milliseconds
"""
# init data
this_time = 0
while 1:
for line in self._run_subprocess(self.address):
if len(self.realtime_data) > self._realtime_data_length: # enforce max length
self.realtime_data.pop(0)
line_value = self._decode_line(line) # get float return length from line
if line_value is not None:
this_time = line_value
self.current_line = line
self.realtime_data.append(line_value)
else: # On end of for loop, calculate average and append it
if len(self.average_data) == self._average_data_length:
self.average_data.pop(0)
self.average_data.append(sum(self.realtime_data) / len(self.realtime_data if self.realtime_data != 0 else 1))
yield this_time
[docs]def get_pings(address): # TODO Needs additional Testing
""" Yields the last set of realtime and average data points, according to user
specified maximum.
:address: The address to send ``ICMP ECHO`` requests to.
"""
pings = Pings(address)
for ping in pings:
yield pings.realtime_data[-1], pings.average_data[-1]
def __hipsterplot_render(pings, columns=70, rows=15):
""" Provides high level bindings to ``hipsterplot.plot`` for use with :py:func:`pingstats.get_pings`.
:pings: A list object containing ping return times as float values, normally from :py:func:`pingstats.get_pings`
:columns: The number of columns to draw for the plot
:rows: The number of rows to draw for the plot
"""
plot(pings, num_x_chars=columns, num_y_chars=rows)
def __asciiplotlib_render(pings, columns=70, rows=15):
""" Provides high level bindings to ``hipsterplot.plot`` for use with :py:func:`pingstats.get_pings`.
:pings: A list object containing ping return times as float values, normally from :py:func:`pingstats.get_pings`
:columns: The number of columns to draw for the plot
:rows: The number of rows to draw for the plot
"""
x_points = [i + 1 for i in range(len(pings))]
fig = apl.figure()
fig.plot(x_points,
pings,
width=columns,
height=rows,
ylim=(min(pings) - 1, max(pings) + 2),
label="Return Time (milliseconds)",
xlabel="ICMP Request Number")
fig.show()
[docs]def plot_pings(pings, columns=70, rows=15):
""" Provides high level bindings to ``hipsterplot.plot`` for use with :py:func:`pingstats.get_pings`.
:pings: A list object containing ping return times as float values, normally from :py:func:`pingstats.get_pings`
:columns: The number of columns to draw for the plot
:rows: The number of rows to draw for the plot
"""
if sys.modules['asciiplotlib']:
__asciiplotlib_render(pings, columns=columns, rows=rows)
else:
__hipsterplot_render(pings, columns=columns, rows=rows)