[enh] replace requests by httpx

This commit is contained in:
Alexandre Flament 2021-03-18 19:59:01 +01:00
parent 111180705b
commit eaa694fb7d
18 changed files with 527 additions and 204 deletions

View file

@ -8,5 +8,10 @@ lxml==4.6.3
pygments==2.8.0 pygments==2.8.0
python-dateutil==2.8.1 python-dateutil==2.8.1
pyyaml==5.4.1 pyyaml==5.4.1
requests[socks]==2.25.1 httpx[http2]==0.17.1
Brotli==1.0.9
uvloop==0.15.2; python_version >= '3.7'
uvloop==0.14.0; python_version < '3.7'
httpx-socks[asyncio]==0.3.1
langdetect==1.0.8 langdetect==1.0.8
setproctitle==1.2.2

View file

@ -20,7 +20,8 @@ from lxml import etree
from json import loads from json import loads
from urllib.parse import urlencode from urllib.parse import urlencode
from requests import RequestException from httpx import HTTPError
from searx import settings from searx import settings
from searx.poolrequests import get as http_get from searx.poolrequests import get as http_get
@ -136,5 +137,5 @@ def search_autocomplete(backend_name, query, lang):
try: try:
return backend(query, lang) return backend(query, lang)
except (RequestException, SearxEngineResponseException): except (HTTPError, SearxEngineResponseException):
return [] return []

View file

@ -52,7 +52,7 @@ def response(resp):
to_results.append(to_result.text_content()) to_results.append(to_result.text_content())
results.append({ results.append({
'url': urljoin(resp.url, '?%d' % k), 'url': urljoin(str(resp.url), '?%d' % k),
'title': from_result.text_content(), 'title': from_result.text_content(),
'content': '; '.join(to_results) 'content': '; '.join(to_results)
}) })

View file

@ -4,7 +4,6 @@
""" """
from json import loads, dumps from json import loads, dumps
from requests.auth import HTTPBasicAuth
from searx.exceptions import SearxEngineAPIException from searx.exceptions import SearxEngineAPIException
@ -32,7 +31,7 @@ def request(query, params):
return params return params
if username and password: if username and password:
params['auth'] = HTTPBasicAuth(username, password) params['auth'] = (username, password)
params['url'] = search_url params['url'] = search_url
params['method'] = 'GET' params['method'] = 'GET'

View file

@ -10,7 +10,7 @@ Definitions`_.
# pylint: disable=invalid-name, missing-function-docstring # pylint: disable=invalid-name, missing-function-docstring
from urllib.parse import urlencode, urlparse from urllib.parse import urlencode
from lxml import html from lxml import html
from searx import logger from searx import logger
from searx.utils import match_language, extract_text, eval_xpath, eval_xpath_list, eval_xpath_getindex from searx.utils import match_language, extract_text, eval_xpath, eval_xpath_list, eval_xpath_getindex
@ -186,8 +186,7 @@ def get_lang_info(params, lang_list, custom_aliases):
return ret_val return ret_val
def detect_google_sorry(resp): def detect_google_sorry(resp):
resp_url = urlparse(resp.url) if resp.url.host == 'sorry.google.com' or resp.url.path.startswith('/sorry'):
if resp_url.netloc == 'sorry.google.com' or resp_url.path.startswith('/sorry'):
raise SearxEngineCaptchaException() raise SearxEngineCaptchaException()

View file

@ -3,7 +3,7 @@
Seznam Seznam
""" """
from urllib.parse import urlencode, urlparse from urllib.parse import urlencode
from lxml import html from lxml import html
from searx.poolrequests import get from searx.poolrequests import get
from searx.exceptions import SearxEngineAccessDeniedException from searx.exceptions import SearxEngineAccessDeniedException
@ -46,8 +46,7 @@ def request(query, params):
def response(resp): def response(resp):
resp_url = urlparse(resp.url) if resp.url.path.startswith('/verify'):
if resp_url.path.startswith('/verify'):
raise SearxEngineAccessDeniedException() raise SearxEngineAccessDeniedException()
results = [] results = []

View file

@ -5,7 +5,7 @@
from json import loads from json import loads
from urllib.parse import urlencode from urllib.parse import urlencode
import requests import searx.poolrequests as requests
import base64 import base64
# about # about

View file

@ -3,7 +3,7 @@
Stackoverflow (IT) Stackoverflow (IT)
""" """
from urllib.parse import urlencode, urljoin, urlparse from urllib.parse import urlencode, urljoin
from lxml import html from lxml import html
from searx.utils import extract_text from searx.utils import extract_text
from searx.exceptions import SearxEngineCaptchaException from searx.exceptions import SearxEngineCaptchaException
@ -41,8 +41,7 @@ def request(query, params):
# get response from search-request # get response from search-request
def response(resp): def response(resp):
resp_url = urlparse(resp.url) if resp.url.path.startswith('/nocaptcha'):
if resp_url.path.startswith('/nocaptcha'):
raise SearxEngineCaptchaException() raise SearxEngineCaptchaException()
results = [] results = []

View file

@ -7,7 +7,7 @@ from json import loads
from dateutil import parser from dateutil import parser
from urllib.parse import urlencode from urllib.parse import urlencode
from requests.auth import HTTPDigestAuth from httpx import DigestAuth
from searx.utils import html_to_text from searx.utils import html_to_text
@ -56,7 +56,7 @@ def request(query, params):
search_type=search_type) search_type=search_type)
if http_digest_auth_user and http_digest_auth_pass: if http_digest_auth_user and http_digest_auth_pass:
params['auth'] = HTTPDigestAuth(http_digest_auth_user, http_digest_auth_pass) params['auth'] = DigestAuth(http_digest_auth_user, http_digest_auth_pass)
# add language tag if specified # add language tag if specified
if params['language'] != 'all': if params['language'] != 'all':

View file

@ -3,7 +3,7 @@ import inspect
import logging import logging
from json import JSONDecodeError from json import JSONDecodeError
from urllib.parse import urlparse from urllib.parse import urlparse
from requests.exceptions import RequestException from httpx import HTTPError, HTTPStatusError
from searx.exceptions import (SearxXPathSyntaxException, SearxEngineXPathException, SearxEngineAPIException, from searx.exceptions import (SearxXPathSyntaxException, SearxEngineXPathException, SearxEngineAPIException,
SearxEngineAccessDeniedException) SearxEngineAccessDeniedException)
from searx import logger from searx import logger
@ -60,28 +60,28 @@ def get_trace(traces):
return traces[-1] return traces[-1]
def get_hostname(exc: RequestException) -> typing.Optional[None]: def get_hostname(exc: HTTPError) -> typing.Optional[None]:
url = exc.request.url url = exc.request.url
if url is None and exc.response is not None: if url is None and exc.response is not None:
url = exc.response.url url = exc.response.url
return urlparse(url).netloc return urlparse(url).netloc
def get_request_exception_messages(exc: RequestException)\ def get_request_exception_messages(exc: HTTPError)\
-> typing.Tuple[typing.Optional[str], typing.Optional[str], typing.Optional[str]]: -> typing.Tuple[typing.Optional[str], typing.Optional[str], typing.Optional[str]]:
url = None url = None
status_code = None status_code = None
reason = None reason = None
hostname = None hostname = None
if exc.request is not None: if hasattr(exc, 'request') and exc.request is not None:
url = exc.request.url url = exc.request.url
if url is None and exc.response is not None: if url is None and hasattr(exc, 'response') and exc.respones is not None:
url = exc.response.url url = exc.response.url
if url is not None: if url is not None:
hostname = str(urlparse(url).netloc) hostname = url.host
if exc.response is not None: if isinstance(exc, HTTPStatusError):
status_code = str(exc.response.status_code) status_code = str(exc.response.status_code)
reason = exc.response.reason reason = exc.response.reason_phrase
return (status_code, reason, hostname) return (status_code, reason, hostname)
@ -92,7 +92,7 @@ def get_messages(exc, filename) -> typing.Tuple:
return (str(exc), ) return (str(exc), )
if isinstance(exc, ValueError) and 'lxml' in filename: if isinstance(exc, ValueError) and 'lxml' in filename:
return (str(exc), ) return (str(exc), )
if isinstance(exc, RequestException): if isinstance(exc, HTTPError):
return get_request_exception_messages(exc) return get_request_exception_messages(exc)
if isinstance(exc, SearxXPathSyntaxException): if isinstance(exc, SearxXPathSyntaxException):
return (exc.xpath_str, exc.message) return (exc.xpath_str, exc.message)

View file

@ -1,14 +1,54 @@
import atexit
import sys import sys
import threading
import asyncio
import logging
import concurrent.futures
from time import time from time import time
from itertools import cycle from itertools import cycle
from threading import local
import requests import httpcore
import httpx
import h2.exceptions
from httpx_socks import AsyncProxyTransport
from python_socks import parse_proxy_url
import python_socks._errors
from searx import settings from searx import settings
from searx import logger from searx import logger
from searx.raise_for_httperror import raise_for_httperror from searx.raise_for_httperror import raise_for_httperror
# Optional uvloop (support Python 3.6)
try:
import uvloop
except ImportError:
pass
else:
uvloop.install()
# queue.SimpleQueue: Support Python 3.6
try:
from queue import SimpleQueue
except ImportError:
from queue import Empty
from collections import deque
class SimpleQueue:
"""Minimal backport of queue.SimpleQueue"""
def __init__(self):
self._queue = deque()
self._count = threading.Semaphore(0)
def put(self, item):
self._queue.append(item)
self._count.release()
def get(self, timeout=None):
if not self._count.acquire(True, timeout):
raise Empty
return self._queue.popleft()
logger = logger.getChild('poolrequests') logger = logger.getChild('poolrequests')
@ -31,99 +71,63 @@ if not getattr(ssl, "HAS_SNI", False):
sys.exit(1) sys.exit(1)
class HTTPAdapterWithConnParams(requests.adapters.HTTPAdapter): LOOP = None
CLIENTS = dict()
def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE, THREADLOCAL = threading.local()
pool_maxsize=requests.adapters.DEFAULT_POOLSIZE, LIMITS = httpx.Limits(
max_retries=requests.adapters.DEFAULT_RETRIES, # Magic number kept from previous code
pool_block=requests.adapters.DEFAULT_POOLBLOCK, max_connections=settings['outgoing'].get('pool_connections', 100),
**conn_params): # Picked from constructor
if max_retries == requests.adapters.DEFAULT_RETRIES: max_keepalive_connections=settings['outgoing'].get('pool_maxsize', 10),
self.max_retries = requests.adapters.Retry(0, read=False) #
else: keepalive_expiry=settings['outgoing'].get('keepalive_expiry', 5.0)
self.max_retries = requests.adapters.Retry.from_int(max_retries) )
self.config = {} # default parameters for AsyncHTTPTransport
self.proxy_manager = {} # see https://github.com/encode/httpx/blob/e05a5372eb6172287458b37447c30f650047e1b8/httpx/_transports/default.py#L108-L121 # noqa
TRANSPORT_KWARGS = {
super().__init__() 'http2': settings['outgoing'].get('http2', False),
'retries': 0,
self._pool_connections = pool_connections 'trust_env': False,
self._pool_maxsize = pool_maxsize 'backend': 'asyncio'
self._pool_block = pool_block }
self._conn_params = conn_params # requests compatibility when reading proxy settings from settings.yml
PROXY_PATTERN_MAPPING = {
self.init_poolmanager(pool_connections, pool_maxsize, block=pool_block, **conn_params) 'http': 'https://',
'https:': 'https://'
def __setstate__(self, state): }
# Can't handle by adding 'proxy_manager' to self.__attrs__ because # default maximum redirect
# because self.poolmanager uses a lambda function, which isn't pickleable. # from https://github.com/psf/requests/blob/8c211a96cdbe9fe320d63d9e1ae15c5c07e179f8/requests/models.py#L55
self.proxy_manager = {} DEFAULT_REDIRECT_LIMIT = 30
self.config = {}
for attr, value in state.items():
setattr(self, attr, value)
self.init_poolmanager(self._pool_connections, self._pool_maxsize,
block=self._pool_block, **self._conn_params)
threadLocal = local()
connect = settings['outgoing'].get('pool_connections', 100) # Magic number kept from previous code
maxsize = settings['outgoing'].get('pool_maxsize', requests.adapters.DEFAULT_POOLSIZE) # Picked from constructor
if settings['outgoing'].get('source_ips'): if settings['outgoing'].get('source_ips'):
http_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize, LOCAL_ADDRESS_CYCLE = cycle(settings['outgoing'].get('source_ips'))
source_address=(source_ip, 0))
for source_ip in settings['outgoing']['source_ips'])
https_adapters = cycle(HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize,
source_address=(source_ip, 0))
for source_ip in settings['outgoing']['source_ips'])
else: else:
http_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), )) LOCAL_ADDRESS_CYCLE = cycle((None, ))
https_adapters = cycle((HTTPAdapterWithConnParams(pool_connections=connect, pool_maxsize=maxsize), ))
class SessionSinglePool(requests.Session):
def __init__(self):
super().__init__()
# reuse the same adapters
self.adapters.clear()
https_adapter = threadLocal.__dict__.setdefault('https_adapter', next(https_adapters))
self.mount('https://', https_adapter)
if get_enable_http_protocol():
http_adapter = threadLocal.__dict__.setdefault('http_adapter', next(http_adapters))
self.mount('http://', http_adapter)
def close(self):
"""Call super, but clear adapters since there are managed globaly"""
self.adapters.clear()
super().close()
def set_timeout_for_thread(timeout, start_time=None): def set_timeout_for_thread(timeout, start_time=None):
threadLocal.timeout = timeout THREADLOCAL.timeout = timeout
threadLocal.start_time = start_time THREADLOCAL.start_time = start_time
def set_enable_http_protocol(enable_http): def set_enable_http_protocol(enable_http):
threadLocal.enable_http = enable_http THREADLOCAL.enable_http = enable_http
def get_enable_http_protocol(): def get_enable_http_protocol():
try: try:
return threadLocal.enable_http return THREADLOCAL.enable_http
except AttributeError: except AttributeError:
return False return False
def reset_time_for_thread(): def reset_time_for_thread():
threadLocal.total_time = 0 THREADLOCAL.total_time = 0
def get_time_for_thread(): def get_time_for_thread():
return threadLocal.total_time return THREADLOCAL.total_time
def get_proxy_cycles(proxy_settings): def get_proxy_cycles(proxy_settings):
@ -152,22 +156,197 @@ def get_global_proxies():
return get_proxies(GLOBAL_PROXY_CYCLES) return get_proxies(GLOBAL_PROXY_CYCLES)
async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL):
origin = httpcore._utils.url_to_origin(url)
logger.debug('Drop connections for %r', origin)
connections_to_close = connection_pool._connections_for_origin(origin)
for connection in connections_to_close:
await connection_pool._remove_from_pool(connection)
try:
await connection.aclose()
except httpcore.NetworkError as e:
logger.warning('Error closing an existing connection', exc_info=e)
class AsyncHTTPTransportNoHttp(httpcore.AsyncHTTPTransport):
"""Block HTTP request"""
async def arequest(self, method, url, headers=None, stream=None, ext=None):
raise httpcore.UnsupportedProtocol("HTTP protocol is disabled")
class AsyncProxyTransportFixed(AsyncProxyTransport):
"""Fix httpx_socks.AsyncProxyTransport
Map python_socks exceptions to httpcore.ProxyError
Map socket.gaierror to httpcore.ConnectError
Note: keepalive_expiry is ignored, AsyncProxyTransport should call:
* self._keepalive_sweep()
* self._response_closed(self, connection)
Note: AsyncProxyTransport inherit from AsyncConnectionPool
Note: the API is going to change on httpx 0.18.0
see https://github.com/encode/httpx/pull/1522
"""
async def arequest(self, method, url, headers=None, stream=None, ext=None):
retry = 2
while retry > 0:
retry -= 1
try:
return await super().arequest(method, url, headers, stream, ext)
except (python_socks._errors.ProxyConnectionError,
python_socks._errors.ProxyTimeoutError,
python_socks._errors.ProxyError) as e:
raise httpcore.ProxyError(e)
except OSError as e:
# socket.gaierror when DNS resolution fails
raise httpcore.NetworkError(e)
except httpcore.RemoteProtocolError as e:
# in case of httpcore.RemoteProtocolError: Server disconnected
await close_connections_for_url(self, url)
logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
# retry
except (httpcore.NetworkError, httpcore.ProtocolError) as e:
# httpcore.WriteError on HTTP/2 connection leaves a new opened stream
# then each new request creates a new stream and raise the same WriteError
await close_connections_for_url(self, url)
raise e
class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
"""Fix httpx.AsyncHTTPTransport"""
async def arequest(self, method, url, headers=None, stream=None, ext=None):
retry = 2
while retry > 0:
retry -= 1
try:
return await super().arequest(method, url, headers, stream, ext)
except OSError as e:
# socket.gaierror when DNS resolution fails
raise httpcore.ConnectError(e)
except httpcore.CloseError as e:
# httpcore.CloseError: [Errno 104] Connection reset by peer
# raised by _keepalive_sweep()
# from https://github.com/encode/httpcore/blob/4b662b5c42378a61e54d673b4c949420102379f5/httpcore/_backends/asyncio.py#L198 # noqa
await close_connections_for_url(self._pool, url)
logger.warning('httpcore.CloseError: retry', exc_info=e)
# retry
except httpcore.RemoteProtocolError as e:
# in case of httpcore.RemoteProtocolError: Server disconnected
await close_connections_for_url(self._pool, url)
logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
# retry
except (httpcore.ProtocolError, httpcore.NetworkError) as e:
await close_connections_for_url(self._pool, url)
raise e
def get_transport_for_socks_proxy(verify, local_address, proxy_url):
global LOOP, LIMITS, TRANSPORT_KWARGS
# support socks5h (requests compatibility):
# https://requests.readthedocs.io/en/master/user/advanced/#socks
# socks5:// hostname is resolved on client side
# socks5h:// hostname is resolved on proxy side
rdns = False
socks5h = 'socks5h://'
if proxy_url.startswith(socks5h):
proxy_url = 'socks5://' + proxy_url[len(socks5h):]
rdns = True
proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)
return AsyncProxyTransportFixed(proxy_type=proxy_type, proxy_host=proxy_host, proxy_port=proxy_port,
username=proxy_username, password=proxy_password,
rdns=rdns,
loop=LOOP,
verify=verify,
local_address=local_address,
max_connections=LIMITS.max_connections,
max_keepalive_connections=LIMITS.max_keepalive_connections,
keepalive_expiry=LIMITS.keepalive_expiry,
**TRANSPORT_KWARGS)
def get_transport(verify, local_address, proxy_url):
global LIMITS
return AsyncHTTPTransportFixed(verify=verify,
local_address=local_address,
limits=LIMITS,
proxy=httpx._config.Proxy(proxy_url) if proxy_url else None,
**TRANSPORT_KWARGS)
def iter_proxies(proxies):
# https://www.python-httpx.org/compatibility/#proxy-keys
if isinstance(proxies, str):
yield 'all://', proxies
elif isinstance(proxies, dict):
for pattern, proxy_url in proxies.items():
pattern = PROXY_PATTERN_MAPPING.get(pattern, pattern)
yield pattern, proxy_url
def new_client(verify, local_address, proxies, max_redirects, enable_http):
# See https://www.python-httpx.org/advanced/#routing
mounts = {}
for pattern, proxy_url in iter_proxies(proxies):
if not enable_http and (pattern == 'http' or pattern.startswith('http://')):
continue
if proxy_url.startswith('socks4://') \
or proxy_url.startswith('socks5://') \
or proxy_url.startswith('socks5h://'):
mounts[pattern] = get_transport_for_socks_proxy(verify, local_address, proxy_url)
else:
mounts[pattern] = get_transport(verify, local_address, proxy_url)
if not enable_http:
mounts['http://'] = AsyncHTTPTransportNoHttp()
transport = get_transport(verify, local_address, None)
return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects)
def get_client(verify, local_address, proxies, max_redirects, allow_http):
global CLIENTS
key = (verify, local_address, repr(proxies), max_redirects, allow_http)
if key not in CLIENTS:
CLIENTS[key] = new_client(verify, local_address, proxies, max_redirects, allow_http)
return CLIENTS[key]
async def send_request(method, url, enable_http, kwargs):
if isinstance(url, bytes):
url = url.decode()
verify = kwargs.pop('verify', True)
local_address = next(LOCAL_ADDRESS_CYCLE)
proxies = kwargs.pop('proxies', None) or get_global_proxies()
max_redirects = kwargs.pop('max_redirects', DEFAULT_REDIRECT_LIMIT)
client = get_client(verify, local_address, proxies, max_redirects, enable_http)
response = await client.request(method.upper(), url, **kwargs)
# requests compatibility
# see also https://www.python-httpx.org/compatibility/#checking-for-4xx5xx-responses
response.ok = not response.is_error
return response
def request(method, url, **kwargs): def request(method, url, **kwargs):
"""same as requests/requests/api.py request(...)""" """same as requests/requests/api.py request(...)"""
time_before_request = time() time_before_request = time()
# session start
session = SessionSinglePool()
# proxies
if not kwargs.get('proxies'):
kwargs['proxies'] = get_global_proxies()
# timeout # timeout
if 'timeout' in kwargs: if 'timeout' in kwargs:
timeout = kwargs['timeout'] timeout = kwargs['timeout']
else: else:
timeout = getattr(threadLocal, 'timeout', None) timeout = getattr(THREADLOCAL, 'timeout', None)
if timeout is not None: if timeout is not None:
kwargs['timeout'] = timeout kwargs['timeout'] = timeout
@ -178,24 +357,23 @@ def request(method, url, **kwargs):
del kwargs['raise_for_httperror'] del kwargs['raise_for_httperror']
# do request # do request
response = session.request(method=method, url=url, **kwargs) future = asyncio.run_coroutine_threadsafe(send_request(method, url, get_enable_http_protocol(), kwargs), LOOP)
try:
if timeout:
timeout += 0.2 # overhead
start_time = getattr(THREADLOCAL, 'start_time', time_before_request)
if start_time:
timeout -= time() - start_time
time_after_request = time() response = future.result(timeout or 120)
except concurrent.futures.TimeoutError as e:
raise httpx.TimeoutException('Timeout', request=None) from e
# is there a timeout for this engine ? # update total_time.
if timeout is not None: # See get_time_for_thread() and reset_time_for_thread()
timeout_overhead = 0.2 # seconds if hasattr(THREADLOCAL, 'total_time'):
# start_time = when the user request started time_after_request = time()
start_time = getattr(threadLocal, 'start_time', time_before_request) THREADLOCAL.total_time += time_after_request - time_before_request
search_duration = time_after_request - start_time
if search_duration > timeout + timeout_overhead:
raise requests.exceptions.Timeout(response=response)
# session end
session.close()
if hasattr(threadLocal, 'total_time'):
threadLocal.total_time += time_after_request - time_before_request
# raise an exception # raise an exception
if check_for_httperror: if check_for_httperror:
@ -204,6 +382,49 @@ def request(method, url, **kwargs):
return response return response
async def stream_chunk_to_queue(method, url, q, **kwargs):
verify = kwargs.pop('verify', True)
local_address = next(LOCAL_ADDRESS_CYCLE)
proxies = kwargs.pop('proxies', None) or get_global_proxies()
# "30" from requests:
# https://github.com/psf/requests/blob/8c211a96cdbe9fe320d63d9e1ae15c5c07e179f8/requests/models.py#L55
max_redirects = kwargs.pop('max_redirects', 30)
client = get_client(verify, local_address, proxies, max_redirects, True)
try:
async with client.stream(method, url, **kwargs) as response:
q.put(response)
async for chunk in response.aiter_bytes(65536):
if len(chunk) > 0:
q.put(chunk)
except (httpx.HTTPError, OSError, h2.exceptions.ProtocolError) as e:
q.put(e)
finally:
q.put(None)
def stream(method, url, **kwargs):
"""Replace httpx.stream.
Usage:
stream = poolrequests.stream(...)
response = next(stream)
for chunk in stream:
...
httpx.Client.stream requires to write the httpx.HTTPTransport version of the
the httpx.AsyncHTTPTransport declared above.
"""
q = SimpleQueue()
future = asyncio.run_coroutine_threadsafe(stream_chunk_to_queue(method, url, q, **kwargs), LOOP)
chunk_or_exception = q.get(timeout=60)
while chunk_or_exception is not None:
if isinstance(chunk_or_exception, Exception):
raise chunk_or_exception
yield chunk_or_exception
chunk_or_exception = q.get(timeout=60)
return future.result()
def get(url, **kwargs): def get(url, **kwargs):
kwargs.setdefault('allow_redirects', True) kwargs.setdefault('allow_redirects', True)
return request('get', url, **kwargs) return request('get', url, **kwargs)
@ -233,3 +454,97 @@ def patch(url, data=None, **kwargs):
def delete(url, **kwargs): def delete(url, **kwargs):
return request('delete', url, **kwargs) return request('delete', url, **kwargs)
def init():
# log
for logger_name in ('hpack.hpack', 'hpack.table'):
logging.getLogger(logger_name).setLevel(logging.WARNING)
# loop
def loop_thread():
global LOOP
LOOP = asyncio.new_event_loop()
LOOP.run_forever()
th = threading.Thread(
target=loop_thread,
name='asyncio_loop',
daemon=True,
)
th.start()
@atexit.register
def done():
"""Close all HTTP client
Avoid a warning at exit
see https://github.com/encode/httpx/blob/1a6e254f72d9fd5694a1c10a28927e193ab4f76b/httpx/_client.py#L1785
"""
global LOOP
async def close_client(client):
try:
await client.aclose()
except httpx.HTTPError:
pass
async def close_clients():
await asyncio.gather(*[close_client(client) for client in CLIENTS.values()], return_exceptions=False)
future = asyncio.run_coroutine_threadsafe(close_clients(), LOOP)
# wait 3 seconds to close the HTTP clients
future.result(3)
init()
# ## TEMPORARY DEBUG ##
def debug_connection(connection):
now = LOOP.time()
expired = (connection.state == httpcore._async.base.ConnectionState.IDLE
and connection.expires_at is not None
and now >= connection.expires_at)
return connection.info()\
+ (', connect_failed' if connection.connect_failed else '')\
+ (', expired' if expired else '')
def debug_origin(origin):
return origin[0].decode() + '://' + origin[1].decode() + ':' + str(origin[2])
def debug_transport(transport):
result = {
'__class__': str(transport.__class__.__name__)
}
if isinstance(transport, (httpx.AsyncHTTPTransport, AsyncHTTPTransportFixed)):
pool = transport._pool
result['__pool_class__'] = str(pool.__class__.__name__)
if isinstance(pool, httpcore.AsyncConnectionPool):
for origin, connections in pool._connections.items():
result[debug_origin(origin)] = [debug_connection(connection) for connection in connections]
return result
elif isinstance(transport, AsyncProxyTransportFixed):
for origin, connections in transport._connections.items():
result[debug_origin(origin)] = [debug_connection(connection) for connection in connections]
return result
return result
def debug_asyncclient(client, key=None):
result = {}
if key:
result['__key__'] = [k if isinstance(k, (str, int, float, bool, type(None))) else repr(k) for k in key]
result['__default__'] = debug_transport(client._transport)
for urlpattern, transport in client._mounts.items():
result[urlpattern.pattern] = debug_transport(transport)
return result
def debug_asyncclients():
global CLIENTS
return [debug_asyncclient(client, key) for key, client in CLIENTS.items()]

View file

@ -11,7 +11,7 @@ from urllib.parse import urlparse
import re import re
from langdetect import detect_langs from langdetect import detect_langs
from langdetect.lang_detect_exception import LangDetectException from langdetect.lang_detect_exception import LangDetectException
import requests.exceptions import httpx
from searx import poolrequests, logger from searx import poolrequests, logger
from searx.results import ResultContainer from searx.results import ResultContainer
@ -90,10 +90,10 @@ def _is_url_image(image_url):
if r.headers["content-type"].startswith('image/'): if r.headers["content-type"].startswith('image/'):
return True return True
return False return False
except requests.exceptions.Timeout: except httpx.TimeoutException:
logger.error('Timeout for %s: %i', image_url, int(time() - a)) logger.error('Timeout for %s: %i', image_url, int(time() - a))
retry -= 1 retry -= 1
except requests.exceptions.RequestException: except httpx.HTTPError:
logger.exception('Exception for %s', image_url) logger.exception('Exception for %s', image_url)
return False return False

View file

@ -1,10 +1,10 @@
# SPDX-License-Identifier: AGPL-3.0-or-later # SPDX-License-Identifier: AGPL-3.0-or-later
from urllib.parse import urlparse
from time import time from time import time
import threading import threading
import asyncio
import requests.exceptions import httpx
import searx.poolrequests as poolrequests import searx.poolrequests as poolrequests
from searx.engines import settings from searx.engines import settings
@ -99,8 +99,8 @@ class OnlineProcessor(EngineProcessor):
# unexpected redirect : record an error # unexpected redirect : record an error
# but the engine might still return valid results. # but the engine might still return valid results.
status_code = str(response.status_code or '') status_code = str(response.status_code or '')
reason = response.reason or '' reason = response.reason_phrase or ''
hostname = str(urlparse(response.url or '').netloc) hostname = response.url.host
record_error(self.engine_name, record_error(self.engine_name,
'{} redirects, maximum: {}'.format(len(response.history), soft_max_redirects), '{} redirects, maximum: {}'.format(len(response.history), soft_max_redirects),
(status_code, reason, hostname)) (status_code, reason, hostname))
@ -135,7 +135,7 @@ class OnlineProcessor(EngineProcessor):
poolrequests.set_enable_http_protocol(self.engine.enable_http) poolrequests.set_enable_http_protocol(self.engine.enable_http)
# suppose everything will be alright # suppose everything will be alright
requests_exception = False http_exception = False
suspended_time = None suspended_time = None
try: try:
@ -169,20 +169,20 @@ class OnlineProcessor(EngineProcessor):
with threading.RLock(): with threading.RLock():
self.engine.stats['errors'] += 1 self.engine.stats['errors'] += 1
if (issubclass(e.__class__, requests.exceptions.Timeout)): if (issubclass(e.__class__, (httpx.TimeoutException, asyncio.TimeoutError))):
result_container.add_unresponsive_engine(self.engine_name, 'HTTP timeout') result_container.add_unresponsive_engine(self.engine_name, 'HTTP timeout')
# requests timeout (connect or read) # requests timeout (connect or read)
logger.error("engine {0} : HTTP requests timeout" logger.error("engine {0} : HTTP requests timeout"
"(search duration : {1} s, timeout: {2} s) : {3}" "(search duration : {1} s, timeout: {2} s) : {3}"
.format(self.engine_name, engine_time, timeout_limit, e.__class__.__name__)) .format(self.engine_name, engine_time, timeout_limit, e.__class__.__name__))
requests_exception = True http_exception = True
elif (issubclass(e.__class__, requests.exceptions.RequestException)): elif (issubclass(e.__class__, (httpx.HTTPError, httpx.StreamError))):
result_container.add_unresponsive_engine(self.engine_name, 'HTTP error') result_container.add_unresponsive_engine(self.engine_name, 'HTTP error')
# other requests exception # other requests exception
logger.exception("engine {0} : requests exception" logger.exception("engine {0} : requests exception"
"(search duration : {1} s, timeout: {2} s) : {3}" "(search duration : {1} s, timeout: {2} s) : {3}"
.format(self.engine_name, engine_time, timeout_limit, e)) .format(self.engine_name, engine_time, timeout_limit, e))
requests_exception = True http_exception = True
elif (issubclass(e.__class__, SearxEngineCaptchaException)): elif (issubclass(e.__class__, SearxEngineCaptchaException)):
result_container.add_unresponsive_engine(self.engine_name, 'CAPTCHA required') result_container.add_unresponsive_engine(self.engine_name, 'CAPTCHA required')
logger.exception('engine {0} : CAPTCHA'.format(self.engine_name)) logger.exception('engine {0} : CAPTCHA'.format(self.engine_name))
@ -206,7 +206,7 @@ class OnlineProcessor(EngineProcessor):
# suspend the engine if there is an HTTP error # suspend the engine if there is an HTTP error
# or suspended_time is defined # or suspended_time is defined
with threading.RLock(): with threading.RLock():
if requests_exception or suspended_time: if http_exception or suspended_time:
# update continuous_errors / suspend_end_time # update continuous_errors / suspend_end_time
self.engine.continuous_errors += 1 self.engine.continuous_errors += 1
if suspended_time is None: if suspended_time is None:

View file

@ -67,11 +67,13 @@ ui:
# key : !!binary "your_morty_proxy_key" # key : !!binary "your_morty_proxy_key"
outgoing: # communication with search engines outgoing: # communication with search engines
request_timeout : 2.0 # default timeout in seconds, can be override by engine request_timeout : 3.0 # default timeout in seconds, can be override by engine
# max_request_timeout: 10.0 # the maximum timeout in seconds # max_request_timeout: 10.0 # the maximum timeout in seconds
useragent_suffix : "" # suffix of searx_useragent, could contain informations like an email address to the administrator useragent_suffix : "" # suffix of searx_useragent, could contain informations like an email address to the administrator
pool_connections : 100 # Number of different hosts pool_connections : 100 # The maximum number of concurrent connections that may be established.
pool_maxsize : 10 # Number of simultaneous requests by host pool_maxsize : 20 # Allow the connection pool to maintain keep-alive connections below this point.
keepalive_expiry: 30.0 # Number of seconds to keep a connection in the pool
http2: True # Enable HTTP/2 (experimental)
# uncomment below section if you want to use a proxy # uncomment below section if you want to use a proxy
# see https://2.python-requests.org/en/latest/user/advanced/#proxies # see https://2.python-requests.org/en/latest/user/advanced/#proxies
# SOCKS proxies are also supported: see https://2.python-requests.org/en/latest/user/advanced/#socks # SOCKS proxies are also supported: see https://2.python-requests.org/en/latest/user/advanced/#socks

View file

@ -45,7 +45,7 @@ def searx_useragent():
"""Return the searx User Agent""" """Return the searx User Agent"""
return 'searx/{searx_version} {suffix}'.format( return 'searx/{searx_version} {suffix}'.format(
searx_version=VERSION_STRING, searx_version=VERSION_STRING,
suffix=settings['outgoing'].get('useragent_suffix', '')) suffix=settings['outgoing'].get('useragent_suffix', '')).strip()
def gen_useragent(os=None): def gen_useragent(os=None):

View file

@ -26,12 +26,26 @@ if __name__ == '__main__':
from os.path import realpath, dirname from os.path import realpath, dirname
sys.path.append(realpath(dirname(realpath(__file__)) + '/../')) sys.path.append(realpath(dirname(realpath(__file__)) + '/../'))
# set Unix thread name
try:
import setproctitle
except ImportError:
pass
else:
import threading
old_thread_init = threading.Thread.__init__
def new_thread_init(self, *args, **kwargs):
old_thread_init(self, *args, **kwargs)
setproctitle.setthreadtitle(self._name)
threading.Thread.__init__ = new_thread_init
import hashlib import hashlib
import hmac import hmac
import json import json
import os import os
import requests import httpx
from searx import logger from searx import logger
logger = logger.getChild('webapp') logger = logger.getChild('webapp')
@ -79,7 +93,7 @@ from searx.plugins import plugins
from searx.plugins.oa_doi_rewrite import get_doi_resolver from searx.plugins.oa_doi_rewrite import get_doi_resolver
from searx.preferences import Preferences, ValidationException, LANGUAGE_CODES from searx.preferences import Preferences, ValidationException, LANGUAGE_CODES
from searx.answerers import answerers from searx.answerers import answerers
from searx.poolrequests import get_global_proxies from searx import poolrequests
from searx.answerers import ask from searx.answerers import ask
from searx.metrology.error_recorder import errors_per_engines from searx.metrology.error_recorder import errors_per_engines
@ -890,51 +904,63 @@ def _is_selected_language_supported(engine, preferences):
@app.route('/image_proxy', methods=['GET']) @app.route('/image_proxy', methods=['GET'])
def image_proxy(): def image_proxy():
url = request.args.get('url').encode() url = request.args.get('url')
if not url: if not url:
return '', 400 return '', 400
h = new_hmac(settings['server']['secret_key'], url) h = new_hmac(settings['server']['secret_key'], url.encode())
if h != request.args.get('h'): if h != request.args.get('h'):
return '', 400 return '', 400
headers = dict_subset(request.headers, {'If-Modified-Since', 'If-None-Match'}) maximum_size = 5 * 1024 * 1024
headers['User-Agent'] = gen_useragent()
resp = requests.get(url, try:
stream=True, headers = dict_subset(request.headers, {'If-Modified-Since', 'If-None-Match'})
timeout=settings['outgoing']['request_timeout'], headers['User-Agent'] = gen_useragent()
headers=headers, stream = poolrequests.stream(
proxies=get_global_proxies()) method='GET',
url=url,
headers=headers,
timeout=settings['outgoing']['request_timeout'],
allow_redirects=True,
max_redirects=20)
if resp.status_code == 304: resp = next(stream)
return '', resp.status_code content_length = resp.headers.get('Content-Length')
if content_length and content_length.isdigit() and int(content_length) > maximum_size:
return 'Max size', 400
if resp.status_code != 200: if resp.status_code == 304:
logger.debug('image-proxy: wrong response code: {0}'.format(resp.status_code))
if resp.status_code >= 400:
return '', resp.status_code return '', resp.status_code
if resp.status_code != 200:
logger.debug('image-proxy: wrong response code: {0}'.format(resp.status_code))
if resp.status_code >= 400:
return '', resp.status_code
return '', 400
if not resp.headers.get('content-type', '').startswith('image/'):
logger.debug('image-proxy: wrong content-type: {0}'.format(resp.headers.get('content-type')))
return '', 400
headers = dict_subset(resp.headers, {'Content-Length', 'Length', 'Date', 'Last-Modified', 'Expires', 'Etag'})
total_length = 0
def forward_chunk():
nonlocal total_length
for chunk in stream:
total_length += len(chunk)
if total_length > maximum_size:
break
yield chunk
return Response(forward_chunk(), mimetype=resp.headers['Content-Type'], headers=headers)
except httpx.HTTPError:
return '', 400 return '', 400
if not resp.headers.get('content-type', '').startswith('image/'):
logger.debug('image-proxy: wrong content-type: {0}'.format(resp.headers.get('content-type')))
return '', 400
img = b''
chunk_counter = 0
for chunk in resp.iter_content(1024 * 1024):
chunk_counter += 1
if chunk_counter > 5:
return '', 502 # Bad gateway - file is too big (>5M)
img += chunk
headers = dict_subset(resp.headers, {'Content-Length', 'Length', 'Date', 'Last-Modified', 'Expires', 'Etag'})
return Response(img, mimetype=resp.headers['content-type'], headers=headers)
@app.route('/stats', methods=['GET']) @app.route('/stats', methods=['GET'])
def stats(): def stats():
@ -1083,6 +1109,11 @@ def config():
}) })
@app.route('/config/http')
def config_http():
return jsonify(poolrequests.debug_asyncclients())
@app.errorhandler(404) @app.errorhandler(404)
def page_not_found(e): def page_not_found(e):
return render('404.html'), 404 return render('404.html'), 404

View file

@ -17,7 +17,7 @@ import json
import re import re
from os.path import join from os.path import join
import requests import httpx
from searx import searx_dir # pylint: disable=E0401 C0413 from searx import searx_dir # pylint: disable=E0401 C0413
@ -30,7 +30,7 @@ HTTP_COLON = 'http:'
def get_bang_url(): def get_bang_url():
response = requests.get(URL_BV1) response = httpx.get(URL_BV1)
response.raise_for_status() response.raise_for_status()
r = RE_BANG_VERSION.findall(response.text) r = RE_BANG_VERSION.findall(response.text)
@ -38,7 +38,7 @@ def get_bang_url():
def fetch_ddg_bangs(url): def fetch_ddg_bangs(url):
response = requests.get(url) response = httpx.get(url)
response.raise_for_status() response.raise_for_status()
return json.loads(response.content.decode()) return json.loads(response.content.decode())

View file

@ -1,9 +1,5 @@
from unittest.mock import patch
from requests.models import Response
from searx.testing import SearxTestCase from searx.testing import SearxTestCase
import searx.poolrequests
from searx.poolrequests import get_proxy_cycles, get_proxies from searx.poolrequests import get_proxy_cycles, get_proxies
@ -64,26 +60,3 @@ class TestProxy(SearxTestCase):
'http': 'http://localhost:9092', 'http': 'http://localhost:9092',
'https': 'http://localhost:9093' 'https': 'http://localhost:9093'
}) })
@patch('searx.poolrequests.get_global_proxies')
def test_request(self, mock_get_global_proxies):
method = 'GET'
url = 'http://localhost'
custom_proxies = {
'https': 'http://localhost:1080'
}
global_proxies = {
'http': 'http://localhost:9092',
'https': 'http://localhost:9093'
}
mock_get_global_proxies.return_value = global_proxies
# check the global proxies usage
with patch.object(searx.poolrequests.SessionSinglePool, 'request', return_value=Response()) as mock_method:
searx.poolrequests.request(method, url)
mock_method.assert_called_once_with(method=method, url=url, proxies=global_proxies)
# check if the proxies parameter overrides the global proxies
with patch.object(searx.poolrequests.SessionSinglePool, 'request', return_value=Response()) as mock_method:
searx.poolrequests.request(method, url, proxies=custom_proxies)
mock_method.assert_called_once_with(method=method, url=url, proxies=custom_proxies)