forked from Ponysearch/Ponysearch
d14994dc73
settings.yml: * outgoing.networks: * can contain network definitions * properties: enable_http, verify, http2, max_connections, max_keepalive_connections, keepalive_expiry, local_addresses, support_ipv4, support_ipv6, proxies, max_redirects, retries * retries: 0 by default, the number of times searx retries to send the HTTP request (using a different IP & proxy each time) * local_addresses can be "192.168.0.1/24" (it supports IPv6) * support_ipv4 & support_ipv6: both True by default; see https://github.com/searx/searx/pull/1034 * each engine can define a "network" section: * either a full network description * or a reference to an existing network * all HTTP requests of an engine use the same HTTP configuration (this was not the case before; see the proxy configuration in master)
214 lines
8.3 KiB
Python
214 lines
8.3 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
|
|
import asyncio
|
|
import logging
|
|
import threading
|
|
|
|
import httpcore
|
|
import httpx
|
|
from httpx_socks import AsyncProxyTransport
|
|
from python_socks import parse_proxy_url
|
|
import python_socks._errors
|
|
|
|
from searx import logger
|
|
|
|
# Optional uvloop (support Python 3.6)
# uvloop is a faster drop-in replacement for the default asyncio event loop;
# when it is not installed the stock asyncio loop is used unchanged.
try:
    import uvloop
except ImportError:
    pass
else:
    uvloop.install()


logger = logger.getChild('searx.http.client')
# Shared asyncio event loop for all HTTP clients; created by init() inside a
# dedicated daemon thread (see bottom of this module).
LOOP = None
# Keyword arguments applied to every transport created in this module.
TRANSPORT_KWARGS = {
    'backend': 'asyncio',
    'trust_env': False,
}
|
async def close_connections_for_url(connection_pool: httpcore.AsyncConnectionPool, url: httpcore._utils.URL):
    """Remove and close every pooled connection to the origin of *url*.

    Errors raised while closing a connection are logged and otherwise
    ignored, so a broken connection never prevents cleaning up the rest.
    """
    origin = httpcore._utils.url_to_origin(url)
    logger.debug('Drop connections for %r', origin)
    # snapshot first: the pool is mutated while we iterate
    doomed = connection_pool._connections_for_origin(origin)
    for conn in doomed:
        await connection_pool._remove_from_pool(conn)
        try:
            await conn.aclose()
        except httpcore.NetworkError as exc:
            logger.warning('Error closing an existing connection', exc_info=exc)
|
|
|
|
|
class AsyncHTTPTransportNoHttp(httpcore.AsyncHTTPTransport):
    """Block HTTP request

    Transport mounted on the ``http://`` pattern when plain HTTP is
    disabled: every request made through it raises immediately.
    """

    async def arequest(self, method, url, headers=None, stream=None, ext=None):
        raise httpcore.UnsupportedProtocol("HTTP protocol is disabled")
|
|
|
|
|
class AsyncProxyTransportFixed(AsyncProxyTransport):
    """Fix httpx_socks.AsyncProxyTransport

    Map python_socks exceptions to httpcore.ProxyError

    Map OSError (e.g. socket.gaierror when DNS resolution fails) to
    httpcore.NetworkError

    Note: keepalive_expiry is ignored, AsyncProxyTransport should call:
    * self._keepalive_sweep()
    * self._response_closed(self, connection)

    Note: AsyncProxyTransport inherit from AsyncConnectionPool

    Note: the API is going to change on httpx 0.18.0
    see https://github.com/encode/httpx/pull/1522
    """

    async def arequest(self, method, url, headers=None, stream=None, ext=None):
        retry = 2
        while retry > 0:
            retry -= 1
            try:
                return await super().arequest(method, url, headers, stream, ext)
            except (python_socks._errors.ProxyConnectionError,
                    python_socks._errors.ProxyTimeoutError,
                    python_socks._errors.ProxyError) as e:
                raise httpcore.ProxyError(e)
            except OSError as e:
                # socket.gaierror when DNS resolution fails
                raise httpcore.NetworkError(e)
            except httpcore.RemoteProtocolError as e:
                # in case of httpcore.RemoteProtocolError: Server disconnected
                await close_connections_for_url(self, url)
                if retry <= 0:
                    # bug fix: previously the loop just ended here and the
                    # method implicitly returned None; re-raise on the last
                    # attempt so callers see the real error
                    raise
                logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
                # retry
            except (httpcore.NetworkError, httpcore.ProtocolError) as e:
                # httpcore.WriteError on HTTP/2 connection leaves a new opened stream
                # then each new request creates a new stream and raise the same WriteError
                await close_connections_for_url(self, url)
                raise e
|
|
|
|
|
|
class AsyncHTTPTransportFixed(httpx.AsyncHTTPTransport):
    """Fix httpx.AsyncHTTPTransport

    Map OSError (e.g. socket.gaierror when DNS resolution fails) to
    httpcore.ConnectError; drop the pooled connections of an origin on
    connection-level errors and retry once on errors that are typically
    transient (stale keep-alive connections closed by the server).
    """

    async def arequest(self, method, url, headers=None, stream=None, ext=None):
        retry = 2
        while retry > 0:
            retry -= 1
            try:
                return await super().arequest(method, url, headers, stream, ext)
            except OSError as e:
                # socket.gaierror when DNS resolution fails
                raise httpcore.ConnectError(e)
            except httpcore.CloseError as e:
                # httpcore.CloseError: [Errno 104] Connection reset by peer
                # raised by _keepalive_sweep()
                # from https://github.com/encode/httpcore/blob/4b662b5c42378a61e54d673b4c949420102379f5/httpcore/_backends/asyncio.py#L198 # noqa
                await close_connections_for_url(self._pool, url)
                if retry <= 0:
                    # bug fix: previously the loop just ended here and the
                    # method implicitly returned None; re-raise on the last
                    # attempt so callers see the real error
                    raise
                logger.warning('httpcore.CloseError: retry', exc_info=e)
                # retry
            except httpcore.RemoteProtocolError as e:
                # in case of httpcore.RemoteProtocolError: Server disconnected
                await close_connections_for_url(self._pool, url)
                if retry <= 0:
                    # bug fix: re-raise instead of returning None (see above)
                    raise
                logger.warning('httpcore.RemoteProtocolError: retry', exc_info=e)
                # retry
            except (httpcore.ProtocolError, httpcore.NetworkError) as e:
                await close_connections_for_url(self._pool, url)
                raise e
|
|
|
|
|
|
def get_transport_for_socks_proxy(verify, http2, local_address, proxy_url, limit, retries):
    """Build an :class:`AsyncProxyTransportFixed` for a SOCKS proxy URL.

    Supports the requests-compatible ``socks5h://`` scheme
    (https://requests.readthedocs.io/en/master/user/advanced/#socks):
    * socks5://  -> the hostname is resolved on the client side
    * socks5h:// -> the hostname is resolved on the proxy side (rdns)
    """
    global LOOP, TRANSPORT_KWARGS
    scheme_socks5h = 'socks5h://'
    rdns = proxy_url.startswith(scheme_socks5h)
    if rdns:
        proxy_url = 'socks5://' + proxy_url[len(scheme_socks5h):]

    proxy_type, proxy_host, proxy_port, proxy_username, proxy_password = parse_proxy_url(proxy_url)

    return AsyncProxyTransportFixed(
        proxy_type=proxy_type,
        proxy_host=proxy_host,
        proxy_port=proxy_port,
        username=proxy_username,
        password=proxy_password,
        rdns=rdns,
        loop=LOOP,
        verify=verify,
        http2=http2,
        local_address=local_address,
        max_connections=limit.max_connections,
        max_keepalive_connections=limit.max_keepalive_connections,
        keepalive_expiry=limit.keepalive_expiry,
        retries=retries,
        **TRANSPORT_KWARGS,
    )
|
|
|
|
|
|
def get_transport(verify, http2, local_address, proxy_url, limit, retries):
    """Build an :class:`AsyncHTTPTransportFixed` (plain or HTTP-proxied)."""
    proxy = httpx._config.Proxy(proxy_url) if proxy_url else None
    return AsyncHTTPTransportFixed(
        verify=verify,
        http2=http2,
        local_address=local_address,
        proxy=proxy,
        limits=limit,
        retries=retries,
        **TRANSPORT_KWARGS,
    )
|
|
|
|
|
|
def iter_proxies(proxies):
    """Yield ``(pattern, proxy_url)`` pairs from a proxies setting.

    ``proxies`` is either a single proxy URL string (mounted on every
    scheme via the ``all://`` pattern) or a mapping of httpx mount
    patterns to proxy URLs; anything else yields nothing.
    See https://www.python-httpx.org/compatibility/#proxy-keys
    """
    if isinstance(proxies, str):
        yield 'all://', proxies
    elif isinstance(proxies, dict):
        yield from proxies.items()
|
|
|
|
|
|
def new_client(enable_http, verify, enable_http2,
               max_connections, max_keepalive_connections, keepalive_expiry,
               proxies, local_address, retries, max_redirects):
    """Create an ``httpx.AsyncClient`` configured from network settings.

    Proxy patterns are mounted per httpx routing rules
    (https://www.python-httpx.org/advanced/#routing); SOCKS proxies get a
    dedicated transport.  When ``enable_http`` is false, plain ``http://``
    requests are blocked via :class:`AsyncHTTPTransportNoHttp`.
    """
    limit = httpx.Limits(max_connections=max_connections,
                         max_keepalive_connections=max_keepalive_connections,
                         keepalive_expiry=keepalive_expiry)
    mounts = {}
    for pattern, proxy_url in iter_proxies(proxies):
        # skip plain-HTTP mounts when HTTP is disabled
        if not enable_http and (pattern == 'http' or pattern.startswith('http://')):
            continue
        if proxy_url.startswith(('socks4://', 'socks5://', 'socks5h://')):
            mounts[pattern] = get_transport_for_socks_proxy(
                verify, enable_http2, local_address, proxy_url, limit, retries)
        else:
            mounts[pattern] = get_transport(
                verify, enable_http2, local_address, proxy_url, limit, retries)

    if not enable_http:
        mounts['http://'] = AsyncHTTPTransportNoHttp()

    # default (non-proxied) transport for everything not matched by a mount
    transport = get_transport(verify, enable_http2, local_address, None, limit, retries)
    return httpx.AsyncClient(transport=transport, mounts=mounts, max_redirects=max_redirects)
|
|
|
|
|
|
def init():
    """Module start-up: tune logging and start the shared asyncio loop.

    The event loop runs forever in a daemon thread and is published through
    the module-level ``LOOP`` global.
    """
    # silence the very verbose hpack (HTTP/2 header compression) loggers
    for noisy_logger in ('hpack.hpack', 'hpack.table'):
        logging.getLogger(noisy_logger).setLevel(logging.WARNING)

    def loop_thread():
        global LOOP
        LOOP = asyncio.new_event_loop()
        LOOP.run_forever()

    threading.Thread(
        target=loop_thread,
        name='asyncio_loop',
        daemon=True,
    ).start()


init()
|