Source code for statistician

#!/usr/bin/env python
# coding: utf8

from __future__ import (unicode_literals, absolute_import, division, print_function)

from threading import Thread, Lock
from operator import itemgetter

from Queue import Queue, Empty
import datetime
import re
import time

import display as d
import log_writer
import reader


[docs]class AlertParam: """ Stores the parameters used in the alert detection process. ``short_median`` and ``long_median`` are the size of the windows for the moving averages. An alert will be raised if ``short_outflow_average > long_outflow_average * threshold``. Warnings -------- The unit base of ``short_median`` and ``long_median`` is ``time_resolution``, that means that ``long_median=2`` with ``time_resolution=10`` will calculate an outflow average on the last ``2 * 10 = 20`` seconds. """ def __init__(self, short_median=12, long_median=120, threshold=1.5, time_resolution=10): self.short_median = int(short_median) self.long_median = int(long_median) self.threshold = float(threshold) self.time_resolution = time_resolution if self.short_median >= self.long_median: raise ValueError('short_median must be smaller than long_median') if self.threshold <= 1.0: raise ValueError('threshold must be bigger than 1')
[docs]class Statistics: """ Object used by the statistician as its "notebook". This is were the stats are saved. It calls the Displayer if an alert should be raised or shut down. It is called when the stats should be printed. Attributes ---------- section: dictionary The keys are the hit sections, values are the number of hits for each section. total_bytes: int Sum of the bytes sent. total_hits: int Total number of hits. should_run: bool If False, the thread will shortly end stop its operation. Used to cleanly end the program. long_term_bytes_buffer: int Stores the sum of bytes sent during a certain time, then is appended to the long_term_bytes list and reseted. long_term_bytes: list of int List that stores the evolution of the number of sent bytes, used to compute moving average. alert_raised: bool True if an alert has been raised and not shut down. Note ---- The use of ``statistics.lock`` makes this object thread-safe. Warnings -------- ``number_of_hits``, ``total_bytes`` and ``total_hits`` are used for the printed stats, 'total' is in fact 'total since the last display'. """ def __init__(self): self.lock = Lock() # used for the printed stats # 'total' is in fact 'total since the last display' self.section = {} self.total_bytes = 0 self.total_hits = 0 # used for the alerts self.long_term_bytes_buffer = 0 self.long_term_bytes = [] self.alert_raised = False
[docs] def upadate_stat(self, HTTP_dict): """Update the stats with the given parse line (ie the HTTP_dict)""" with self.lock: last_section = reader.get_section(HTTP_dict['request']) # print(HTTP_dict['request'], last_section) if last_section is not None: if last_section in self.section: self.section[last_section] += 1 else: self.section[last_section] = 1 else: d.displayer.log(self, d.LogLevel, "Wrong HTTP request '{}': section not found" "".format(HTTP_dict['request'])) self.total_bytes += HTTP_dict['bytes'] self.total_hits += 1 self.long_term_bytes_buffer += HTTP_dict['bytes']
[docs] def get_last_stats(self): """Returns a stats dict, used for the regular stats printing""" stats = dict.fromkeys(('max_section', 'max_hit', 'total_bytes', 'total_hits'), 0) with self.lock: stats['total_bytes'] = self.total_bytes stats['total_hits'] = self.total_hits try: stats['max_section'], stats['max_hit'] = max(self.section.iteritems(), key=itemgetter(1)) return stats except ValueError: return stats
[docs] def reset_short_stat(self): """Called by the displayer after get_last_stats to reset the 'printing stats'""" with self.lock: self.section.clear() self.total_bytes = 0 self.total_hits = 0
[docs] def update_long_term(self, alert_param): """ Update the ``long_term_bytes`` list. Checks if an alert should be raised (or shut down), and raises it if necessary. Called by the Statistician every AlertParam.time_resolution. """ with self.lock: self.long_term_bytes.append(self.long_term_bytes_buffer) self.long_term_bytes_buffer = 0 if len(self.long_term_bytes) > alert_param.long_median + alert_param.short_median: del self.long_term_bytes[0] emergency = self.emergency(alert_param) if emergency['alert']: del emergency['alert'] if not self.alert_raised: self.alert_raised = True d.displayer.print_new_alert(**emergency) else: del emergency['alert'] if self.alert_raised: self.alert_raised = False d.displayer.print_end_alert(**emergency)
[docs] def emergency(self, alert_param): """Returns a dictionary with the alert parameters if there is one. Called by ``update_long_term``""" short_term = self.long_term_bytes[-alert_param.short_median:] long_term = self.long_term_bytes[:-alert_param.short_median] short_term_average = sum(short_term) / alert_param.short_median long_term_average = sum(long_term) / alert_param.long_median res = {'alert_param': alert_param, 'short_average': short_term_average, 'long_average': long_term_average} if short_term_average > long_term_average * alert_param.threshold: res['alert'] = True else: res['alert'] = False return res
[docs]class Statistician(Thread): """ This thread object is responsible for the statistics maintenance, it has a :obj:`Statistics` object to store them and raise the alerts. It possesses the alert parameters. Read lines are 'thread-safely' received thanks to an ``input_queue``. They should be parsed by default, but this can be changed with the ``parse`` parameter. Attributes ---------- should_run: bool If False, the thread will shortly end stop its operation. Used to cleanly end the program. Note ---- Alerts are checked every ``AlertParam.time_resolution``. """ def __init__(self, input_queue, sleeping_time=0.1, parse=False, alert_param=AlertParam()): Thread.__init__(self) self.input_queue = input_queue self.sleeping_time = sleeping_time self.parse = parse self.alert_param = alert_param self.total_nb_of_treated_line = 0 self.stat = Statistics() self.should_run = True self.name = 'statistician thread' self.last_alert_check = time.time()
[docs] def run(self): """ Checks if an alert should be raised, checks the input queue, update the stats if necessary and starts again.""" while self.should_run: # if the alert should be check (ie every alert_param.time_resolution) if time.time() - self.last_alert_check > self.alert_param.time_resolution: # print(time.time() - self.last_alert_check) self.stat.update_long_term(self.alert_param) self.last_alert_check = time.time() # we wait for an input in the queue, with a timeout to avoid stucking the program try: log_line = self.input_queue.get(block=True, timeout=0.1) except Empty: continue if self.parse: # False by default try: HTTP_dict = reader.parse_line(log_line) except reader.HTTPFormatError as e: d.displayer.log(self, d.LogLevel.ERROR, e.message) else: HTTP_dict = log_line # we update the stats self.stat.upadate_stat(HTTP_dict) self.total_nb_of_treated_line += 1 if self.input_queue.qsize() == 0: d.displayer.log(self, d.LogLevel.INFO, "Queue emptied after {} lines" "".format(self.total_nb_of_treated_line)) time.sleep(self.sleeping_time)
[docs] def state(self): """ Returns ------- string Describes the present thread state """ return 'total nb of treated line: {}, in queue: {}' \ ''.format(self.total_nb_of_treated_line, self.input_queue.qsize())
[docs]class QueueWriter(Thread): """ Used to fill the Statistician queue, to simulate a fast reading and compare the reading speed with or without parsing. Note ---- pace10 is the pace for 100ms, ie ``10*pace10`` entries are put in the queue every second. """ def __init__(self, output_queue, parse=True, pace10=1, factor=2): Thread.__init__(self) self.output_queue = output_queue self.pace10 = pace10 self.factor = factor # used for the URL generation self.parse = parse self.should_run = True
[docs] def run(self): """ Puts ``n=pace10`` lines every 10th of a second in the ``output_queue`` """ total_count = 0 random_log_line = log_writer.random_log_line_maker('HTTP_slow', factor=self.factor) while self.should_run: line_count_second10 = 0 start_time_second10 = time.time() while self.should_run and line_count_second10 < self.pace10 and time.time() - start_time_second10 < 0.1: if self.parse: try: log_line = reader.parse_line( random_log_line(date=datetime.datetime.utcnow().strftime('[%d/%b/%Y:%X +0000]'))) except reader.HTTPFormatError as e: d.displayer.log(self, d.LogLevel.ERROR, e.message) else: log_line = random_log_line(date=datetime.datetime.utcnow().strftime('[%d/%b/%Y:%X +0000]')) self.output_queue.put(log_line) line_count_second10 += 1 # print(line_count_second10) delta = time.time() - start_time_second10 if delta < 0.1 and line_count_second10 == self.pace10: # check that line !! # print('waiting for :', 0.1 - delta) time.sleep(0.1 - delta) else: print('putting operations are too slow, reduce pace10, ' 'lines putted this 10th of second:', line_count_second10) total_count += line_count_second10
# print('qsize', self.output_queue.qsize()) # print('total count', total_count) if __name__ == '__main__': def simulate_putter_and_getter(with_getter=True): """" Simulate a statistician connected to a LogReader (that is in fact a QueueWriter) to study the performances and optimise the system """ statistician_parse = True q = Queue() pace10 = 100 # ie pace = 10*pace10 putter = QueueWriter(q, parse=not statistician_parse, pace10=pace10) # pace10=1000 -> 10,000 put/sec putter.start() if with_getter: displayer = d.Displayer(debug=True) # needed for d.displayer.log calls m = Statistician(q, parse=statistician_parse) m.start() time.sleep(3) putter.should_run = False if with_getter: m.should_run = False simulate_putter_and_getter(with_getter=True)