Nach langer Zeit des Suchens, wage ich doch mal eine Forumsanfrage:
Mein Knackpunkt:
Per GPS bekomme ich eine laufende Position.
Es gibt einen Track, also Wegpunkte für eine vorgegebene Strecke (auf dem Wasser)
Wie bekomme ich heraus, wo (zwischen welchen Wegpunkten) ich mich befinde bzw. ich benötige ein Fortkommen auf dem Track und dafür das "Erreichen des nächsten Wegpunktes" ... oder anders gefragt: wie entscheide ich, wann ich von einem Abschnitt [...] b-c in den nächsten c-d springe?
Da bei spielt mit ein:
- Erste GPS Position kann auch "mitten drin" erfolgen.
- Fahrtrichtung ist nicht entscheidend führ den nächsten Wegpunkt. (Anfang und Ende der Wegpunkte wird manuell eingestellt)
- Auf dem Wasser kann durchaus auch abgekürzt werden, also man fährt nicht direkt auf der Linie zwischen zwei Wegpunkten, und fährt auch mal mit ein paar Meilen Abstand am nächsten Wegpunkt vorbei. (XTE)
Code: Alles auswählen
import logging
import math
from collections import deque
from os import path
from PyQt5 import QtPositioning
from PyQt5.QtCore import QObject, pyqtSlot, pyqtSignal
from xml.etree import ElementTree as et
from . import navigate as nav
from .routePoint import RoutePoint
from .search import Graph
from .logbook import get_log_book
from ..utils import RADIUS_EARTH as R
logger = logging.getLogger("pygps")
logbook = get_log_book()
class EndOfRoute(Exception):
def __init__(self):
super().__init__(self, "End of route / final waypoint reached.")
class Route(QObject):
renew = pyqtSignal(dict)
stopped = pyqtSignal()
started = pyqtSignal()
def __init__(self):
super().__init__()
self.route_point_collection = deque() # TODO check is Linkedlist is more usefull
self.init_parameters()
def init_parameters(self):
self.r_distance = 0.0
self.r_name = None
self.r_departure = None
self.r_destination = None
#self.s_parameter = None TODO Later this could be used to hand over a class with vessels parameter from configuration file.
self.active = False
self.index_next_rpt = 0
self.curr_fix = QtPositioning.QGeoPositionInfo()#(QtPositioning.QGeoCoordinate(0,0))
def load_from_gpx(self, gpx):
[...]
def load_from_bank(self, route_id):
"""Load route from sqlite database with given route_id.
"""
pass
#self.find_next_rtepoint() Uncomment when function is implemented.
def add_rtepoint(self, rpt):
assert isinstance(rpt, RoutePoint)
self.route_point_collection.append(rpt)
def insert_rtepoint(self, rpt, index=-1):
assert isinstance(rpt, RoutePoint)
self.route_point_collection.insert(index, rpt)
def delete_rtepoint(self, index=-1):
pt = self.route_point_collection[index]
self.route_point_collection.remove(pt)
def arrived_rtepoint(self):
# Follow checks only if speed has minimum velocity.
if self.curr_fix.attribute(QtPositioning.QGeoPositionInfo.GroundSpeed) < 1:
return False
rtepoint = self.route_point_collection[self.index_next_rpt]
# Calculation for search.djikstra TODO optimize
# ->
edges = []
edges.append((0 , 1, nav.RhumbLine.distance(self.curr_fix.coordinate().latitude(), self.curr_fix.coordinate().longitude(),
rtepoint.latitude, rtepoint.longitude)))
try:
edges.append((0 , 2, nav.RhumbLine.distance(self.curr_fix.coordinate().latitude(), self.curr_fix.coordinate().longitude(),
self.route_point_collection[self.index_next_rpt+1].latitude, self.route_point_collection[self.index_next_rpt+1].longitude)))
except IndexError as e:
pass
for i in range(1, self.count()):
try:
edges.append((i , i+1, self.route_point_collection[i].p_leg_dist))# edges index = route_point_collection index + 1
except IndexError:
pass
graph = Graph(edges)
graph.dijkstra(0, self.count())
dij_index = graph.get_next_wpt
# <-
# bering: fix to next route point
bearing = nav.RhumbLine.bearing(self.curr_fix.coordinate().latitude(), self.curr_fix.coordinate().longitude(), rtepoint.latitude, rtepoint.longitude)
course = self.curr_fix.attribute(QtPositioning.QGeoPositionInfo.Direction)
# Bearing to route point more than 70° to each side.
dangle = bearing - course if bearing > course else course - bearing
if dangle > 180:
dangle -= 360
elif dangle < -180:
dangle += 360
try:
if rtepoint.circle.contains(self.curr_fix.coordinate()):
logger.watch("Arrived at waypoint '{}'".format(rtepoint.name))
logger.debug("Skipped by circle")
logbook.route(descr="Arrived at waypoint {}".format(rtepoint.name))
self.goto_next_routepoint()
#elif dij_index > self.index_next_rpt:
#logger.watch("Wheeled over waypoint '{}'".format(rtepoint.name))
#logger.debug("Skipped by Dijkstra")
#logbook.route(descr="Wheeled over waypoint {}".format(rtepoint.name))
#self.goto_next_routepoint()
elif (-100 > dangle or dangle > 100) and dij_index > self.index_next_rpt:
logger.watch("Wheeled over waypoint '{}'".format(rtepoint.name))
logger.debug("Skipped by Dijkstra and bearing")
logbook.route(descr="Wheeled over waypoint {}".format(rtepoint.name))
self.goto_next_routepoint()
except EndOfRoute:
logger.watch("Route completed")
logbook.route(descr="Route completed")
self.stop()
self.clear()
def _arrived_rtepoint(self): #TBD DEPRECATED
"""Handles arriving at next waypoint.
:return type: bool TODO wird gar nicht aufgerufen???
"""
ships_length = 149.0
ships_circle = QtPositioning.QGeoCircle(self.curr_fix.coordinate(), ships_length*4.0)
rtepoint = self.route_point_collection[self.index_next_rpt]
#print("in")
#bering = achterlicher als querab
bearing = self.curr_fix.coordinate().azimuthTo(rtepoint.q_geocoordinate())
course = self.curr_fix.attribute(QtPositioning.QGeoPositionInfo.Direction)
distance = self.curr_fix.coordinate().distanceTo(rtepoint.q_geocoordinate())
# Follow checks only if speed has minimum velocity.
if self.curr_fix.attribute(QtPositioning.QGeoPositionInfo.GroundSpeed) < 2:
return False
#print(rtepoint.rectangle.toString()) # HACK
dangle = bearing - course if bearing > course else course - bearing
if dangle > 110:
dangle -= 360
if dangle < -180:
dangle += 360
try:
if rtepoint.circle.contains(self.curr_fix.coordinate()):
logger.watch("Arrived at waypoint '{}'".format(rtepoint.name))
logbook.route(descr="Arrived at waypoint {}".format(rtepoint.name))
self.goto_next_routepoint()
logger.debug("Skipped by circle")
#elif self.index_next_rpt < self.count()-1:
#if self.route_point_collection[self.index_next_rpt+1].rectangle.contains(QtPositioning.QGeoRectangle(self.curr_fix.coordinate(),0.1,0.1)):
#logger.watch("Arrived at waypoint '{}'".format(rtepoint.name))
#logbook.route(descr="Arrived at waypoint {}".format(rtepoint.name))
#self.goto_next_routepoint()
#logger.debug("Skipped by rectangle WP 1")
#elif not self.route_point_collection[self.index_next_rpt].rectangle.contains(self.curr_fix.coordinate()):#TODO weiter and Klausel einführen als sicherheit
#logger.watch("Arrived at waypoint '{}'".format(rtepoint.name))
#logbook.route(descr="Arrived at waypoint {}".format(rtepoint.name))
#self.goto_next_routepoint()
#logger.debug("Skipped by not rectangle")
if (-100 > dangle or dangle > 100):
logger.watch("Arrived at waypoint '{}'".format(rtepoint.name))
self.goto_next_routepoint()
logbook.route(descr="Arrived at waypoint {}".format(rtepoint.name))
except EndOfRoute:
logger.watch("Route completed.")# TODO transfer logger.watch in loogbook
self.stop()
self.clear()
logbook.route(descr="Route completed")
return False
def get_xtd(self):
"""Calculate cross track error.
:return: distance in m. '-' if left, '+' if right of the track.
"""
if self.index_next_rpt == 0:
return 0
rtp_1 = self.route_point_collection[self.index_next_rpt-1]
rtp_2 = self.route_point_collection[self.index_next_rpt]
radius = R * 0.53995680345572 # Earth radius in naut miles.
if self.curr_fix.coordinate() == rtp_1.q_geocoordinate():
return 0
phi13 = nav.RhumbLine.distance(rtp_1.latitude, rtp_1.longitude, self.curr_fix.coordinate().latitude(), self.curr_fix.coordinate().longitude()) / radius
roh13 = math.radians(nav.RhumbLine.bearing(rtp_1.latitude, rtp_1.longitude, self.curr_fix.coordinate().latitude(), self.curr_fix.coordinate().longitude()))
roh12 = math.radians(nav.RhumbLine.bearing(rtp_1.latitude, rtp_1.longitude, rtp_2.latitude, rtp_2.longitude))
phixt = math.asin(math.sin(phi13) * math.sin(roh13 - roh12))
return phixt * radius
def update_route_data(self):
"""Activate the route.
Does all calculations which are necessary for sailing along the loaded route.
Emits ``renew`` signal with paramter data<dict>. Keys are: name, crsww, dist, total, bearing
"""
data = dict()
try:
rpt = self.route_point_collection[self.index_next_rpt]
self.arrived_rtepoint()
data = {"name":rpt.name}
data["crsww"] = rpt.p_leg_course
data["dist"] = nav.RhumbLine.distance(self.curr_fix.coordinate().latitude(), self.curr_fix.coordinate().longitude(), rpt.latitude, rpt.longitude)
data["bearing"] = nav.RhumbLine.bearing(self.curr_fix.coordinate().latitude(), self.curr_fix.coordinate().longitude(), rpt.latitude, rpt.longitude)
data["total"] = sum([rtp.p_leg_dist for rtp in list(self.route_point_collection)[self.index_next_rpt+1:]]) + data["dist"]
except EndOfRoute:# TODO der greift doch gar nicht ???
print("EndOfRoute execption:route.py:254")
data["name"] = ""
data["dist"] = data["bearing"] = data["total"] = data["crsww"] = 0
self.stop()
finally:
self.renew.emit(data)
def find_next_rtepoint(self):
"""Find the next route point to sail in the track.
"""
index = self.find_nearest_rtepoint()
nearest_rpt = self.route_point_collection[index]
if nearest_rpt.p_next:# BUG am Anfang ist der erste rpt immer next == True: Kurzer abstand + richtung???
self.index_next_rpt = index
else:
for ind, x in enumerate(self.route_point_collection):
if x.p_next:
self.index_next_rpt = ind
def find_nearest_rtepoint(self):
"""Find nearest route point.
Iterate over the route point list and return the route point with the shortest distance to the given position.
:return: route point index
"""
# dd = 10 as start value.
dd = 10
rtp_ind = 0
for i, rpt in enumerate(self.route_point_collection):
d = nav.RhumbLine.distance(self.curr_fix.coordinate().latitude(), self.curr_fix.coordinate().longitude(), rpt.latitude, rpt.longitude)
if d < dd:
dd = d
rtp_ind = i
return rtp_ind
def update_segment_distance(self):
"""Update the route segment lengths.
Storing each segment length in <destination> point.
Also, compute total route length by summing segment distances.
"""
for n in range(1, len(self.route_point_collection)):
pt1 = self.route_point_collection[n-1]
pt2 = self.route_point_collection[n]
# TODO LAter implement setup for RL or GC
dd = nav.RhumbLine.distance(pt1.latitude, pt1.longitude, pt2.latitude, pt2.longitude)
crs = nav.RhumbLine.bearing(pt1.latitude, pt1.longitude, pt2.latitude, pt2.longitude)
# Set distance and course to next waypoint.
pt2.p_leg_dist = dd
pt2.p_leg_course = crs
# Calculate total distance.
self.r_distance += dd
def reverse(self):
"""Reverse route.
"""
self.route_point_collection.reverse()
def start(self):
"""Start sailing.
"""
self.active = True
logger.info("Route '{}' is active.".format(self.r_name))
def stop(self):
"""Stopp sailing.
"""
self.active = False
self.stopped.emit()
logger.info("Routing '{}' is stopped.".format(self.r_name))
def clear(self):
logger.info("Route '{}' deleted.".format(self.r_name))
self.route_point_collection.clear()
self.init_parameters()
self.renew.emit({"name":None, "total":None, "crsww":None, "dist":None, "bearing":None})
@pyqtSlot(QtPositioning.QGeoPositionInfo)
def update_fix(self, fix):
"""Update gps_fix.
If the route is activated for sailing via Route.start(), the informations for any screens
will be calculated and updated every call.
"""
if self.active:
self.curr_fix = fix
self.update_route_data()
def goto_next_routepoint(self):
if self.index_next_rpt == self.count()-1:
raise EndOfRoute()#("End of route / final waypoint reached.")
elif self.index_next_rpt == 0:
self.started.emit()
rtepoint = self.route_point_collection[self.index_next_rpt]
rtepoint.p_next = False
rtepoint.p_last = True
if self.index_next_rpt > 0:
self.route_point_collection[self.index_next_rpt-1].p_last = False
self.index_next_rpt += 1
self.route_point_collection[self.index_next_rpt].p_next = True
logger.info("Skip to next route point.")
def goto_previous_routepoint(self):
if self.index_next_rpt == 0:
return
rtepoint = self.route_point_collection[self.index_next_rpt]
rtepoint.p_next = False
rtepoint.p_last = False
self.index_next_rpt -= 1
rtepoint = self.route_point_collection[self.index_next_rpt]
rtepoint.p_next = True
if self.index_next_rpt > 0:
self.route_point_collection[self.index_next_rpt-1].p_last = True
logger.info("Skip to previous route point.")
def count(self):
"""Get number of waypoints.
"""
return len(self.route_point_collection)
def __bool__(self):
return True if self.count() > 0 else False
Hoffe ich konnte mein Problem darlegen, eventuel kann da ja jemande den entscheidenden Hinweis geben.
Gruß Bob