"""Estimate a receiver position from the RSSI of known BLE advertisers.

Packets are read from a serial-attached BLE dongle, per-advertiser RSSI
measurements are collected over a short sliding window, converted into
distance estimates, and a position is fitted to those distances.
"""

__author__ = 'moritz'

import sys
import time
import itertools
import math
import functools

import numpy as np
import scipy.optimize

# Known advertisers: MAC address -> [x, y, z] position.
# NOTE(review): units are not stated in this file (presumably metres).
ADVERTISERS = {
    "00:07:80:52:64:e6": [2.5, 5, 1],
    "00:07:80:7e:c3:68": [1, 0.5, 0.5],
    "00:07:80:7e:c3:7b": [5, 2, 1],
    "00:07:80:68:1c:9c": [0.5, .5, 2.2],
    "00:07:80:68:28:29": [6, 5, 1],
    "00:07:80:68:28:67": [3, 3, 0.5],
    "00:07:80:79:1f:f1": [6, .5, 1.8],
    "00:07:80:c0:ff:ee": [0.5, 5, 1],
}

# Component indices of a 3-vector.
X = 0
Y = 1
Z = 2

# d(distance)/d(rssi) divided by distance, for distance = 10 ** ((tx - rssi) / 20).
C = - math.log(10) / 20


def read_packet(s):
    """Read one packet from the serial-like object *s*.

    Blocks (timeout ``None``) until the 4-byte header has arrived, then
    switches the timeout to 0.1 s and reads the payload.

    The payload length is ``((header[0] & 0x03) << 8) + header[1]``.

    Returns:
        ``(class_id, cmd_id, payload)`` where ``class_id`` and ``cmd_id`` are
        header bytes 2 and 3.  The payload may be shorter than announced if
        the read timed out; callers must check its length.
    """
    s.timeout = None
    header = s.read(4)
    s.timeout = 0.1
    packet_length = ((header[0] & 0x03) << 8) + header[1]
    class_id = header[2]
    cmd_id = header[3]
    payload = s.read(packet_length)
    return class_id, cmd_id, payload


def int8_from_byte(value):
    """Interpret the unsigned byte *value* (0..255) as a signed 8-bit integer."""
    return value - 0x100 if value >= 0x80 else value


def vec_sum(a, b):
    """Return the element-wise sum of *a* and *b* as a list."""
    return [x + y for x, y in zip(a, b)]


def vec_sub(a, b):
    """Return the element-wise difference ``a - b`` as a list."""
    return [x - y for x, y in zip(a, b)]


def vec_abs(a):
    """Return the Euclidean length of *a*."""
    return sum(x ** 2 for x in a) ** 0.5


def vec_vprod(a, b):
    """Return the cross product of the 3-vectors *a* and *b* as a tuple."""
    return (a[Y] * b[Z] - a[Z] * b[Y],
            a[Z] * b[X] - a[X] * b[Z],
            a[X] * b[Y] - a[Y] * b[X])


def vec_sprod(a, b):
    """Return the dot product of *a* and *b*."""
    return sum(x * y for x, y in zip(a, b))


def vec_scale(a, b):
    """Return the vector *a* scaled by the scalar *b* as a list."""
    return [b * x for x in a]


def get_position(advertisers, debug=False, callback=None):
    """Fit a 3D position to the distance estimates of *advertisers*.

    Args:
        advertisers: objects exposing ``pos`` (3 coordinates or ``None``) and
            ``get_distance()`` (a number or ``None``).  Entries with a
            ``None`` position or distance are ignored.
        debug: passed to ``scipy.optimize.fmin`` as ``disp``.
        callback: passed through to ``scipy.optimize.fmin``.

    Returns:
        The fitted position as a numpy array of 3 values, or ``None`` when
        fewer than four usable advertisers remain.
    """
    # Query every distance exactly once: get_distance() depends on the
    # current time, so repeated calls may disagree or turn into None.
    usable = []
    for advertiser in advertisers:
        if advertiser.pos is None:
            continue
        distance = advertiser.get_distance()
        if distance is None:
            continue
        usable.append((distance, advertiser.pos))
    if len(usable) < 4:
        return None

    distances = np.array([d for d, _ in usable], dtype=float)
    beacon_positions = np.array([p for _, p in usable], dtype=float)

    # Start slightly below the nearest (strongest) advertiser.  On equal
    # distances the first such advertiser is used.
    nearest = int(np.argmin(distances))
    start = beacon_positions[nearest] - np.array([0.0, 0.0, 0.2])

    def error(x):
        # Distances from the candidate position to every beacon.
        r = (((x - beacon_positions) ** 2).sum(axis=1)) ** 0.5
        # Squared difference between measured and calculated distance.
        errors = (distances - r) ** 2
        # Cap each term at 1 to limit the influence of far-off values.
        # The terms are summed unweighted; the original variance weighting
        # was unreachable code and has been removed.
        return np.fmin(errors, 1.0).sum()

    # fmin (Nelder-Mead) needs no gradient of the error function.
    return scipy.optimize.fmin(error, start, ftol=1e-2, disp=debug,
                               callback=callback)


class Advertiser:
    """Sliding window of RSSI measurements for one advertiser.

    Attributes are ``mac``, ``pos`` (position or ``None``), ``tx_power`` and
    ``rssi``, a dict mapping the ``time.time()`` of a measurement to its value.
    Instances compare by their mean RSSI.
    """

    # Equality depends on mutable measurements, so instances are unhashable.
    __hash__ = None

    def __init__(self, mac, pos, tx_power):
        self.mac = mac
        self.pos = pos
        self.tx_power = tx_power
        self.rssi = {}

    def clean_measurements(self, old=2):
        """Drop every measurement that is *old* seconds old or older."""
        now = time.time()
        self.rssi = {ts: value for ts, value in self.rssi.items()
                     if now - ts < old}

    def add_measurement(self, rssi):
        """Prune old measurements and record *rssi* at the current time."""
        self.clean_measurements()
        self.rssi[time.time()] = rssi

    def get_medium_rssi(self):
        """Return the (upper) median of the current RSSI values, or None."""
        self.clean_measurements()
        if not self.rssi:
            return None
        return sorted(self.rssi.values())[len(self.rssi) // 2]

    def get_middled_rssi(self):
        """Return the arithmetic mean of the current RSSI values, or None."""
        self.clean_measurements()
        if not self.rssi:
            return None
        return sum(self.rssi.values()) / float(len(self.rssi))

    def get_rssi_error(self):
        """Return the sample variance of the current RSSI values.

        Returns None when fewer than two measurements are available.
        """
        # Prune first and work on one snapshot, so that the count used for
        # the check is the count used for the division.
        self.clean_measurements()
        values = list(self.rssi.values())
        if len(values) < 2:
            return None
        mean = sum(values) / float(len(values))
        return sum((v - mean) ** 2 for v in values) / (len(values) - 1)

    def get_distance(self):
        """Return the distance estimate ``10 ** ((tx_power - mean_rssi) / 20)``.

        Returns None when there is no current measurement.
        """
        # Fetch the mean once; a second call could already return None.
        mean = self.get_middled_rssi()
        if mean is None:
            return None
        ratio_power = self.tx_power - mean
        return (10 ** (ratio_power / 10.)) ** 0.5

    def get_distance_error(self):
        """Return ``C * distance * rssi_error``, or None if either is missing.

        NOTE(review): C is negative and get_rssi_error() is a variance, so
        this is a signed value and not a standard deviation - confirm intent.
        """
        distance = self.get_distance()
        rssi_error = self.get_rssi_error()
        if distance is None or rssi_error is None:
            return None
        return C * distance * rssi_error

    def __str__(self):
        try:
            return "%s: (%d/%0.2f|%0.2f|%d) %0.5f+-%0.3f" % (
                self.mac, self.get_medium_rssi(), self.get_middled_rssi(),
                self.get_rssi_error(), self.tx_power, self.get_distance(),
                self.get_distance_error())
        except TypeError:
            # At least one value is None (too few measurements).
            return self.mac + ": - "

    def __gt__(self, other):
        if not isinstance(other, Advertiser):
            return NotImplemented
        return self.get_middled_rssi() > other.get_middled_rssi()

    def __lt__(self, other):
        if not isinstance(other, Advertiser):
            return NotImplemented
        return self.get_middled_rssi() < other.get_middled_rssi()

    def __eq__(self, other):
        if not isinstance(other, Advertiser):
            return NotImplemented
        return self.get_middled_rssi() == other.get_middled_rssi()


def main(argv=None):
    """Track advertisers via the serial port ``argv[1]`` and print positions.

    Runs until interrupted with Ctrl-C.  *argv* defaults to ``sys.argv``.
    """
    # pyserial is only needed here; importing it lazily keeps the positioning
    # code above importable without the serial dependency.
    import serial

    if argv is None:
        argv = sys.argv
    # NOTE(review): 25600 is not a standard baud rate - confirm.
    port = serial.Serial(argv[1], baudrate=25600, rtscts=True)
    try:
        port.timeout = 0.1
        port.readall()  # discard whatever is still buffered
        # Presumably the command that starts scanning - verify against the
        # dongle's protocol reference.
        port.write(b"\x00\x01\x06\x02\x01")
        devices = {}
        # The log file is created but nothing is written to it yet.
        with open("%d.log" % (time.time()), "w"):
            try:
                while True:
                    class_id, cmd_id, payload = read_packet(port)
                    if not (class_id == 0x06 and cmd_id == 0x00
                            and len(payload) == 0x29):
                        continue
                    # First and last payload bytes are signed 8-bit values.
                    rssi = int8_from_byte(payload[0])
                    tx_power = int8_from_byte(payload[-1])
                    # Payload bytes 2..7 hold the address in reversed order.
                    mac = ":".join("%02x" % (c,) for c in payload[7:1:-1])
                    if mac not in devices:
                        devices[mac] = Advertiser(mac, ADVERTISERS.get(mac),
                                                  tx_power)
                    devices[mac].add_measurement(rssi)
                    # get_position skips advertisers without a position or
                    # without a current distance estimate.
                    pos = get_position(list(devices.values()))
                    print("\x1b[2J")
                    for key in sorted(devices):
                        print(devices[key])
                    print(repr(pos))
            except KeyboardInterrupt:
                # Presumably the command that stops scanning - verify.
                port.write(b"\x00\x00\x06\x04")
    finally:
        port.close()


if __name__ == "__main__":
    main()