Torus_Topo

Create a torus network topology.

This is a series of connected rings. Include test code to generate route maps and test connectivity.

OrbitData dataclass

Records key orbital information

Source code in emulation/torus_topo.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
@dataclass
class OrbitData:
    '''
    Records key orbital information
    '''

    right_ascension: float  # degrees
    inclination: float  # degrees
    mean_anomaly: float  # degrees
    cat_num: int = 0

    cat_num_count: ClassVar[int] = 1

    def assign_cat_num(self) -> None:
        self.cat_num = OrbitData.cat_num_count
        OrbitData.cat_num_count += 1

    @staticmethod
    def tle_check_sum(line: str) -> str:
        val = 0
        for i in range(len(line)):
            if line[i] == "-":
                val += 1
            elif line[i].isdigit():
                val += int(line[i])
        return str(val % 10)

    def tle_format(self) -> tuple[str,str]:
        time_tuple = datetime.datetime.now().timetuple()
        year = time_tuple.tm_year % 1000 % 100
        day = time_tuple.tm_yday

        l1 = LINE1.format(self.cat_num, year, day, 342)
        l2 = LINE2.format(
            self.cat_num, self.inclination, self.right_ascension, self.mean_anomaly
        )
        l1 = l1 + OrbitData.tle_check_sum(l1)
        l2 = l2 + OrbitData.tle_check_sum(l2)
        return l1, l2

create_network(num_rings=NUM_RINGS, num_ring_nodes=NUM_RING_NODES, ground_stations=True)

Create a torus network of the given size annotated with orbital information.

Source code in emulation/torus_topo.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def create_network(num_rings: int =NUM_RINGS, num_ring_nodes: int =NUM_RING_NODES, ground_stations: bool = True) -> networkx.Graph:
    '''
    Create a torus network of the given size annotated with orbital information.
    '''
    graph: networkx.Graph = networkx.Graph()
    graph.graph["rings"] = num_rings
    graph.graph["ring_nodes"] = num_ring_nodes
    graph.graph["ring_list"] = []
    graph.graph["inclination"] = 53.9
    prev_ring_num = None
    for ring_num in range(num_rings):
        create_ring(graph, ring_num, num_ring_nodes)
        if prev_ring_num is not None:
            connect_rings(graph, prev_ring_num, ring_num, num_ring_nodes)
        prev_ring_num = ring_num
    if prev_ring_num is not None:
        connect_rings(graph, prev_ring_num, 0, num_ring_nodes)

    if ground_stations:
        add_ground_stations(graph)

    # Set all edges to up
    for edge_name, edge in graph.edges.items():
        edge["up"] = True
    return graph

Mark the inter-ring links down for the specified node numbers on all rings to prevent use during a path trace. This causes many inter-ring links to be down to test the routing functions.

Source code in emulation/torus_topo.py
204
205
206
207
208
209
210
211
212
213
214
215
216
def down_inter_ring_links(graph: networkx.Graph, node_num_list: list[int], num_rings=NUM_RINGS):
    '''
    Mark the inter-ring links down for the specified node numbers on all rings 
    to prevent use during a path trace. This causes many inter-ring links to be down to 
    test the routing functions.
    '''
    # Set the specified links to down
    for node_num in node_num_list:
        for ring_num in range(num_rings):
            node_name = get_node_name(ring_num, node_num)
            for neighbor_name in graph.adj[node_name]:
                if graph[node_name][neighbor_name]["inter_ring"]:
                    graph[node_name][neighbor_name]["up"] = False

generate_route_table(graph, node_name)

Breadth first search to generate routes fromthe start node to all other nodes. Routing table provides a next hop and a path length for all possible destinations.

{ "dest node" : ( path_len, "next hop" )}

Source code in emulation/torus_topo.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def generate_route_table(graph: networkx.Graph, node_name: str) -> dict[str,tuple[int,str]]:
    '''
    Breadth first search to generate routes fromthe  start node to all other nodes.
    Routing table provides a next hop and a path length for all possible destinations.

    { "dest node" : ( path_len, "next hop" )}
    '''

    routes = {}  # Dest: (hops, next hop node)
    for name, node in graph.nodes.items():
        node["visited"] = False

    # Queue to nodes to visit
    node_list = []
    # Mark the start node as visited
    graph.nodes[node_name]["visited"] = True

    def visit_node(graph: networkx.Graph, next_hop: str, path_len: int, visit_node_name: str) -> None:
        '''
        Visit a node by adding all neighbors to the visit queue
        '''
        # Neighbors already visted are added to the queue, we skip them here
        if graph.nodes[visit_node_name]["visited"]:
            return
        graph.nodes[visit_node_name]["visited"] = True

        # This node is reachable from the start node via the given 
        # next hop from the start node
        routes[visit_node_name] = (path_len, next_hop)

        # Enqueue is reachable neighbor for a future visit
        for neighbor_node_name in graph.adj[visit_node_name]:
            if graph.edges[visit_node_name, neighbor_node_name]["up"]:
                node_list.append((path_len + 1, next_hop, neighbor_node_name))

    # Enqueue the neighbors of the start node for visiting
    for neighbor_node_name in graph.adj[node_name]:
        if graph.edges[node_name, neighbor_node_name]["up"]:
            node_list.append((1, neighbor_node_name, neighbor_node_name))

    # Visit all nodes until the queue is empty
    while len(node_list) > 0:
        path_len, next_hop, visit_node_name = node_list.pop(0)
        visit_node(graph, next_hop, path_len, visit_node_name)

    return routes

ground_stations(graph)

Return a list of all node names where the node is of type ground

Source code in emulation/torus_topo.py
48
49
50
51
52
53
54
55
56
57
def ground_stations(graph: networkx.Graph) -> list[str]:
    '''
    Return a list of all node names where the node is of type ground
    '''
    # Consider converting to using yield
    result = []
    for name in graph.nodes:
        if graph.nodes[name][TYPE] == TYPE_GROUND:
            result.append(name)
    return result

run_routing_test()

Make a graph and exercise path tracing

Source code in emulation/torus_topo.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
def run_routing_test() -> bool:
    '''
    Make a graph and exercise path tracing
    '''
    graph: networkx.Graph = create_network()

    down_inter_ring_links(graph, [0, 1, 2, 3, 4, 5, 20, 21, 22, 23, 24, 25])

    print("Number nodes: %d" % graph.number_of_nodes())
    print("Number edges: %d" % graph.number_of_edges())
    print(graph.nodes)
    print(graph.edges)

    for node in satellites(graph):
        print(node)
        orbit = graph.nodes[node]["orbit"]
        print(orbit)
        l1, l2 = orbit.tle_format()
        print(l1)
        print(l2)
        print()

    routes = generate_route_table(graph, get_node_name(0, 0))
    for node, entry in routes.items():
        print("node: %s, next: %s, len: %d" % (node, entry[1][0], entry[0]))

    route_tables = {}
    for node_name in graph.nodes():
        print("generate routes %s" % node_name)
        route_tables[node_name] = generate_route_table(graph, node_name)
        print(f"len: {len(route_tables[node_name])}")

    result: bool = trace_path(get_node_name(0, 0), get_node_name(0, 1), route_tables)
    print()
    result = result and trace_path(get_node_name(0, 0), get_node_name(0, 2), route_tables)
    print()
    result = result and trace_path(get_node_name(0, 0), get_node_name(1, 0), route_tables)
    print()
    result = result and trace_path(get_node_name(0, 0), get_node_name(18, 26), route_tables)
    return result

run_small_test()

Make a graph

Source code in emulation/torus_topo.py
289
290
291
292
293
294
def run_small_test() -> bool:
    '''
    Make a graph
    '''
    graph: networkx.Graph = create_network()
    return True

satellites(graph)

Return a list of all node names where the node is of type satellite

Source code in emulation/torus_topo.py
60
61
62
63
64
65
66
67
68
69
def satellites(graph: networkx.Graph) -> list[str]:
    '''
    Return a list of all node names where the node is of type satellite
    '''
    # Consider converting to using yield
    result = []
    for name in graph.nodes:
        if graph.nodes[name][TYPE] == TYPE_SAT:
            result.append(name)
    return result

trace_path(start_node_name, target_node_name, route_tables)

Follow the routing tables to trace a path between the start and target node route_tables is a dictionary of routes for each source node

Source code in emulation/torus_topo.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def trace_path(start_node_name: str, target_node_name: str, route_tables: dict[str,dict[str,tuple[int,str]]]) -> bool:
    '''
    Follow the routing tables to trace a path between the start and target node
    route_tables is a dictionary of routes for each source node
    '''
    unreachable_count: int = 0
    print("trace node %s to %s" % (start_node_name, target_node_name))
    current_node_name: str | None = start_node_name

    # Follow path until we reach the target or it is unreachable
    while current_node_name is not None and current_node_name != target_node_name:
        if route_tables[current_node_name].get(target_node_name) is None:
            current_node_name = None
            print("unreachable")
        else:
            entry = route_tables[current_node_name][target_node_name]
            next_hop_name = entry[1]
            print(next_hop_name)
            current_node_name = next_hop_name
    return current_node_name is not None