[go: up one dir, main page]

File: nftmgr.py

package info (click to toggle)
comitup 1.43-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,252 kB
  • sloc: python: 3,093; javascript: 1,261; sh: 95; makefile: 34
file content (117 lines) | stat: -rw-r--r-- 3,525 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# Copyright (c) 2017-2019 David Steele <dsteele@gmail.com>
#
# SPDX-License-Identifier: GPL-2.0-or-later
# License-Filename: LICENSE
#
# Copyright 2017 David Steele <steele@debian.org>
# This file is part of comitup
# Available under the terms of the GNU General Public License version 2
# or later
#

import logging
import subprocess
from typing import List, Optional

from comitup import modemgr, nm, routemgr

log: logging.Logger = logging.getLogger("comitup")


def start_hs_rules() -> None:
    """Install the nft output-filter rules used for the hotspot.

    Adds the ``ip filter`` table and a ``COMITUP-OUT`` output-hook chain
    whose rules drop ICMP destination-unreachable / port-unreachable packets
    and return for everything else.
    """
    ap_name = nm.device_name(modemgr.get_ap_device())

    # Braces are doubled because run_cmds() passes every command through
    # str.format() before executing it.
    # fmt: off
    hotspot_rules: List[str] = [
        "nft add table ip filter",
        "nft 'add chain ip filter COMITUP-OUT {{ type filter hook output priority 100 ; }}'",
        "nft 'add rule ip filter COMITUP-OUT icmp type destination-unreachable counter drop' ",
        "nft 'add rule ip filter COMITUP-OUT icmp code port-unreachable counter drop' ",
        "nft 'add rule ip filter COMITUP-OUT counter return'",
    ]
    # fmt: on

    run_cmds(hotspot_rules, ap=ap_name)


def stop_hs_rules() -> None:
    """Flush and delete the ``COMITUP-OUT`` chain, discarding nft output."""
    # fmt: off
    run_cmds(
        [
            "nft flush chain ip filter COMITUP-OUT  >/dev/null 2>&1",
            "nft delete chain ip filter COMITUP-OUT > /dev/null 2>&1",
        ]
    )
    # fmt: on


def start_router_rules() -> None:
    """Install the NAT masquerade rules and enable IPv4 forwarding.

    Adds the ``ip nat`` table and a ``COMITUP-FWD`` postrouting chain that
    masquerades traffic leaving through the link device, then writes ``1``
    to ``/proc/sys/net/ipv4/ip_forward``.
    """
    # Every command names the ``ip`` family explicitly so that the chain is
    # created in the same table the flush/delete commands address, and the
    # interface name is quoted like the rule inserted by state_callback().
    # Braces are doubled because run_cmds() applies str.format().
    # fmt: off
    appliance_cmds: List[str] = [
        "nft add table ip nat",
        "nft 'add chain ip nat COMITUP-FWD {{ type nat hook postrouting priority 100 ; }}'",
        "nft 'add rule ip nat COMITUP-FWD oifname \"{link}\" counter masquerade'",
        "nft 'add rule ip nat COMITUP-FWD counter return'",
        "echo 1 > /proc/sys/net/ipv4/ip_forward",
    ]
    # fmt: on

    link_dev = nm.device_name(modemgr.get_link_device())
    run_cmds(appliance_cmds, link=link_dev)


def stop_router_rules() -> None:
    """Flush and delete the ``COMITUP-FWD`` nat chain, discarding nft output."""
    # fmt: off
    run_cmds(
        [
            "nft flush chain ip nat COMITUP-FWD > /dev/null 2>&1",
            "nft delete chain ip nat COMITUP-FWD > /dev/null 2>&1",
        ]
    )
    # fmt: on


def run_cmds(cmds: List[str], **params) -> None:
    """Format each command template and run it through the shell.

    Args:
        cmds: Shell command templates in ``str.format`` syntax; literal
            braces in a command must be doubled.
        **params: Substitution values for the templates.  ``link`` and
            ``ap`` are filled in from the current link and access-point
            device names when the caller does not supply them.

    Commands are run best-effort: a non-zero exit status is logged at debug
    level and the remaining commands are still executed.
    """
    # Only query a device when the caller has not already resolved it, so a
    # value passed in explicitly is honoured rather than overwritten.
    if "link" not in params:
        params["link"] = nm.device_name(modemgr.get_link_device())
    if "ap" not in params:
        params["ap"] = nm.device_name(modemgr.get_ap_device())

    for cmd in cmds:
        runcmd = cmd.format(**params)
        log.debug('nftmgr - running "%s"', runcmd)
        # shell=True is needed: the templates rely on shell quoting and
        # output redirection.
        status = subprocess.call(runcmd, shell=True)
        if status != 0:
            log.debug('nftmgr - "%s" exited with status %d', runcmd, status)


def state_callback(state: str, action: str) -> None:
    """Apply the nft rule set that matches a state transition.

    Only the ``("HOTSPOT", "start")`` and ``("CONNECTED", "start")``
    combinations are acted on; any other pair is ignored.

    Args:
        state: Name of the state being entered, e.g. ``"HOTSPOT"``.
        action: Transition phase; only ``"start"`` triggers any work.
    """
    if (state, action) == ("HOTSPOT", "start"):
        log.debug("nftmgr - Running nft commands for HOTSPOT")

        # Clear any previous chain before re-creating it.
        stop_hs_rules()
        start_hs_rules()

        if modemgr.get_mode() == modemgr.MULTI_MODE:
            stop_router_rules()

        log.debug("nftmgr - Done with nft commands for HOTSPOT")

    elif (state, action) == ("CONNECTED", "start"):
        log.debug("nftmgr - Running nft commands for CONNECTED")
        stop_hs_rules()

        if modemgr.get_mode() == modemgr.MULTI_MODE:
            stop_router_rules()
            start_router_rules()

            defaultdev: Optional[str] = routemgr.defroute_dev()
            apdev: str = modemgr.get_ap_device().Interface
            if defaultdev and defaultdev != apdev:
                # Hand the device name to run_cmds() as a format argument
                # instead of pre-interpolating it: run_cmds() applies
                # str.format() to every command, so a name containing
                # braces would otherwise be parsed as a placeholder.
                # fmt: off
                run_cmds(
                    [
                        "nft 'insert rule ip nat COMITUP-FWD oifname \"{defroute}\" counter masquerade'",
                    ],
                    defroute=defaultdev,
                )
                # fmt: on

        log.debug("nftmgr - Done with nft commands for CONNECTED")


def init_nftmgr() -> None:
    """Initialize the nft manager; there is currently nothing to set up."""
    return None