Skip to content
Snippets Groups Projects
beacon_reader.py 6.69 KiB
Newer Older
  • Learn to ignore specific revisions
  • __author__ = 'moritz'
    
    import serial
    import sys
    import time
    import itertools
    import math
    import functools
    
    import numpy as np
    import scipy.optimize
    
    
    ADVERTISERS = {"00:07:80:52:64:e6": [2.5, 5, 1],
                   "00:07:80:7e:c3:68": [1, 0.5, 0.5],
                   "00:07:80:7e:c3:7b": [5, 2, 1],
                   "00:07:80:68:1c:9c": [0.5, .5, 2.2],
                   "00:07:80:68:28:29": [6, 5, 1],
                   "00:07:80:68:28:67": [3, 3, 0.5],
                   "00:07:80:79:1f:f1": [6, .5, 1.8],
                   "00:07:80:c0:ff:ee": [0.5, 5, 1 ]
    }
    
    X=0
    Y=1
    Z=2
    
    C = - math.log(10) / 20
    
    def read_packet(s):
        s.timeout = None
        header = s.read(4)
        s.timeout = 0.1
    #    print(repr(header[0]))
        packet_length = ((header[0] & 0x03) << 8) + header[1]
        class_id = header[2]
        cmd_id = header[3]
        payload = s.read(packet_length)
    
        return class_id, cmd_id, payload
    
    def vec_sum (a, b):
        return [x + y for x, y in zip(a, b)]
    
    def vec_sub (a, b):
        return [x - y for x, y in zip(a, b)]
    
    def vec_abs (a):
        return sum([x ** 2 for x in a]) ** 0.5
    
    def vec_vprod(a,b):
        return a[Y] * b[Z] - a[Z] * b[Y], a[Z] * b[X] - a[X] * b[Z], a[X] * b[Y] - a[Y] * b[X]
    
    def vec_sprod(a, b):
        return sum([x * y for x, y in zip(a, b)])
    
    def vec_scale(a, b):
        return [b * x for x in a]
    
    
    def get_position(advertisers, debug=False, callback=None):
        if len(advertisers) < 4:
            return None
    
        # get distances s according to the field strengths
        s = np.array([x.get_distance() for x in advertisers])
        # get starting position sx as offset next to the strongest advertiser
        sx = vec_sub(min(map(lambda a: (a.get_distance(), a.pos), advertisers))[1], (0,0,0.2))
        beacon_positions = np.array([x.pos for x in advertisers])
        beacon_variance  = np.array([x.get_distance_error() for x in advertisers])
        a = np.array([1] * len(advertisers))
    
        def error (x):
            # squared error = sum of [(setpoint - measurement)**2]
            # get distances to the beacons
            r=(((x-beacon_positions)**2).sum(axis=1))**0.5
            # get all [(meas distance - calc distance)**2]
            errors = (s-r)**2
            # cut off high errors to decrease the influence of values that are far away
            errors = np.fmin(errors, 0*r+1**2)
            # get scaling factors (beacon_variance, replace None by some low value)
            scaling = []
            for bv in beacon_variance:
                if bv is None: scaling.append(1e-20) # there is no variance
                elif bv == 0:  scaling.append(1e-20) # 0 cannot be inverted
                else:          scaling.append(1/bv)  # 
            scaling = np.array(scaling)
            # not working because python doesn't lazy evaluate arrays
    #        scaling = map(lambda s: [1e-6,1/s][s in [float, int, long]], beacon_variance)
            # return sum of all [(meas distance - calc distance)**2]
            return (errors).sum()
            return (errors * scaling).sum()
    
        # fmin works since it doesn't need the brocken error gradient funktion
        return scipy.optimize.fmin(error, sx, ftol=1e-2, disp=debug, callback=callback)
    
    
    
    class Advertiser ():
        def __init__(self, mac, pos, tx_power):
            self.mac = mac
            self.pos = pos
            self.tx_power = tx_power
            self.rssi = {}
    
        def clean_measurements(self, old=2):
            now = time.time()
            tmp = {}
            for ts in self.rssi:
                if now - ts < old:
                    tmp[ts] = self.rssi[ts]
            self.rssi = tmp
    
        def add_measurement(self, rssi):
            self.clean_measurements()
            self.rssi[time.time()] = rssi
    
        def get_medium_rssi(self):
            self.clean_measurements()
            if len(self.rssi) != 0:
                return sorted(self.rssi.values())[len(self.rssi) // 2]
            else:
                return None
    
        def get_middled_rssi(self):
            self.clean_measurements()
            if len(self.rssi) != 0:
                return functools.reduce(lambda x, y: x + y, self.rssi.values()) / float(len(self.rssi))
            else:
                return None
    
        def get_rssi_error(self):
            if len(self.rssi) >= 2:
                m = self.get_middled_rssi()
                return functools.reduce(lambda x, y: x + y, [(v - m) ** 2 for v in self.rssi.values()]) / (len(self.rssi) - 1)
            else:
                return None
    
        def get_distance(self):
            if self.get_middled_rssi() != None:
                ratio_power = self.tx_power - self.get_middled_rssi()
                return (10 ** (ratio_power / 10.)) ** 0.5
            else:
                return None
    
        def get_distance_error(self):
            try: return C * self.get_distance() * self.get_rssi_error()
            except: return None
    
        def __str__(self):
            try:
                return "%s: (%d/%0.2f|%0.2f|%d) %0.5f+-%0.3f" % (self.mac, self.get_medium_rssi(),
                                                                    self.get_middled_rssi(),
                                                                    self.get_rssi_error(),
                                                                    self.tx_power,
                                                                    self.get_distance(),
                                                                    self.get_distance_error())
            except TypeError:
                return self.mac + ": - "
    
        def __gt__(self, other):
            self.get_middled_rssi() > other.get_middled_rssi()
    
        def __lt__(self, other):
            self.get_middled_rssi() < other.get_middled_rssi()
    
        def __eq__(self, other):
            self.get_middled_rssi() == other.get_middled_rssi()
    
    if __name__ == "__main__":
        s = serial.Serial(sys.argv[1], baudrate=25600, rtscts=True)
        s.timeout = 0.1
        s.readall()
        s.write(b"\x00\x01\x06\x02\x01")
    
        devices = {}
    
        with open("%d.log" % (time.time()), "w") as fd:
            try:
                while True:
                    class_id, cmd_id, payload = read_packet(s)
                    if class_id == 0x06 and cmd_id == 0x00 and len(payload) == 0x29:
                        rssi = payload[0] - 0xFF
                        mac = ":".join(map(lambda c: "%02x" % (c,), payload[7:1:-1]))
                        tx_power = payload[-1] - 0xFF
        #                print ("a:", mac, repr(devices))
                        if mac not in devices.keys():
                            if mac in ADVERTISERS:
                                devices[mac] = Advertiser(mac, ADVERTISERS[mac], tx_power)
                            else:
                                devices[mac] = Advertiser(mac, None, tx_power)
                        devices[mac].add_measurement(rssi)
                    pos = get_position(list(filter(lambda x: x.pos != None and x.get_distance() != None, devices.values())))
                    print("\x1b[2J")
                    for i in sorted(devices):
                        print(devices[i])
                    print (repr(pos))
    
    
            except KeyboardInterrupt:
                s.write(b"\x00\x00\x06\x04")