forked from Ponysearch/Ponysearch
235 lines
7.2 KiB
Python
235 lines
7.2 KiB
Python
import sys
|
|
from time import time
|
|
from itertools import cycle
|
|
from threading import local
|
|
|
|
import requests
|
|
|
|
from searx import settings
|
|
from searx import logger
|
|
from searx.raise_for_httperror import raise_for_httperror
|
|
|
|
|
|
logger = logger.getChild('poolrequests')


# Startup sanity checks on the TLS stack: refuse to run with an OpenSSL older
# than 1.0.2, or without SNI support when pyOpenSSL is not available either.
try:
    import ssl
except ImportError:
    ssl = None
else:
    if ssl.OPENSSL_VERSION_INFO[0:3] < (1, 0, 2):
        # https://github.com/certifi/python-certifi#1024-bit-root-certificates
        logger.critical('You are using an old openssl version({0}), please upgrade above 1.0.2!'
                        .format(ssl.OPENSSL_VERSION))
        sys.exit(1)

if not getattr(ssl, "HAS_SNI", False):
    # No SNI in the ssl module (or no ssl module at all): pyOpenSSL is the only fallback.
    try:
        import OpenSSL  # pylint: disable=unused-import
    except ImportError:
        logger.critical("ssl doesn't support SNI and the pyopenssl module is not installed.\n"
                        "Some HTTPS connections will fail")
        sys.exit(1)
|
|
|
|
|
|
class HTTPAdapterWithConnParams(requests.adapters.HTTPAdapter):
    """HTTP adapter that forwards extra keyword arguments to its pool manager.

    Any keyword argument beyond the regular ``HTTPAdapter`` ones (for example
    ``source_address``) is kept in ``_conn_params`` and passed to
    ``init_poolmanager`` every time the pool manager is (re)built.
    """

    # ``HTTPAdapter.__getstate__`` only serialises the attributes named here.
    # ``_conn_params`` must be part of the pickled state because
    # ``__setstate__`` needs it to rebuild the pool manager.
    __attrs__ = [*requests.adapters.HTTPAdapter.__attrs__, '_conn_params']

    def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE,
                 pool_maxsize=requests.adapters.DEFAULT_POOLSIZE,
                 max_retries=requests.adapters.DEFAULT_RETRIES,
                 pool_block=requests.adapters.DEFAULT_POOLBLOCK,
                 **conn_params):
        """Create the adapter.

        :param pool_connections: number of connection pools to cache.
        :param pool_maxsize: maximum number of connections kept per pool.
        :param max_retries: retry policy, interpreted by ``HTTPAdapter``.
        :param pool_block: whether the pool blocks when no connection is free.
        :param conn_params: extra keyword arguments for the pool manager.
        """
        # Let the parent handle the standard arguments. Calling it without
        # arguments would silently reset ``max_retries`` and the pool sizes
        # to their defaults.
        super().__init__(pool_connections=pool_connections, pool_maxsize=pool_maxsize,
                         max_retries=max_retries, pool_block=pool_block)

        self._conn_params = conn_params

        if conn_params:
            # The parent built a pool manager that does not know about the
            # extra connection parameters: drop it and build the real one.
            self.poolmanager.clear()
            self.init_poolmanager(pool_connections, pool_maxsize, block=pool_block, **conn_params)

    def __setstate__(self, state):
        """Restore the adapter from *state* and rebuild its pool manager."""
        # Can't handle by adding 'proxy_manager' to self.__attrs__ because
        # self.poolmanager uses a lambda function, which isn't pickleable.
        self.proxy_manager = {}
        self.config = {}
        # Default for states that were pickled without the extra parameters.
        self._conn_params = {}

        for attr, value in state.items():
            setattr(self, attr, value)

        # ``__getstate__`` stores None for attributes that were missing.
        self._conn_params = self._conn_params or {}

        self.init_poolmanager(self._pool_connections, self._pool_maxsize,
                              block=self._pool_block, **self._conn_params)
|
|
|
|
|
|
# Per-thread storage: adapters, timeout, start time, accumulated request time, HTTP switch.
threadLocal = local()

# Pool sizing read from the ``outgoing`` settings.
connect = settings['outgoing'].get('pool_connections', 100)  # Magic number kept from previous code
maxsize = settings['outgoing'].get('pool_maxsize', requests.adapters.DEFAULT_POOLSIZE)  # Picked from constructor

# Endless iterators of adapters, one for each URL scheme.
if not settings['outgoing'].get('source_ips'):
    # No source address configured: a single adapter per scheme.
    http_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
    https_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
else:
    # One adapter per configured source IP (port 0), built lazily by the generators.
    http_adapters = cycle(
        HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
                                  source_address=(ip, 0))
        for ip in settings['outgoing']['source_ips']
    )
    https_adapters = cycle(
        HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
                                  source_address=(ip, 0))
        for ip in settings['outgoing']['source_ips']
    )
|
|
|
|
|
|
class SessionSinglePool(requests.Session):
    """``requests.Session`` that mounts adapters shared across sessions.

    Each thread takes one adapter per scheme from the module-level cycles the
    first time it creates a session and reuses it for every later session.
    """

    def __init__(self):
        super().__init__()

        # reuse the same adapters
        self.adapters.clear()

        self.mount('https://', self._thread_adapter('https_adapter', https_adapters))
        # plain HTTP is only mounted when enabled for the current thread
        if get_enable_http_protocol():
            self.mount('http://', self._thread_adapter('http_adapter', http_adapters))

    @staticmethod
    def _thread_adapter(name, adapter_cycle):
        """Return the calling thread's adapter stored under *name*.

        The cycle is advanced only when the thread has no adapter yet.
        ``setdefault(name, next(adapter_cycle))`` would evaluate ``next()`` on
        every call and consume an adapter even when it is then thrown away.
        """
        try:
            return getattr(threadLocal, name)
        except AttributeError:
            adapter = next(adapter_cycle)
            setattr(threadLocal, name, adapter)
            return adapter

    def close(self):
        """Call super, but clear adapters first since they are managed globally"""
        self.adapters.clear()
        super().close()
|
|
|
|
|
|
def set_timeout_for_thread(timeout, start_time=None):
    """Store *timeout* and *start_time* in the calling thread's local storage.

    ``request`` reads both values back: the timeout as its default, and the
    start time as the origin for its elapsed-time check.
    """
    threadLocal.start_time = start_time
    threadLocal.timeout = timeout
|
|
|
|
|
|
def set_enable_http_protocol(enable_http):
    """Record for the calling thread whether ``http://`` URLs may be used."""
    setattr(threadLocal, 'enable_http', enable_http)
|
|
|
|
|
|
def get_enable_http_protocol():
    """Return the calling thread's ``enable_http`` flag, ``False`` when it was never set."""
    return getattr(threadLocal, 'enable_http', False)
|
|
|
|
|
|
def reset_time_for_thread():
    """Reset the calling thread's accumulated request time to zero."""
    setattr(threadLocal, 'total_time', 0)
|
|
|
|
|
|
def get_time_for_thread():
    """Return the request time accumulated by the calling thread.

    Raises ``AttributeError`` when ``total_time`` was never set on this thread.
    """
    elapsed = threadLocal.total_time
    return elapsed
|
|
|
|
|
|
def get_proxy_cycles(proxy_settings):
    """Build an endless proxy iterator for each protocol.

    :param proxy_settings: mapping of protocol to either one proxy URL (str)
        or an iterable of proxy URLs; may be ``None`` or empty.
    :returns: a new dict mapping each protocol to an ``itertools.cycle`` over
        its proxies, or ``None`` when *proxy_settings* is falsy.

    The argument is left untouched, so the configuration it comes from keeps
    its plain proxy lists instead of having them replaced by cycle objects.
    """
    if not proxy_settings:
        return None

    proxy_cycles = {}
    for protocol, proxy in proxy_settings.items():
        # Backwards compatibility for single proxy in settings.yml
        if isinstance(proxy, str):
            proxy = [proxy]
        proxy_cycles[protocol] = cycle(proxy)
    return proxy_cycles
|
|
|
|
|
|
# Proxy cycles built once at import time from the ``outgoing.proxies`` setting
# (None when no proxy is configured).
GLOBAL_PROXY_CYCLES = get_proxy_cycles(
    settings['outgoing'].get('proxies')
)
|
|
|
|
|
|
def get_proxies(proxy_cycles):
    """Take the next proxy of every protocol from *proxy_cycles*.

    Returns a dict mapping each protocol to its next proxy, or ``None`` when
    *proxy_cycles* is falsy.
    """
    if not proxy_cycles:
        return None

    selected = {}
    for protocol, proxy_cycle in proxy_cycles.items():
        selected[protocol] = next(proxy_cycle)
    return selected
|
|
|
|
|
|
def get_global_proxies():
    """Return the next proxies from ``GLOBAL_PROXY_CYCLES`` (``None`` when there are none)."""
    proxies = get_proxies(GLOBAL_PROXY_CYCLES)
    return proxies
|
|
|
|
|
|
def request(method, url, **kwargs):
    """Send a request through a ``SessionSinglePool``.

    Same calling convention as requests/requests/api.py request(...), with
    these differences:

    * ``proxies``: when missing or empty, ``get_global_proxies()`` is used.
    * ``timeout``: when missing, the value stored in the thread-local storage
      (see ``set_timeout_for_thread``) is used, if there is one.
    * ``raise_for_httperror`` (default ``True``): consumed here and not
      forwarded; when true the response is passed to ``raise_for_httperror``.

    :raises requests.exceptions.Timeout: when the response arrived but the
        time elapsed since the thread's start time exceeds the numeric
        timeout plus a 0.2 second overhead.
    :returns: the response object.
    """
    time_before_request = time()

    # proxies
    if not kwargs.get('proxies'):
        kwargs['proxies'] = get_global_proxies()

    # timeout
    if 'timeout' in kwargs:
        timeout = kwargs['timeout']
    else:
        timeout = getattr(threadLocal, 'timeout', None)
        if timeout is not None:
            kwargs['timeout'] = timeout

    # raise_for_error: not a requests argument, so it must not be forwarded
    check_for_httperror = kwargs.pop('raise_for_httperror', True)

    # session start
    session = SessionSinglePool()
    try:
        # do request
        response = session.request(method=method, url=url, **kwargs)

        time_after_request = time()

        # is there a timeout for this engine ?
        # Only a plain number can be compared with the elapsed time; requests
        # also accepts other timeout forms (e.g. a (connect, read) tuple).
        if isinstance(timeout, (int, float)):
            timeout_overhead = 0.2  # seconds
            # start_time = when the user request started.
            # set_timeout_for_thread() stores None when no start time is
            # given, so a missing attribute and None both fall back to the
            # beginning of this request.
            start_time = getattr(threadLocal, 'start_time', None)
            if start_time is None:
                start_time = time_before_request
            search_duration = time_after_request - start_time
            if search_duration > timeout + timeout_overhead:
                raise requests.exceptions.Timeout(response=response)
    finally:
        # session end: also reached when the request or the check above raises
        session.close()

    if hasattr(threadLocal, 'total_time'):
        threadLocal.total_time += time_after_request - time_before_request

    # raise an exception
    if check_for_httperror:
        raise_for_httperror(response)

    return response
|
|
|
|
|
|
def get(url, **kwargs):
    """Send a GET request via ``request``; redirects are followed unless ``allow_redirects`` is given."""
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = True
    return request('get', url, **kwargs)
|
|
|
|
|
|
def options(url, **kwargs):
    """Send an OPTIONS request via ``request``; redirects are followed unless ``allow_redirects`` is given."""
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = True
    return request('options', url, **kwargs)
|
|
|
|
|
|
def head(url, **kwargs):
    """Send a HEAD request via ``request``; redirects are not followed unless ``allow_redirects`` is given."""
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = False
    return request('head', url, **kwargs)
|
|
|
|
|
|
def post(url, data=None, **kwargs):
    """Send a POST request with body *data* via ``request``."""
    kwargs['data'] = data
    return request('post', url, **kwargs)
|
|
|
|
|
|
def put(url, data=None, **kwargs):
    """Send a PUT request with body *data* via ``request``."""
    kwargs['data'] = data
    return request('put', url, **kwargs)
|
|
|
|
|
|
def patch(url, data=None, **kwargs):
    """Send a PATCH request with body *data* via ``request``."""
    kwargs['data'] = data
    return request('patch', url, **kwargs)
|
|
|
|
|
|
def delete(url, **kwargs):
    """Send a DELETE request via ``request``."""
    response = request('delete', url, **kwargs)
    return response
|