Migrate from Pipenv to Poetry

This commit is contained in:
Christophe Mehay 2020-05-09 15:28:19 +02:00
parent dddf9979db
commit caf94bc54d
14 changed files with 1109 additions and 504 deletions

View file

@ -1,374 +0,0 @@
#!/usr/bin/env python3
import argparse
import logging
import os
import sys
from base64 import b64decode
from json import dumps
from re import match
from jinja2 import Environment
from jinja2 import FileSystemLoader
from pyentrypoint import DockerLinks
from .Service import Service
from .Service import ServicesGroup
class Setup(object):
hidden_service_dir = "/var/lib/tor/hidden_service/"
torrc = '/etc/tor/torrc'
torrc_template = '/var/local/tor/torrc.tpl'
def _add_host(self, host):
if host not in self.setup:
self.setup[host] = {}
def _get_ports(self, host, ports):
self._add_host(host)
if 'ports' not in self.setup[host]:
self.setup[host]['ports'] = {host: []}
if host not in self.setup[host]['ports']:
self.setup[host]['ports'][host] = []
ports_l = [
[
int(v) if not v.startswith('unix:') else v
for v in sp.split(':', 1)
] for sp in ports.split(',')
]
for port in ports_l:
assert len(port) == 2
if port not in self.setup[host]['ports'][host]:
self.setup[host]['ports'][host].append(port)
def _get_key(self, host, key):
self._add_host(host)
assert len(key) > 800
self.setup[host]['key'] = key
def _load_keys_in_services(self):
for service in self.services:
service.load_key()
def _get_service(self, host, service):
self._add_host(host)
self.setup[host]['service'] = service
def find_group_by_service(self, service):
for group in self.services:
if service in group.services:
return group
def find_group_by_name(self, name):
for group in self.services:
if name == group.name:
return group
def find_service_by_host(self, host):
for group in self.services:
service = group.get_service_by_host(host)
if service:
return service
def add_empty_group(self, name, version=None):
if self.find_group_by_name(name):
raise Exception('Group {name} already exists'.format(name=name))
group = ServicesGroup(name=name, version=version)
self.services.append(group)
return group
def add_new_service(self,
host,
name=None,
ports=None,
key=None):
group = self.find_group_by_name(name)
if group:
service = group.get_service_by_host(host)
else:
service = self.find_service_by_host(host)
if not service:
service = Service(host=host)
if not group:
group = ServicesGroup(
service=service,
name=name,
hidden_service_dir=self.hidden_service_dir,
)
else:
group.add_service(service)
if group not in self.services:
self.services.append(group)
elif group and service not in group.services:
group.add_service(service)
else:
self.find_group_by_service(service)
if key:
group.add_key(key)
if ports:
service.add_ports(ports)
return service
def _set_service_names(self):
'Create groups for services, should be run first'
reg = r'([A-Z0-9]*)_SERVICE_NAME'
for key, val in os.environ.items():
m = match(reg, key)
if m:
self.add_new_service(host=m.groups()[0].lower(), name=val)
def _set_ports(self, host, ports):
self.add_new_service(host=host, ports=ports)
def _set_key(self, host, key):
self.add_new_service(host=host, key=key.encode())
def _setup_from_env(self, match_map):
for reg, call in match_map:
for key, val in os.environ.items():
m = match(reg, key)
if m:
call(m.groups()[0].lower(), val)
def _setup_keys_and_ports_from_env(self):
self._setup_from_env(
(
(r'([A-Z0-9]+)_PORTS', self._set_ports),
(r'([A-Z0-9]+)_KEY', self._set_key),
)
)
def get_or_create_empty_group(self, name, version=None):
group = self.find_group_by_name(name)
if group:
if version:
group.set_version(version)
return group
return self.add_empty_group(name, version)
def _set_group_version(self, name, version):
'Setup groups with version'
group = self.get_or_create_empty_group(name, version=version)
group.set_version(version)
def _set_group_key(self, name, key):
'Set key for service group'
group = self.get_or_create_empty_group(name)
if group.version == 3:
group.add_key(b64decode(key))
else:
group.add_key(key)
def _set_group_hosts(self, name, hosts):
'Set services for service groups'
self.get_or_create_empty_group(name)
for host_map in hosts.split(','):
host_map = host_map.strip()
port_from, host, port_dest = host_map.split(':', 2)
if host == 'unix' and port_dest.startswith('/'):
self.add_new_service(host=name, name=name, ports=host_map)
else:
ports = '{frm}:{dst}'.format(frm=port_from, dst=port_dest)
self.add_new_service(host=host, name=name, ports=ports)
def _setup_services_from_env(self):
self._setup_from_env(
(
(r'([A-Z0-9]+)_TOR_SERVICE_VERSION', self._set_group_version),
(r'([A-Z0-9]+)_TOR_SERVICE_KEY', self._set_group_key),
(r'([A-Z0-9]+)_TOR_SERVICE_HOSTS', self._set_group_hosts),
)
)
def _get_setup_from_env(self):
self._set_service_names()
self._setup_keys_and_ports_from_env()
self._setup_services_from_env()
def _get_setup_from_links(self):
containers = DockerLinks().to_containers()
if not containers:
return
for container in containers:
host = container.names[0]
self.add_new_service(host=host)
for link in container.links:
if link.protocol != 'tcp':
continue
port_map = os.environ.get('PORT_MAP')
self._set_ports(host, '{exposed}:{internal}'.format(
exposed=port_map or link.port,
internal=link.port,
))
def apply_conf(self):
self._write_keys()
self._write_torrc()
def _write_keys(self):
for service in self.services:
service.write_key()
def _write_torrc(self):
env = Environment(loader=FileSystemLoader('/'))
temp = env.get_template(self.torrc_template)
with open(self.torrc, mode='w') as f:
f.write(temp.render(services=self.services,
env=os.environ,
type=type,
int=int))
def setup_hosts(self):
self.setup = {}
self._get_setup_from_env()
self._get_setup_from_links()
self._load_keys_in_services()
self.check_services()
self.apply_conf()
def check_services(self):
for group in self.services:
for service in group.services:
if not service.ports:
raise Exception(
'Service {name} has not ports set'.format(
name=service.host
)
)
if len(group.services) > 1 and [
True for p in service.ports if p.is_socket
]:
raise Exception(
'Cannot use socket and ports '
'in the same service'.format(
name=service.host
)
)
if len(set(dict(group)['urls'])) != len(dict(group)['urls']):
raise Exception(
'Same port for multiple services in {name} group'.format(
name=group.name
)
)
class Onions(Setup):
"""Onions"""
def __init__(self):
self.services = []
if 'HIDDEN_SERVICE_DIR' in os.environ:
self.hidden_service_dir = os.environ['HIDDEN_SERVICE_DIR']
def torrc_parser(self):
self.torrc_dict = {}
def parse_dir(line):
_, path = line.split()
group_name = os.path.basename(path)
self.torrc_dict[group_name] = {
'services': [],
}
return group_name
def parse_port(line, name):
_, port_from, dest = line.split()
service_host, port = dest.split(':')
ports_str = '{port_from}:{dest}'
ports_param = ports_str.format(port_from=port_from,
dest=port)
if port.startswith('/'):
service_host = name
ports_param = ports_str.format(port_from=port_from,
dest=dest)
self.torrc_dict[name]['services'].append({
'host': service_host,
'ports': ports_param,
})
def parse_version(line, name):
_, version = line.split()
self.torrc_dict[name]['version'] = int(version)
def setup_services():
for name, setup in self.torrc_dict.items():
version = setup.get('version', 2)
group = (self.find_group_by_name(name)
or self.add_empty_group(name, version=version))
for service_dict in setup.get('services', []):
host = service_dict['host']
service = (group.get_service_by_host(host)
or Service(host))
service.add_ports(service_dict['ports'])
if service not in group.services:
group.add_service(service)
self._load_keys_in_services()
if not os.path.exists(self.torrc):
return
try:
with open(self.torrc, 'r') as f:
for line in f.readlines():
if line.startswith('HiddenServiceDir'):
name = parse_dir(line)
if line.startswith('HiddenServicePort'):
parse_port(line, name)
if line.startswith('HiddenServiceVersion'):
parse_version(line, name)
except BaseException:
raise Exception(
'Fail to parse torrc file. Please check the file'
)
setup_services()
def __str__(self):
if not self.services:
return 'No onion site'
return '\n'.join([str(service) for service in self.services])
def to_json(self):
service_lst = [dict(service) for service in self.services]
return dumps({
service['name']: service['urls'] for service in service_lst
})
def main():
logging.basicConfig()
parser = argparse.ArgumentParser(description='Display onion sites',
prog='onions')
parser.add_argument('--json', dest='json', action='store_true',
help='serialize to json')
parser.add_argument('--setup-hosts', dest='setup', action='store_true',
help='Setup hosts')
args = parser.parse_args()
logging.getLogger().setLevel(logging.WARNING)
try:
onions = Onions()
if args.setup:
onions.setup_hosts()
else:
onions.torrc_parser()
except BaseException as e:
logging.exception(e)
error_msg = str(e)
else:
error_msg = None
if args.json:
if error_msg:
print(dumps({'error': error_msg}))
sys.exit(1)
print(onions.to_json())
else:
if error_msg:
logging.error(error_msg)
sys.exit(1)
print(onions)
if __name__ == '__main__':
main()

View file

@ -1,195 +0,0 @@
'This class define a service link'
import base64
import binascii
import logging
import os
import pathlib
import re
from pytor import OnionV2
from pytor import OnionV3
from pytor.onion import EmptyDirException
class ServicesGroup(object):
name = None
version = None
imported_key = False
_default_version = 2
_onion = None
_hidden_service_dir = "/var/lib/tor/hidden_service/"
def __init__(self,
name=None,
service=None,
version=None,
hidden_service_dir=None):
name_regex = r'^[a-zA-Z0-9-_]+$'
self.onion_map = {
2: OnionV2,
3: OnionV3,
}
if not name and not service:
raise Exception(
'Init service group with a name or service at least'
)
self.services = []
self.name = name or service.host
if hidden_service_dir:
self._hidden_service_dir = hidden_service_dir
if not re.match(name_regex, self.name):
raise Exception(
'Group {name} has invalid name'.format(name=self.name)
)
if service:
self.add_service(service)
self.set_version(version or self._default_version)
self.gen_key()
def set_version(self, version):
version = int(version)
if version not in self.onion_map:
raise Exception(
'Url version {version} is not supported'.format(version)
)
self.version = version
self._onion = self.onion_map[version]()
@property
def hidden_service_dir(self):
return os.path.join(self._hidden_service_dir, self.name)
def add_service(self, service):
if service not in self.services:
if self.get_service_by_host(service.host):
raise Exception('Duplicate service name')
self.services.append(service)
def get_service_by_host(self, host):
for service in self.services:
if host == service.host:
return service
def add_key(self, key):
if self.imported_key:
logging.warning('Secret key already set, overriding')
# Try to decode key from base64 encoding
# import the raw data if the input cannot be decoded as base64
try:
key = base64.b64decode(key)
except binascii.Error:
pass
if isinstance(key, str):
key = key.encode('ascii')
self._onion.set_private_key(key)
self.imported_key = True
def __iter__(self):
yield 'name', self.name
yield 'onion', self.onion_url
yield 'urls', list(self.urls)
yield 'version', self.version
def __str__(self):
return '{name}: {urls}'.format(name=self.name,
urls=', '.join(self.urls))
@property
def onion_url(self):
return self._onion.onion_hostname
@property
def urls(self):
for service in self.services:
for ports in service.ports:
yield '{onion}:{port}'.format(onion=self.onion_url,
port=ports.port_from)
def write_key(self, hidden_service_dir=None):
'Write key on disk and set tor service'
if not hidden_service_dir:
hidden_service_dir = self.hidden_service_dir
if not os.path.isdir(hidden_service_dir):
pathlib.Path(hidden_service_dir).mkdir(parents=True)
self._onion.write_hidden_service(hidden_service_dir, force=True)
def _load_key(self, key_file):
with open(key_file, 'rb') as f:
self._onion.set_private_key_from_file(f)
def load_key(self, override=False):
if self.imported_key and not override:
return
self.load_key_from_secrets()
self.load_key_from_conf()
def load_key_from_secrets(self):
'Load key from docker secret using service name'
secret_file = os.path.join('/run/secrets', self.name)
if not os.path.exists(secret_file):
return
try:
self._load_key(secret_file)
self.imported_key = True
except BaseException as e:
logging.exception(e)
logging.warning('Fail to load key from secret, '
'check the key or secret name collision')
def load_key_from_conf(self, hidden_service_dir=None):
'Load key from disk if exists'
if not hidden_service_dir:
hidden_service_dir = self.hidden_service_dir
if not os.path.isdir(hidden_service_dir):
return
try:
self._onion.load_hidden_service(hidden_service_dir)
self.imported_key = True
except EmptyDirException:
pass
def gen_key(self):
self.imported_key = False
return self._onion.gen_new_private_key()
@property
def _priv_key(self):
return self._onion.get_private_key()
class Ports:
port_from = None
dest = None
def __init__(self, port_from, dest):
self.port_from = int(port_from)
self.dest = dest if dest.startswith('unix:') else int(dest)
@property
def is_socket(self):
return self.dest and type(self.dest) is not int
def __iter__(self):
yield 'port_from', str(self.port_from)
yield 'dest', str(self.dest)
yield 'is_socket', self.is_socket
class Service:
def __init__(self, host):
self.host = host
self.ports = []
def add_ports(self, ports):
p = [Ports(*sp.split(':', 1)) for sp in ports.split(',')]
self.ports.extend(p)
def __iter__(self):
yield 'host', self.host
yield 'ports', [dict(p) for p in self.ports]

View file

@ -1,5 +0,0 @@
from .Onions import main
from .Onions import Onions
from .Service import Ports
from .Service import Service
from .Service import ServicesGroup

View file

@ -1,44 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from setuptools import find_packages
from setuptools import setup
setup(
name='onions',
version='0.5.1',
packages=find_packages(),
author="Christophe Mehay",
author_email="cmehay@nospam.student.42.fr",
description="Display onion sites hosted",
include_package_data=True,
url='http://github.com/cmehay/docker-tor-hidden-service',
classifiers=[
"Programming Language :: Python",
"Development Status :: 1 - Planning",
"License :: OSI Approved :: BSD License",
"Natural Language :: English",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.4",
"Topic :: System :: Installation/Setup",
],
install_requires=['pyentrypoint==0.5.2',
'Jinja2>=2.10',
'pytor>=0.1.5'],
entry_points={
'console_scripts': [
'onions = onions:main',
],
},
license="WTFPL",
)

View file

@ -1,565 +0,0 @@
import json
import os
import re
from base64 import b32encode
from base64 import b64decode
from hashlib import sha1
import pytest
from Crypto.PublicKey import RSA
from onions import Onions
def get_key_and_onion(version=2):
key = {}
key[2] = '''
-----BEGIN RSA PRIVATE KEY-----
MIICXAIBAAKBgQCsMP4gl6g1Q313miPhb1GnDr56ZxIWGsO2PwHM1infkbhlBakR
6DGQfpE31L1ZKTUxY0OexKbW088v8qCOfjD9Zk1i80JP4xzfWQcwFZ5yM/0fkhm3
zLXqXdEahvRthmFsS8OWusRs/04U247ryTm4k5S0Ch5OTBuvMLzQ8W0yDwIDAQAB
AoGAAZr3U5B2ZgC6E7phKUHjbf5KMlPxrDkVqAZQWvuIKmhuYqq518vlYmZ7rhyS
o1kqAMrfH4TP1WLmJJlLe+ibRk2aonR4e0GbW4x151wcJdT1V3vdWAsVSzG3+dqX
PiGT//DIe0OPSH6ecI8ftFRLODd6f5iGkF4gsUSTcVzAFgkCQQDTY67dRpOD9Ozw
oYH48xe0B9NQCw7g4NSH85jPurJXnpn6lZ6bcl8x8ioAdgLyomR7fO/dJFYLw6uV
LZLqZsVbAkEA0Iei3QcpsJnYgcQG7l5I26Sq3LwoiGRDFKRI6k0e+en9JQJgA3Ay
tsLpyCHv9jQ762F6AVXFru5DmZX40F6AXQJBAIHoKac8Xx1h4FaEuo4WPkPZ50ey
dANIx/OAhTFrp3vnMPNpDV60K8JS8vLzkx4vJBcrkXDSirqSFhkIN9grLi8CQEO2
l5MQPWBkRKK2pc2Hfj8cdIMi8kJ/1CyCwE6c5l8etR3sbIMRTtZ76nAbXRFkmsRv
La/7Syrnobngsh/vX90CQB+PSSBqiPSsK2yPz6Gsd6OLCQ9sdy2oRwFTasH8sZyl
bhJ3M9WzP/EMkAzyW8mVs1moFp3hRcfQlZHl6g1U9D8=
-----END RSA PRIVATE KEY-----
'''
onion = {}
pub = {}
onion[2] = b32encode(
sha1(
RSA.importKey(
key[2].strip()
).publickey().exportKey(
"DER"
)[22:]
).digest()[:10]
).decode().lower() + '.onion'
key[3] = '''
PT0gZWQyNTUxOXYxLXNlY3JldDogdHlwZTAgPT0AAACArobDQYyZAWXei4QZwr++j96H1X/gq14N
wLRZ2O5DXuL0EzYKkdhZSILY85q+kfwZH8z4ceqe7u1F+0pQi/sM
'''
pub[3] = '''
PT0gZWQyNTUxOXYxLXB1YmxpYzogdHlwZTAgPT0AAAC9kzftiea/kb+TWlCEVNpfUJLVk+rFIoMG
m9/hW13isA==
'''
onion[3] = 'xwjtp3mj427zdp4tljiiivg2l5ijfvmt5lcsfaygtpp6cw254kykvpyd.onion'
return key[version].strip(), onion[version]
def get_torrc_template():
return r'''
{% for service_group in services %}
HiddenServiceDir {{service_group.hidden_service_dir}}
{% if service_group.version == 3 %}
HiddenServiceVersion 3
{% endif %}
{% for service in service_group.services %}
{% for port in service.ports %}
{% if port.is_socket %}
HiddenServicePort {{port.port_from}} {{port.dest}}
{% endif %}
{% if not port.is_socket %}
HiddenServicePort {{port.port_from}} {{service.host}}:{{port.dest}}
{% endif %}
{% endfor %}
{% endfor %}
{% endfor %}
{% if 'RELAY' in env %}
ORPort 9001
{% endif %}
{% if 'TOR_SOCKS_PORT' in env %}
SocksPort {{env['TOR_SOCKS_PORT']}}
{% else %}
SocksPort 0
{% endif %}
{% if 'TOR_EXTRA_OPTIONS' in env %}
{{env['TOR_EXTRA_OPTIONS']}}
{% endif %}
# useless line for Jinja bug
'''.strip()
def test_ports(monkeypatch):
env = {
'SERVICE1_PORTS': '80:80',
'SERVICE2_PORTS': '80:80,81:8000',
'SERVICE3_PORTS': '80:unix://unix.socket',
}
monkeypatch.setattr(os, 'environ', env)
onion = Onions()
onion._get_setup_from_env()
assert len(os.environ) == 3
assert len(onion.services) == 3
check = 0
for service_group in onion.services:
assert len(service_group.services) == 1
service = service_group.services[0]
if service.host == 'service1':
check += 1
assert len(service.ports) == 1
assert service.ports[0].port_from == 80
assert service.ports[0].dest == 80
assert not service.ports[0].is_socket
if service.host == 'service2':
check += 3
assert len(service.ports) == 2
assert service.ports[0].port_from == 80
assert service.ports[0].dest == 80
assert service.ports[1].port_from == 81
assert service.ports[1].dest == 8000
if service.host == 'service3':
check += 6
assert len(service.ports) == 1
assert service.ports[0].port_from == 80
assert service.ports[0].dest == 'unix://unix.socket'
assert service.ports[0].is_socket
assert check == 10
def test_docker_links(fs, monkeypatch):
env = {
'HOSTNAME': 'test_env',
'COMPOSE_SERVICE1_1_PORT': 'tcp://172.17.0.2:80',
'COMPOSE_SERVICE1_1_PORT_80_TCP': 'tcp://172.17.0.2:80',
'COMPOSE_SERVICE1_1_PORT_80_TCP_ADDR': '172.17.0.2',
'COMPOSE_SERVICE1_1_PORT_80_TCP_PORT': '80',
'COMPOSE_SERVICE1_1_PORT_80_TCP_PROTO': 'tcp',
'COMPOSE_SERVICE1_1_PORT_8000_TCP': 'tcp://172.17.0.2:8000',
'COMPOSE_SERVICE1_1_PORT_8000_TCP_ADDR': '172.17.0.2',
'COMPOSE_SERVICE1_1_PORT_8000_TCP_PORT': '8000',
'COMPOSE_SERVICE1_1_PORT_8000_TCP_PROTO': 'tcp',
'COMPOSE_SERVICE1_1_NAME': '/compose_env_1/compose_service1_1',
'SERVICE1_PORT': 'tcp://172.17.0.2:80',
'SERVICE1_PORT_80_TCP': 'tcp://172.17.0.2:80',
'SERVICE1_PORT_80_TCP_ADDR': '172.17.0.2',
'SERVICE1_PORT_80_TCP_PORT': '80',
'SERVICE1_PORT_80_TCP_PROTO': 'tcp',
'SERVICE1_PORT_8000_TCP': 'tcp://172.17.0.2:8000',
'SERVICE1_PORT_8000_TCP_ADDR': '172.17.0.2',
'SERVICE1_PORT_8000_TCP_PORT': '8000',
'SERVICE1_PORT_8000_TCP_PROTO': 'tcp',
'SERVICE1_NAME': '/compose_env_1/service1',
'SERVICE1_1_PORT': 'tcp://172.17.0.2:80',
'SERVICE1_1_PORT_80_TCP': 'tcp://172.17.0.2:80',
'SERVICE1_1_PORT_80_TCP_ADDR': '172.17.0.2',
'SERVICE1_1_PORT_80_TCP_PORT': '80',
'SERVICE1_1_PORT_80_TCP_PROTO': 'tcp',
'SERVICE1_1_PORT_8000_TCP': 'tcp://172.17.0.2:8000',
'SERVICE1_1_PORT_8000_TCP_ADDR': '172.17.0.2',
'SERVICE1_1_PORT_8000_TCP_PORT': '8000',
'SERVICE1_1_PORT_8000_TCP_PROTO': 'tcp',
'SERVICE1_1_NAME': '/compose_env_1/service1_1',
}
etc_host = '''
127.0.0.1 localhost
::1 localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
172.17.0.2 service1 bf447f22cdba compose_service1_1
172.17.0.2 service1_1 bf447f22cdba compose_service1_1
172.17.0.2 compose_service1_1 bf447f22cdba
'''.strip()
fs.create_file('/etc/hosts', contents=etc_host)
monkeypatch.setattr(os, 'environ', env)
onion = Onions()
onion._get_setup_from_links()
assert len(onion.services) == 1
group = onion.services[0]
assert len(group.services) == 1
service = group.services[0]
assert len(service.ports) == 2
assert set(
(port.port_from, port.dest) for port in service.ports
) == set([(80, 80), (8000, 8000)])
def test_key(monkeypatch):
key, onion_url = get_key_and_onion()
env = {
'SERVICE1_KEY': key
}
monkeypatch.setattr(os, 'environ', env)
onion = Onions()
onion._get_setup_from_env()
assert len(os.environ) == 1
assert len(onion.services) == 1
assert onion.services[0].onion_url == onion_url
def test_key_v2(monkeypatch):
key, onion_url = get_key_and_onion(version=2)
envs = [{
'GROUP1_TOR_SERVICE_HOSTS': '80:service1:80,81:service2:80',
'GROUP1_TOR_SERVICE_VERSION': '2',
'GROUP1_TOR_SERVICE_KEY': key,
}, {
'GROUP1_TOR_SERVICE_HOSTS': '80:service1:80,81:service2:80',
'GROUP1_TOR_SERVICE_KEY': key,
}]
for env in envs:
monkeypatch.setattr(os, 'environ', env)
onion = Onions()
onion._get_setup_from_env()
onion._load_keys_in_services()
assert len(os.environ) == len(env)
assert len(onion.services) == 1
assert onion.services[0].onion_url == onion_url
def test_key_v3(monkeypatch):
key, onion_url = get_key_and_onion(version=3)
env = {
'GROUP1_TOR_SERVICE_HOSTS': '80:service1:80,81:service2:80',
'GROUP1_TOR_SERVICE_VERSION': '3',
'GROUP1_TOR_SERVICE_KEY': key,
}
monkeypatch.setattr(os, 'environ', env)
onion = Onions()
onion._get_setup_from_env()
onion._load_keys_in_services()
assert len(os.environ) == 3
assert len(onion.services) == 1
assert onion.services[0].onion_url == onion_url
def test_key_in_secret(fs, monkeypatch):
env = {
'GROUP1_TOR_SERVICE_HOSTS': '80:service1:80',
'GROUP2_TOR_SERVICE_HOSTS': '80:service2:80',
'GROUP3_TOR_SERVICE_HOSTS': '80:service3:80',
'GROUP3_TOR_SERVICE_VERSION': '3',
}
monkeypatch.setattr(os, 'environ', env)
key_v2, onion_url_v2 = get_key_and_onion()
key_v3, onion_url_v3 = get_key_and_onion(version=3)
fs.create_file('/run/secrets/group1', contents=key_v2)
fs.create_file('/run/secrets/group3', contents=b64decode(key_v3))
onion = Onions()
onion._get_setup_from_env()
onion._load_keys_in_services()
group1 = onion.find_group_by_name('group1')
group2 = onion.find_group_by_name('group2')
group3 = onion.find_group_by_name('group3')
assert group1.onion_url == onion_url_v2
assert group2.onion_url not in [onion_url_v2, onion_url_v3]
assert group3.onion_url == onion_url_v3
def test_configuration(fs, monkeypatch, tmpdir):
extra_options = '''
HiddenServiceNonAnonymousMode 1
HiddenServiceSingleHopMode 1
'''.strip()
env = {
'SERVICE1_SERVICE_NAME': 'group1',
'SERVICE2_SERVICE_NAME': 'group1',
'SERVICE3_SERVICE_NAME': 'group2',
'SERVICE1_PORTS': '80:80',
'SERVICE2_PORTS': '81:80,82:8000',
'SERVICE3_PORTS': '80:unix://unix.socket',
'GROUP3_TOR_SERVICE_VERSION': '2',
'GROUP3_TOR_SERVICE_HOSTS': '80:service4:888,81:service5:8080',
'GROUP4_TOR_SERVICE_VERSION': '3',
'GROUP4_TOR_SERVICE_HOSTS': '81:unix://unix2.sock',
'GROUP3V3_TOR_SERVICE_VERSION': '3',
'GROUP3V3_TOR_SERVICE_HOSTS': '80:service4:888,81:service5:8080',
'SERVICE5_TOR_SERVICE_HOSTS': '80:service5:80',
'TOR_EXTRA_OPTIONS': extra_options,
}
hidden_dir = '/var/lib/tor/hidden_service'
monkeypatch.setattr(os, 'environ', env)
monkeypatch.setattr(os, 'fchmod', lambda x, y: None)
torrc_tpl = get_torrc_template()
fs.create_file('/var/local/tor/torrc.tpl', contents=torrc_tpl)
fs.create_file('/etc/tor/torrc')
fs.create_dir(hidden_dir)
onion = Onions()
onion._get_setup_from_env()
onion._load_keys_in_services()
onion.apply_conf()
onions_urls = {}
for dir in os.listdir(hidden_dir):
with open(os.path.join(hidden_dir, dir, 'hostname'), 'r') as f:
onions_urls[dir] = f.read().strip()
with open('/etc/tor/torrc', 'r') as f:
torrc = f.read()
print(torrc)
assert 'HiddenServiceDir /var/lib/tor/hidden_service/group1' in torrc
assert 'HiddenServicePort 80 service1:80' in torrc
assert 'HiddenServicePort 81 service2:80' in torrc
assert 'HiddenServicePort 82 service2:8000' in torrc
assert 'HiddenServiceDir /var/lib/tor/hidden_service/group2' in torrc
assert 'HiddenServicePort 80 unix://unix.socket' in torrc
assert 'HiddenServiceDir /var/lib/tor/hidden_service/group3' in torrc
assert 'HiddenServiceDir /var/lib/tor/hidden_service/group4' in torrc
assert 'HiddenServiceDir /var/lib/tor/hidden_service/group3v3' in torrc
assert 'HiddenServiceDir /var/lib/tor/hidden_service/service5' in torrc
assert torrc.count('HiddenServicePort 80 service4:888') == 2
assert torrc.count('HiddenServicePort 81 service5:8080') == 2
assert torrc.count('HiddenServicePort 80 service5:80') == 1
assert torrc.count('HiddenServicePort 81 unix://unix2.sock') == 1
assert torrc.count('HiddenServiceVersion 3') == 2
assert 'HiddenServiceNonAnonymousMode 1\n' in torrc
assert 'HiddenServiceSingleHopMode 1\n' in torrc
# Check parser
onion2 = Onions()
onion2.torrc_parser()
assert len(onion2.services) == 6
assert set(
group.name for group in onion2.services
# ) == set(['group1', 'group2'])
) == set(['group1', 'group2', 'group3', 'group4', 'group3v3', 'service5'])
for group in onion2.services:
if group.name == 'group1':
assert len(group.services) == 2
assert group.version == 2
assert group.onion_url == onions_urls[group.name]
assert set(
service.host for service in group.services
) == set(['service1', 'service2'])
for service in group.services:
if service.host == 'service1':
assert len(service.ports) == 1
assert set(
(port.port_from, port.dest) for port in service.ports
) == set([(80, 80)])
if service.host == 'service2':
assert len(service.ports) == 2
assert set(
(port.port_from, port.dest) for port in service.ports
) == set([(81, 80), (82, 8000)])
if group.name == 'group2':
assert len(group.services) == 1
assert group.version == 2
assert group.onion_url == onions_urls[group.name]
assert set(
service.host for service in group.services
) == set(['group2'])
service = group.services[0]
assert len(service.ports) == 1
assert set(
(port.port_from, port.dest) for port in service.ports
) == set([(80, 'unix://unix.socket')])
if group.name in ['group3', 'group3v3']:
assert len(group.services) == 2
assert group.version == 2 if group.name == 'group3' else 3
assert group.onion_url == onions_urls[group.name]
assert set(
service.host for service in group.services
) == set(['service4', 'service5'])
for service in group.services:
if service.host == 'service4':
assert len(service.ports) == 1
assert set(
(port.port_from, port.dest) for port in service.ports
) == set([(80, 888)])
if service.host == 'service5':
assert len(service.ports) == 1
assert set(
(port.port_from, port.dest) for port in service.ports
) == set([(81, 8080)])
if group.name == 'group4':
assert len(group.services) == 1
assert group.version == 3
assert group.onion_url == onions_urls[group.name]
assert set(
service.host for service in group.services
) == set(['group4'])
for service in group.services:
assert service.host == 'group4'
assert len(service.ports) == 1
assert set(
(port.port_from, port.dest) for port in service.ports
) == set([(81, 'unix://unix2.sock')])
if group.name == 'service5':
assert len(group.services) == 1
assert group.version == 2
assert group.onion_url == onions_urls[group.name]
assert set(
service.host for service in group.services
) == set(['service5'])
for service in group.services:
assert service.host == 'service5'
assert len(service.ports) == 1
assert set(
(port.port_from, port.dest) for port in service.ports
) == set([(80, 80)])
def test_groups(monkeypatch):
env = {
'SERVICE1_SERVICE_NAME': 'group1',
'SERVICE2_SERVICE_NAME': 'group1',
'SERVICE3_SERVICE_NAME': 'group2',
'SERVICE1_PORTS': '80:80',
'SERVICE2_PORTS': '81:80,82:8000',
'SERVICE3_PORTS': '80:unix://unix.socket',
}
monkeypatch.setattr(os, 'environ', env)
onion = Onions()
onion._get_setup_from_env()
onion_match = r'^[a-z2-7]{16}.onion$'
assert len(os.environ) == 6
assert len(onion.services) == 2
assert set(
group.name for group in onion.services
) == set(['group1', 'group2'])
for group in onion.services:
if group.name == 'group1':
assert len(group.services) == 2
assert set(
service.host for service in group.services
) == set(['service1', 'service2'])
if group.name == 'group2':
assert len(group.services) == 1
assert set(
service.host for service in group.services
) == set(['service3'])
assert re.match(onion_match, group.onion_url)
def test_json(monkeypatch):
env = {
'SERVICE1_SERVICE_NAME': 'group1',
'SERVICE2_SERVICE_NAME': 'group1',
'SERVICE3_SERVICE_NAME': 'group2',
'SERVICE1_PORTS': '80:80',
'SERVICE2_PORTS': '81:80,82:8000',
'SERVICE3_PORTS': '80:unix://unix.socket',
}
monkeypatch.setattr(os, 'environ', env)
onion = Onions()
onion._get_setup_from_env()
onion.check_services()
jsn = json.loads(onion.to_json())
assert len(jsn) == 2
assert len(jsn['group1']) == 3
assert len(jsn['group2']) == 1
def test_output(monkeypatch):
env = {
'SERVICE1_SERVICE_NAME': 'group1',
'SERVICE2_SERVICE_NAME': 'group1',
'SERVICE3_SERVICE_NAME': 'group2',
'SERVICE1_PORTS': '80:80',
'SERVICE2_PORTS': '81:80,82:8000',
'SERVICE3_PORTS': '80:unix://unix.socket',
}
monkeypatch.setattr(os, 'environ', env)
onion = Onions()
onion._get_setup_from_env()
for item in ['group1', 'group2', '.onion', ',']:
assert item in str(onion)
def test_not_valid_share_port(monkeypatch):
env = {
'SERVICE1_SERVICE_NAME': 'group1',
'SERVICE2_SERVICE_NAME': 'group1',
'SERVICE3_SERVICE_NAME': 'group2',
'SERVICE1_PORTS': '80:80',
'SERVICE2_PORTS': '80:80,82:8000',
'SERVICE3_PORTS': '80:unix://unix.socket',
}
monkeypatch.setattr(os, 'environ', env)
onion = Onions()
onion._get_setup_from_env()
with pytest.raises(Exception) as excinfo:
onion.check_services()
assert 'Same port for multiple services' in str(excinfo.value)
def test_not_valid_no_services(monkeypatch):
env = {
'SERVICE1_SERVICE_NAME': 'group1',
'SERVICE2_SERVICE_NAME': 'group1',
'SERVICE3_SERVICE_NAME': 'group2',
}
monkeypatch.setattr(os, 'environ', env)
onion = Onions()
onion._get_setup_from_env()
with pytest.raises(Exception) as excinfo:
onion.check_services()
assert 'has not ports set' in str(excinfo.value)