# test_beacon_reader.py -- unit tests for the beacon_reader position estimation
# (GitLab web-UI scrape artifacts removed; they were not valid Python)
__author__ = 'dd2'

# module under test: provides Advertiser and get_position()
import beacon_reader as br

# debug verbosity: 0 = silent, 1 = per-measurement tracing and early loop
# exits, >1 additionally prints every scipy optimizer step (see get_positions)
debug = 0
#debug = 1

import numpy as np
import math
import random as r
import collections as cl

import unittest


def points_to_distance(a, b):
    """Return the Euclidean distance between coordinate sequences *a* and *b*.

    Pairs the coordinates up to the shorter of the two sequences.
    """
    squared_deltas = ((p - q) ** 2 for p, q in zip(a, b))
    return sum(squared_deltas) ** 0.5

def distance_to_rssi(distance, tx_power=1):
    """Alias for distance_to_rx_power(): the simulated RSSI at *distance*
    is simply the received power predicted by the path-loss model."""
    rssi = distance_to_rx_power(distance, tx_power)
    return rssi

def distance_to_rx_power(distance, tx_power=1):
    """Return the received power at *distance* for a sender with *tx_power*.

    This is the inverse of the distance estimate used elsewhere:
        dist      = (10 ** ((tx_pow - rx_pow) / 10)) ** 0.5
        dist ** 2 = 10 ** ((tx_pow - rx_pow) / 10)
        10 * log10(dist ** 2) = tx_pow - rx_pow
    """
    path_loss = 10 * math.log(distance * distance, 10)
    return tx_power - path_loss

def pos_to_str(a):
    """Format the first three coordinates of *a* as '(  x.xx,  y.yy,  z.zz)'."""
    x, y, z = a[0], a[1], a[2]
    return f'({x:5.2f}, {y:5.2f}, {z:5.2f})'

class BeconReader_TestCase(unittest.TestCase):
    """Base fixture for beacon_reader position tests.

    Simulates RSSI measurements for a set of advertising beacons and a known
    receiver position, then checks that br.get_position() recovers that
    position within a given tolerance.
    """

    def setUp(self):
        pass

    def tearDown(self):
        pass

    # main function that performs one complete test
    def get_positions (self, Advertisers,
                             expected_positions,
                             noise_fct=(lambda x: x),
                             allowed_error=1.0,
                             list_of_broken_beacons=None
                             ):
        """Simulate measurements for every expected position and assert that
        br.get_position() computes a position close enough to it.

        Advertisers:             dict MAC -> [x, y, z] beacon position
        expected_positions:      iterable of [x, y, z] receiver positions
        noise_fct:               callable applied to every simulated RSSI value
        allowed_error:           maximum tolerated position error [m]
        list_of_broken_beacons:  MACs whose noise input is amplified 100x to
                                 simulate a defective beacon; None means none
                                 (fixed: was a shared mutable [] default)

        Raises AssertionError (via assertLess) when the measured position is
        further than allowed_error from the expected one.
        """
        if list_of_broken_beacons is None:
            list_of_broken_beacons = []
        tx_power = 1
        self.ADVERTISERS = Advertisers
        self.devices = {mac: br.Advertiser(mac, pos, tx_power)
                        for mac, pos in self.ADVERTISERS.items()}

        for expected_pos in [np.array(p) for p in expected_positions]:
            # create the data on which the algorithm works for this position
            for mac, dev in self.devices.items():
                dev.clean_measurements(0) # drop all measurements
                for i in range(10): # 10 measurements per beacon (2 s times 5 samples per s)
                    # if the actual beacon is broken, add some serious error:
                    # the noise input is amplified by a factor of 100
                    if mac in list_of_broken_beacons:
                        n_fct = lambda n: noise_fct(100*n)
                    else:
                        n_fct = noise_fct
                    dev.add_measurement (n_fct (distance_to_rssi (points_to_distance (expected_pos, dev.pos))))
                if not debug:
                    continue
                print('%s: added %-15s, %-4.2f m, rssi %-6.3f' % (
                      mac, pos_to_str (dev.pos),
                      points_to_distance (expected_pos, dev.pos),
                      distance_to_rssi (points_to_distance (expected_pos, dev.pos))),
                      ['OK', 'NOK'][mac in list_of_broken_beacons]
                )

            # when debug > 1, every step of scipy.optimize.fmin is revealed
            def print_vector(xk):
                print ('current vector: %s' % pos_to_str(xk))

            # call the function under test
            measured_pos = br.get_position (self.devices.values (), debug,
                                            print_vector if debug > 1 else None)

            # quality/accuracy measure: distance from the expected position
            distance_error = points_to_distance (expected_pos, measured_pos)
            if debug: print ('expected pos: %s,\nmeasured pos: %s,\ndistance: %5.2f m\n' % (
                             pos_to_str(expected_pos), pos_to_str(measured_pos), distance_error))
            # actually check the quality/accuracy measure
            self.assertLess(distance_error, allowed_error, '''
  distance: %f
  expected: < %f
  testing %s
  measured %s
  beacon information:
    %s''' % 
    (distance_error,
     allowed_error,
     pos_to_str(expected_pos),
     pos_to_str(measured_pos),
     '\n    '.join(map(lambda m: 'mac %s, pos %s, rssi %0.2f, meas dist %0.2f, act dist %0.2f, meas dist error %0.2f' % \
                             (m[0], pos_to_str(m[1].pos), m[1].get_middled_rssi(), m[1].get_distance(),
                              points_to_distance (expected_pos, m[1].pos), m[1].get_distance_error(),),
                   self.devices.items())),
     ))

    def runTest(self):
        # makes the otherwise test-less base class instantiable as a TestCase
        pass

class test_positions (BeconReader_TestCase):
    # Intentionally empty: concrete test_* methods are generated from a code
    # template and assigned to this class dynamically when the file is
    # executed as a script.
    pass # fixture to add some tests programmatically later


if __name__ == '__main__':
    import random as r
    import copy

    noise_measures = [ # (signal noise level, acceptable distance error [m])
        (0, 0.2),
        (0.01, 0.2),
        (0.03, 0.2),
        (0.1, 0.2),
        (0.3, 0.5),
        (1.0, 1.0),
        (3.0, 3.0),
        (10, 5.0),
    ]

    test_setups = cl.OrderedDict ([ # OrderedDict should keep the order given during assignment
        ('simple_example_1', (
            {
                "00:07:80:52:64:e6": [0.0, 0.0, 0.0],
                "00:07:80:7e:c3:68": [2.0, 0.0, 0.0],
                "00:07:80:7e:c3:7b": [0.0, 2.0, 0.0],
                "00:07:80:68:1c:9c": [0.0, 0.0, 1.0],
            },[
                [0.1, 0.0, 0.0],
                [0.2, 0.0, 0.0],
                [0.3, 0.0, 0.0],
                [0.5, 0.0, 0.0],
                [1.0, 1.0, 0.5],
                [0.2, 1.0, 1.8],
                [0.2, 1.8, 0.2],
                [0.2, 0.2, 0.5],
            ])
        ),
        ('simple_example_2', (
            {
                "00:07:80:52:64:e6": [0.0, 0.0, 0.0],
                "00:07:80:7e:c3:68": [2.0, 2.0, 0.0],
                "00:07:80:7e:c3:7b": [2.0, 0.0, 1.0],
                "00:07:80:68:1c:9c": [0.0, 2.0, 1.0],
            },[
                [0.1, 0.0, 0.0],
                [0.2, 0.0, 0.0],
                [0.3, 0.0, 0.0],
                [0.5, 0.0, 0.0],
                [1.0, 1.0, 0.5],
                [0.2, 1.0, 1.8],
                [0.2, 1.8, 0.2],
                [0.2, 0.2, 0.5],
            ])
        ),
        ('more_realistic_example', (
            {
                "00:07:80:52:64:e6": [2.5, 5.0, 1.0],
                "00:07:80:7e:c3:68": [1.0, 0.5, 0.5],
                "00:07:80:7e:c3:7b": [5.0, 2.0, 1.0],
                "00:07:80:68:1c:9c": [0.5, 0.5, 2.2],
                "00:07:80:68:28:29": [6.0, 5.0, 1.0],
                "00:07:80:68:28:67": [3.0, 3.0, 0.5],
                "00:07:80:79:1f:f1": [6.0, 0.5, 1.8],
                "00:07:80:c0:ff:ee": [0.5, 5.0, 1.0],
            },[
                [2.5, 3.0, 0.0],
                [3.0, 3.0, 0.0],
                [3.5, 3.0, 0.0],
                [0.0, 0.0, 0.0],
                [0.2, 1.0, 1.8],
                [0.2, 1.8, 0.2],
                [0.2, 0.2, 0.5],
            ])
        ),
    ])
    # drop specific setups for speedup during integration
    if debug:
        test_setups.pop('simple_example_2')
        test_setups.pop('more_realistic_example')

    # Test definition to be added to the initially empty class test_positions.
    # BUGFIX: the template previously contained Python 2 print statements,
    # which made every exec() of the generated code raise a SyntaxError on
    # Python 3 (which this file otherwise targets).
    test_function_code = '''
# build the function that implements a specific test
def test_%(setup)s__%(noise)s__%(broken_beacon_test_name)s(self):
    if debug:
        print(79*'_')
        print('calling test_%(setup)s__%(noise)s__%(broken_beacon_test_name)s()')
    self.get_positions(%(Advertisers)s,
                       %(expected_positions)s,
                       %(noise_fct)s,
                       %(exceptable_distance_error)s,
                       %(broken_beacons)s)

# assign the newly built test to the global class definition test_positions
test_positions.test_%(setup)s__%(noise)s__%(broken_beacon_test_name)s = (
    test_%(setup)s__%(noise)s__%(broken_beacon_test_name)s)
'''

    #################################################
    # test all test setups using all noise measures #
    #################################################
    for noise_measure, exceptable_distance_error in noise_measures:
        for test_name, test_setup in test_setups.items():
            # collect the content for the placeholders within the test_function_code
            paradict = {
                'setup': test_name,
                'noise': ('%0.2f' % noise_measure).replace('.', '_'),
                'Advertisers': repr(test_setup[0]),
                'expected_positions': repr(test_setup[1]),
                'noise_fct': 'lambda a: a+2*(r.random()-0.5)*%f' % noise_measure,
                'exceptable_distance_error': exceptable_distance_error,
                'broken_beacons': [],
                'broken_beacon_test_name': 'no_broken_beacons',
            }
            # generate the test function and assign it to the globally defined class test_positions
            exec (test_function_code % paradict)
            if debug: break # stop after adding the 1st test if debugging
        if debug: break


    ###################################################################
    # test all test setups except the simple ones with broken beacons #
    ###################################################################
    # Use a unique but less challenging noise level for these tests.
    # BUGFIX: this section previously embedded the leftover loop variable
    # noise_measure (its final value, 10) into the generated tests instead of
    # the intended 1.0, so the "less challenging" noise function defined here
    # was silently ignored.
    noise_measure = 1.0
    exceptable_distance_error = 1

    # loop over all test setups not containing 'simple' which consist out of just 4 beacons
    # 4 beacons do not allow broken beacons (because 3D distribution is required)
    for test_name, test_setup in filter(lambda t: 'simple' not in t[0], test_setups.items()):
        # loop over every beacon, disturb its values
        for broken_beacons, broken_beacon_test_name in map(lambda mac: ([mac], mac+'_broken'), test_setup[0].keys()):
            # collect the content for the placeholders within the test_function_code
            paradict = {
                'setup': test_name,
                'noise': ('%0.2f' % noise_measure).replace('.', '_'),
                'Advertisers': repr(test_setup[0]),
                'expected_positions': repr(test_setup[1]),
                'noise_fct': 'lambda a: a+2*(r.random()-0.5)*%f' % noise_measure,
                # a broken beacon relaxes the allowed error accordingly
                'exceptable_distance_error': exceptable_distance_error*(1+len(broken_beacons)),
                'broken_beacons': broken_beacons,
                'broken_beacon_test_name': broken_beacon_test_name.replace(':', '_'),
            }
            # generate the test function and assign it to the globally defined class test_positions
            exec (test_function_code % paradict)
            if debug: break
        if debug: break

    # list all testfunctions
    #for e in filter(lambda e: e[:5] == 'test_', dir(test_positions)): print(e)

    # unittest.main() discovers and runs all generated test_* methods itself;
    # the manually built (and never executed) TestSuite was removed as dead code.
    unittest.main()