Skip to content
Snippets Groups Projects
beacon_reader.py 6.69 KiB
Newer Older
__author__ = 'moritz'

import serial
import sys
import time
import itertools
import math
import functools

import numpy as np
import scipy.optimize


ADVERTISERS = {"00:07:80:52:64:e6": [2.5, 5, 1],
               "00:07:80:7e:c3:68": [1, 0.5, 0.5],
               "00:07:80:7e:c3:7b": [5, 2, 1],
               "00:07:80:68:1c:9c": [0.5, .5, 2.2],
               "00:07:80:68:28:29": [6, 5, 1],
               "00:07:80:68:28:67": [3, 3, 0.5],
               "00:07:80:79:1f:f1": [6, .5, 1.8],
               "00:07:80:c0:ff:ee": [0.5, 5, 1 ]
}

X=0
Y=1
Z=2

C = - math.log(10) / 20

def read_packet(s):
    s.timeout = None
    header = s.read(4)
    s.timeout = 0.1
#    print(repr(header[0]))
    packet_length = ((header[0] & 0x03) << 8) + header[1]
    class_id = header[2]
    cmd_id = header[3]
    payload = s.read(packet_length)

    return class_id, cmd_id, payload

def vec_sum (a, b):
    return [x + y for x, y in zip(a, b)]

def vec_sub (a, b):
    return [x - y for x, y in zip(a, b)]

def vec_abs (a):
    return sum([x ** 2 for x in a]) ** 0.5

def vec_vprod(a,b):
    return a[Y] * b[Z] - a[Z] * b[Y], a[Z] * b[X] - a[X] * b[Z], a[X] * b[Y] - a[Y] * b[X]

def vec_sprod(a, b):
    return sum([x * y for x, y in zip(a, b)])

def vec_scale(a, b):
    return [b * x for x in a]


def get_position(advertisers, debug=False, callback=None):
    if len(advertisers) < 4:
        return None

    # get distances s according to the field strengths
    s = np.array([x.get_distance() for x in advertisers])
    # get starting position sx as offset next to the strongest advertiser
    sx = vec_sub(min(map(lambda a: (a.get_distance(), a.pos), advertisers))[1], (0,0,0.2))
    beacon_positions = np.array([x.pos for x in advertisers])
    beacon_variance  = np.array([x.get_distance_error() for x in advertisers])
    a = np.array([1] * len(advertisers))

    def error (x):
        # squared error = sum of [(setpoint - measurement)**2]
        # get distances to the beacons
        r=(((x-beacon_positions)**2).sum(axis=1))**0.5
        # get all [(meas distance - calc distance)**2]
        errors = (s-r)**2
        # cut off high errors to decrease the influence of values that are far away
        errors = np.fmin(errors, 0*r+1**2)
        # get scaling factors (beacon_variance, replace None by some low value)
        scaling = []
        for bv in beacon_variance:
            if bv is None: scaling.append(1e-20) # there is no variance
            elif bv == 0:  scaling.append(1e-20) # 0 cannot be inverted
            else:          scaling.append(1/bv)  # 
        scaling = np.array(scaling)
        # not working because python doesn't lazy evaluate arrays
#        scaling = map(lambda s: [1e-6,1/s][s in [float, int, long]], beacon_variance)
        # return sum of all [(meas distance - calc distance)**2]
        return (errors).sum()
        return (errors * scaling).sum()

    # fmin works since it doesn't need the brocken error gradient funktion
    return scipy.optimize.fmin(error, sx, ftol=1e-2, disp=debug, callback=callback)



class Advertiser ():
    def __init__(self, mac, pos, tx_power):
        self.mac = mac
        self.pos = pos
        self.tx_power = tx_power
        self.rssi = {}

    def clean_measurements(self, old=2):
        now = time.time()
        tmp = {}
        for ts in self.rssi:
            if now - ts < old:
                tmp[ts] = self.rssi[ts]
        self.rssi = tmp

    def add_measurement(self, rssi):
        self.clean_measurements()
        self.rssi[time.time()] = rssi

    def get_medium_rssi(self):
        self.clean_measurements()
        if len(self.rssi) != 0:
            return sorted(self.rssi.values())[len(self.rssi) // 2]
        else:
            return None

    def get_middled_rssi(self):
        self.clean_measurements()
        if len(self.rssi) != 0:
            return functools.reduce(lambda x, y: x + y, self.rssi.values()) / float(len(self.rssi))
        else:
            return None

    def get_rssi_error(self):
        if len(self.rssi) >= 2:
            m = self.get_middled_rssi()
            return functools.reduce(lambda x, y: x + y, [(v - m) ** 2 for v in self.rssi.values()]) / (len(self.rssi) - 1)
        else:
            return None

    def get_distance(self):
        if self.get_middled_rssi() != None:
            ratio_power = self.tx_power - self.get_middled_rssi()
            return (10 ** (ratio_power / 10.)) ** 0.5
        else:
            return None

    def get_distance_error(self):
        try: return C * self.get_distance() * self.get_rssi_error()
        except: return None

    def __str__(self):
        try:
            return "%s: (%d/%0.2f|%0.2f|%d) %0.5f+-%0.3f" % (self.mac, self.get_medium_rssi(),
                                                                self.get_middled_rssi(),
                                                                self.get_rssi_error(),
                                                                self.tx_power,
                                                                self.get_distance(),
                                                                self.get_distance_error())
        except TypeError:
            return self.mac + ": - "

    def __gt__(self, other):
        self.get_middled_rssi() > other.get_middled_rssi()

    def __lt__(self, other):
        self.get_middled_rssi() < other.get_middled_rssi()

    def __eq__(self, other):
        self.get_middled_rssi() == other.get_middled_rssi()

if __name__ == "__main__":
    s = serial.Serial(sys.argv[1], baudrate=25600, rtscts=True)
    s.timeout = 0.1
    s.readall()
    s.write(b"\x00\x01\x06\x02\x01")

    devices = {}

    with open("%d.log" % (time.time()), "w") as fd:
        try:
            while True:
                class_id, cmd_id, payload = read_packet(s)
                if class_id == 0x06 and cmd_id == 0x00 and len(payload) == 0x29:
                    rssi = payload[0] - 0xFF
                    mac = ":".join(map(lambda c: "%02x" % (c,), payload[7:1:-1]))
                    tx_power = payload[-1] - 0xFF
    #                print ("a:", mac, repr(devices))
                    if mac not in devices.keys():
                        if mac in ADVERTISERS:
                            devices[mac] = Advertiser(mac, ADVERTISERS[mac], tx_power)
                        else:
                            devices[mac] = Advertiser(mac, None, tx_power)
                    devices[mac].add_measurement(rssi)
                pos = get_position(list(filter(lambda x: x.pos != None and x.get_distance() != None, devices.values())))
                print("\x1b[2J")
                for i in sorted(devices):
                    print(devices[i])
                print (repr(pos))


        except KeyboardInterrupt:
            s.write(b"\x00\x00\x06\x04")