[go: up one dir, main page]

File: util.py

package info (click to toggle)
txtorcon 0.18.0-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 1,148 kB
  • ctags: 1,531
  • sloc: python: 10,403; makefile: 211
file content (318 lines) | stat: -rw-r--r-- 9,356 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# -*- coding: utf-8 -*-

from __future__ import absolute_import
from __future__ import print_function
from __future__ import with_statement

import glob
import os
import hmac
import hashlib
import shutil
import socket
import subprocess
import ipaddress
import struct
import re

from twisted.internet import defer
from twisted.internet.interfaces import IProtocolFactory

from twisted.internet.endpoints import serverFromString

from zope.interface import implementer

try:
    import GeoIP as _GeoIP
    GeoIP = _GeoIP
except ImportError:
    GeoIP = None

# Module-level handles for the GeoIP databases. They start out as None
# and are re-bound further down in this module via maybe_create_db().
city = None
country = None
asn = None

# XXX probably better to depend on and use "six" for py2/3 stuff?
# Python 2/3 detection: the builtin name ``unicode`` only exists on
# Python 2, so a NameError here means we are running on Python 3.
try:
    unicode
except NameError:
    py3k = True
    # Python 3 has no ``basestring`` builtin; alias it to ``str``.
    basestring = str
else:
    py3k = False
    # Re-bind the builtin as a module-level name (presumably so that
    # ``basestring`` can be imported from this module on either version).
    basestring = basestring


def create_geoip(fname):
    """
    Open the GeoIP database stored in the file ``fname``.

    :param fname: path of the GeoIP database file to open

    :return:
        whatever ``GeoIP.open`` returns, or None if the GeoIP module
        could not be imported

    :raises IOError: if ``fname`` is not an existing file
    """
    # Waiting for GeoIP to raise would be more "pythonic", but GeoIP
    # prints its own "Can't open..." messages, which isn't desired
    # here -- so the existence check is done up-front.
    if os.path.isfile(fname):
        if GeoIP is None:
            return None
        # any error from GeoIP itself is deliberately left to propagate
        return GeoIP.open(fname, GeoIP.GEOIP_STANDARD)

    raise IOError("Can't find %s" % fname)


def maybe_create_db(path):
    """
    Best-effort wrapper around :func:`create_geoip`.

    :param path: path of the GeoIP database file to open

    :return:
        the result of ``create_geoip(path)``, or None if that raised
        an IOError (e.g. because the file does not exist)
    """
    try:
        database = create_geoip(path)
    except IOError:
        database = None
    return database


# Try to open the GeoIP databases from fixed paths under /usr/share/GeoIP
# at import time. Each handle is None when its file is missing or when the
# GeoIP module is not installed; NetLocation consults these handles.
city = maybe_create_db("/usr/share/GeoIP/GeoLiteCity.dat")
asn = maybe_create_db("/usr/share/GeoIP/GeoIPASNum.dat")
country = maybe_create_db("/usr/share/GeoIP/GeoIP.dat")


def is_executable(path):
    """
    Tell whether ``path`` names an existing regular file that the
    current process is allowed to execute.

    :param path: filesystem path to check
    :return: True if it is an executable file, False otherwise
    """
    if not os.path.isfile(path):
        return False
    return os.access(path, os.X_OK)


def find_tor_binary(globs=('/usr/sbin/', '/usr/bin/',
                           '/Applications/TorBrowser_*.app/Contents/MacOS/'),
                    system_tor=True):
    """
    Tries to find the tor executable, first by asking ``which`` and then
    in the directories matching the glob-patterns in the given
    'globs'-tuple.

    :param globs:
        A tuple of shell-style globs of directories to use to find tor
        (TODO consider making that globs to actual tor binary?)

    :param system_tor:
        This controls whether ``which tor`` is run to search for 'tor'
        or not. If False, we skip that check and use only the 'globs'
        tuple.

    :return:
        the path of the tor binary as a native string, or None if no
        executable could be found.
    """

    # Try to find the tor executable on PATH via "which"
    if system_tor:
        try:
            proc = subprocess.Popen(
                ['which', 'tor'],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                # text mode: gives a native str on both Python 2 and 3
                # (without it Python 3 hands back bytes)
                universal_newlines=True,
            )
        except OSError:
            # "which" itself is not available; fall back to the globs
            pass
        else:
            stdout, _ = proc.communicate()
            found = stdout.strip()
            # only trust the answer if "which" succeeded AND actually
            # printed something other than whitespace
            if proc.returncode == 0 and found:
                return found

    # "which" may be unavailable, and tor is usually not on PATH when
    # using the browser-bundle. Look in specific places
    for pattern in globs:
        for path in glob.glob(pattern):
            torbin = os.path.join(path, 'tor')
            if is_executable(torbin):
                return torbin
    return None


def maybe_ip_addr(addr):
    """
    Parse ``addr`` as an IP address if possible.

    :return:
        an ``ipaddress`` address object when ``addr`` is a valid IP
        address, otherwise ``str(addr)``.

    TODO consider explicitly checking for .exit or .onion at the end?
    """
    try:
        parsed = ipaddress.ip_address(addr)
    except ValueError:
        # not an IP address (e.g. a hostname): hand back plain text
        return str(addr)
    return parsed


def find_keywords(args, key_filter=lambda x: not x.startswith("$")):
    """
    Collect the ``name=value`` items of ``args`` into a dict. Quoting
    inside values is NOT handled (e.g. key="value with space" will not
    work). Items without any '=' are ignored; only the first '='
    separates the key from the value.

    With the default ``key_filter``, any key starting with a single
    dollar sign is dropped, because the way Tor encodes nodes as
    "$hash=name" looks like a keyword argument (but it isn't). Pass
    your own ``key_filter`` to change which keys are kept.

    :param args: a list of strings, each with one key=value pair

    :param key_filter:
        callable taking the key; the pair is kept only when it returns
        something true

    :return:
        a dict of key->value (both strings) of all name=value type
        keywords found in args.
    """
    keywords = {}
    for item in args:
        key, equals, value = item.partition('=')
        if equals and key_filter(key):
            keywords[key] = value
    return keywords


def delete_file_or_tree(*args):
    """
    Remove every path given in ``args``: first as a plain file and, if
    that fails with an OSError, as a whole directory tree. Errors
    while removing the tree are ignored.
    """

    for target in args:
        try:
            os.remove(target)
        except OSError:
            # not removable as a file; treat it as a directory tree
            shutil.rmtree(target, ignore_errors=True)


def ip_from_int(ip):
    """
    Convert an integer back to a dotted-quad string.

    :param ip: the address as an unsigned 32-bit integer
    :return: the dotted-quad representation, e.g. ``'127.0.0.1'``
    """
    # big-endian unsigned 32-bit, i.e. the packed form inet_ntoa expects
    packed = struct.pack('>I', ip)
    return socket.inet_ntoa(packed)


def process_from_address(addr, port, torstate=None):
    """
    Determines the PID from the address/port provided by using lsof
    and returns it as an int (or None if it couldn't be
    determined). In the special case the addr is '(Tor_internal)' then
    the PID of the Tor process (as gotten from the torstate object) is
    returned (or 0 if unavailable, e.g. a Tor which doesn't implement
    'GETINFO process/pid'). In this case if no TorState instance is
    given, None is returned.

    :param addr:
        the address to look up, or the special value '(Tor_internal)'
        (compared case-insensitively); None yields None.

    :param port: the TCP port belonging to ``addr``

    :param torstate:
        optional object whose ``tor_pid`` attribute is used for the
        '(Tor_internal)' case.

    :return: the PID as an int, or None if it could not be determined

    :raises OSError: if the ``lsof`` program cannot be executed
    """

    if addr is None:
        return None

    if str(addr).lower() == "(tor_internal)":
        if torstate is None:
            return None
        return int(torstate.tor_pid)

    proc = subprocess.Popen(['lsof', '-i', '4tcp@%s:%s' % (addr, port)],
                            stdout=subprocess.PIPE,
                            # text mode, so the output is a native str
                            # on Python 3 as well (not bytes)
                            universal_newlines=True)
    stdout, _ = proc.communicate()

    # The first output line is skipped (lsof prints a header there);
    # the PID is taken from the second column of the line after it.
    lines = stdout.splitlines()
    if len(lines) > 1:
        fields = lines[1].split()
        if len(fields) > 1:
            return int(fields[1])
    return None


def hmac_sha256(key, msg):
    """
    Compute HMAC-SHA256 of ``msg`` keyed with ``key`` (adapted from
    rransom's tor-utils git repository).

    :return: the raw (binary) digest
    """

    mac = hmac.new(key, msg, hashlib.sha256)
    return mac.digest()


# Random 32-byte key, generated once when this module is imported; used by
# compare_via_hash() as the HMAC key for both sides of the comparison.
CRYPTOVARIABLE_EQUALITY_COMPARISON_NONCE = os.urandom(32)


def compare_via_hash(x, y):
    """
    Compare two hashes by comparing their HMACs under a random
    per-process key rather than the values themselves. Taken from
    rransom's tor-utils git repository; the intent is something
    resembling a constant-time comparison (or at least, not leaking
    timing info?)

    :return: True if ``x`` and ``y`` have the same HMAC, else False
    """
    nonce = CRYPTOVARIABLE_EQUALITY_COMPARISON_NONCE
    digest_x = hmac_sha256(nonce, x)
    digest_y = hmac_sha256(nonce, y)
    return digest_x == digest_y


class NetLocation:
    """
    Represents the location of an IP address, either city or country
    level resolution depending on what GeoIP database was loaded. If
    the ASN database is available you get that also.

    Attributes set by the constructor:

    - ``ip``: the address as passed in
    - ``latlng``: ``(latitude, longitude)``, or ``(None, None)``
    - ``countrycode``: country code; None if unknown, ``''`` when
      neither a city nor a country database is loaded
    - ``city``: ``(city, region)`` tuple, or None
    - ``asn``: result of the ASN database lookup, or None
    """

    def __init__(self, ipaddr):
        "ipaddr should be a dotted-quad"
        self.ip = ipaddr
        self.latlng = (None, None)
        self.countrycode = None
        self.city = None
        self.asn = None

        # nothing to look up; leave every field at its default
        if self.ip is None or self.ip == 'unknown':
            return

        # The module-level ``city``, ``country`` and ``asn`` database
        # handles are consulted here; the city database takes
        # precedence over the country-only one.
        if city:
            try:
                r = city.record_by_addr(self.ip)
            except Exception:
                # Lookup failures are deliberately ignored, but only
                # real errors: KeyboardInterrupt / SystemExit must
                # still propagate (a bare "except:" would eat them).
                r = None
            if r is not None:
                self.countrycode = r['country_code']
                self.latlng = (r['latitude'], r['longitude'])
                try:
                    self.city = (r['city'], r['region_code'])
                except KeyError:
                    # some records carry 'region_name' instead of
                    # 'region_code'
                    self.city = (r['city'], r['region_name'])

        elif country:
            self.countrycode = country.country_code_by_addr(self.ip)

        else:
            self.countrycode = ''

        if asn:
            try:
                self.asn = asn.org_by_addr(self.ip)
            except Exception:
                # same reasoning as for the city lookup above
                self.asn = None


@implementer(IProtocolFactory)
class NoOpProtocolFactory:
    """
    An IProtocolFactory whose methods all do nothing. Used for
    testing, and by :func:`available_tcp_port`.
    """
    def noop(self, *args, **kw):
        """Accept any arguments, do nothing and return None."""
        return None

    # every IProtocolFactory method is the very same no-op
    buildProtocol = doStart = doStop = noop


@defer.inlineCallbacks
def available_tcp_port(reactor):
    """
    Find a TCP port on localhost that is currently free.

    This listens on port 0 of 127.0.0.1, notes which port number was
    assigned, then stops listening again.

    :param reactor: the reactor used to create the listening endpoint
    :return: a Deferred which fires with the port number
    """

    listen_endpoint = serverFromString(reactor, 'tcp:0:interface=127.0.0.1')
    listening_port = yield listen_endpoint.listen(NoOpProtocolFactory())
    # remember the address before the port is shut down again
    bound_address = listening_port.getHost()
    yield listening_port.stopListening()
    defer.returnValue(bound_address.port)


# A complete QuotedString: DQUOTE, then any run of "plain" characters or
# backslash-escaped characters, then DQUOTE.
_QUOTED_STRING_RE = re.compile(r'''^"((?:[^"\\]|\\.)*)"$''')

# One escape sequence: a backslash followed by either 1-3 octal digits or
# any other single character.
_ESCAPE_SEQUENCE_RE = re.compile(r'\\([0-7]{1,3}|.)')

# The only letters which are read as C escapes.
_C_ESCAPES = {'n': '\n', 't': '\t', 'r': '\r'}


def _replace_escape(match):
    """Return the character which one matched escape sequence stands for."""
    token = match.group(1)
    if token[0] in '01234567':
        # Octal escape. Values above 0377 are truncated to the low 8
        # bits, which is what Python 2's 'string-escape' codec did.
        return chr(int(token, 8) & 0xFF)
    # \n, \t and \r are C escapes; any other character (including the
    # backslash itself and the double quote) stands for itself.
    return _C_ESCAPES.get(token, token)


def unescape_quoted_string(string):
    r'''
    This function implements the recommended functionality described in
    the tor control-spec to be compatible with older tor versions:

      * Read \n \t \r and \0 ... \377 as C escapes.
      * Treat a backslash followed by any other character as that character.

    Except the legacy support for the escape sequences above this function
    implements parsing of QuotedString using qcontent from

    QuotedString = DQUOTE *qcontent DQUOTE

    Every escape sequence is decoded in a single left-to-right pass, so
    this works on Python 2 and Python 3 alike and consecutive escapes
    (e.g. ``\a\b``) do not influence each other.

    :param string: The escaped quoted string.
    :returns: The unescaped string.
    :raises ValueError: If the string is in a invalid form
                        (e.g. a single backslash)
    '''
    match = _QUOTED_STRING_RE.match(string)
    if not match:
        raise ValueError("Invalid quoted string", string)
    # The validation above guarantees that every backslash in the content
    # starts a complete two-character (or octal) escape sequence.
    return _ESCAPE_SEQUENCE_RE.sub(_replace_escape, match.group(1))