[mod] multithreading only in searx.search.* packages

it prepares the new architecture change,
everything about multithreading in moved in the searx.search.* packages

previously the call to the "init" function of the engines was done in searx.engines:
* the network was not set (request not sent using the defined proxy)
* it requires to monkey patch the code to avoid HTTP requests during the tests
This commit is contained in:
Alexandre Flament 2021-05-05 13:08:54 +02:00
parent d36adfa59f
commit 8c1a65d32f
10 changed files with 85 additions and 65 deletions

View file

@ -167,26 +167,3 @@ def load_engines(engine_list):
if engine is not None: if engine is not None:
engines[engine.name] = engine engines[engine.name] = engine
return engines return engines
def initialize_engines(engine_list):
load_engines(engine_list)
initialize_network(engine_list, settings['outgoing'])
def engine_init(engine_name, init_fn):
try:
set_context_network_name(engine_name)
init_fn(get_engine_from_settings(engine_name))
except SearxEngineResponseException as exc:
logger.warn('%s engine: Fail to initialize // %s', engine_name, exc)
except Exception:
logger.exception('%s engine: Fail to initialize', engine_name)
else:
logger.debug('%s engine: Initialized', engine_name)
for engine_name, engine in engines.items():
if hasattr(engine, 'init'):
init_fn = getattr(engine, 'init')
if init_fn:
logger.debug('%s engine: Starting background initialization', engine_name)
threading.Thread(target=engine_init, args=(engine_name, init_fn)).start()

View file

@ -29,9 +29,11 @@ from searx.results import ResultContainer
from searx import logger from searx import logger
from searx.plugins import plugins from searx.plugins import plugins
from searx.search.models import EngineRef, SearchQuery from searx.search.models import EngineRef, SearchQuery
from searx.search.processors import processors, initialize as initialize_processors from searx.engines import load_engines
from searx.search.checker import initialize as initialize_checker from searx.network import initialize as initialize_network
from searx.metrics import initialize as initialize_metrics, counter_inc, histogram_observe_time from searx.metrics import initialize as initialize_metrics, counter_inc, histogram_observe_time
from searx.search.processors import PROCESSORS, initialize as initialize_processors
from searx.search.checker import initialize as initialize_checker
logger = logger.getChild('search') logger = logger.getChild('search')
@ -50,8 +52,10 @@ else:
def initialize(settings_engines=None, enable_checker=False): def initialize(settings_engines=None, enable_checker=False):
settings_engines = settings_engines or settings['engines'] settings_engines = settings_engines or settings['engines']
initialize_processors(settings_engines) load_engines(settings_engines)
initialize_network(settings_engines, settings['outgoing'])
initialize_metrics([engine['name'] for engine in settings_engines]) initialize_metrics([engine['name'] for engine in settings_engines])
initialize_processors(settings_engines)
if enable_checker: if enable_checker:
initialize_checker() initialize_checker()
@ -106,7 +110,7 @@ class Search:
# start search-reqest for all selected engines # start search-reqest for all selected engines
for engineref in self.search_query.engineref_list: for engineref in self.search_query.engineref_list:
processor = processors[engineref.name] processor = PROCESSORS[engineref.name]
# stop the request now if the engine is suspend # stop the request now if the engine is suspend
if processor.extend_container_if_suspended(self.result_container): if processor.extend_container_if_suspended(self.result_container):
@ -152,7 +156,7 @@ class Search:
for engine_name, query, request_params in requests: for engine_name, query, request_params in requests:
th = threading.Thread( th = threading.Thread(
target=processors[engine_name].search, target=PROCESSORS[engine_name].search,
args=(query, request_params, self.result_container, self.start_time, self.actual_timeout), args=(query, request_params, self.result_container, self.start_time, self.actual_timeout),
name=search_id, name=search_id,
) )

View file

@ -8,7 +8,7 @@ import logging
import searx.search import searx.search
import searx.search.checker import searx.search.checker
from searx.search import processors from searx.search import PROCESSORS
from searx.engines import engine_shortcuts from searx.engines import engine_shortcuts
@ -41,13 +41,13 @@ def iter_processor(engine_name_list):
if len(engine_name_list) > 0: if len(engine_name_list) > 0:
for name in engine_name_list: for name in engine_name_list:
name = engine_shortcuts.get(name, name) name = engine_shortcuts.get(name, name)
processor = processors.get(name) processor = PROCESSORS.get(name)
if processor is not None: if processor is not None:
yield name, processor yield name, processor
else: else:
stdout.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}{RED}Engine does not exist{RESET_SEQ}') stdout.write(f'{BOLD_SEQ}Engine {name:30}{RESET_SEQ}{RED}Engine does not exist{RESET_SEQ}')
else: else:
for name, processor in searx.search.processors.items(): for name, processor in searx.search.PROCESSORS.items():
yield name, processor yield name, processor

View file

@ -9,7 +9,7 @@ import signal
from searx import logger, settings, searx_debug from searx import logger, settings, searx_debug
from searx.exceptions import SearxSettingsException from searx.exceptions import SearxSettingsException
from searx.search.processors import processors from searx.search.processors import PROCESSORS
from searx.search.checker import Checker from searx.search.checker import Checker
from searx.shared import schedule, storage from searx.shared import schedule, storage
@ -55,7 +55,7 @@ def run():
'status': 'ok', 'status': 'ok',
'engines': {} 'engines': {}
} }
for name, processor in processors.items(): for name, processor in PROCESSORS.items():
logger.debug('Checking %s engine', name) logger.debug('Checking %s engine', name)
checker = Checker(processor) checker = Checker(processor)
checker.run() checker.run()

View file

@ -11,9 +11,11 @@ __all__ = [
'OnlineProcessor', 'OnlineProcessor',
'OnlineDictionaryProcessor', 'OnlineDictionaryProcessor',
'OnlineCurrencyProcessor', 'OnlineCurrencyProcessor',
'processors', 'PROCESSORS',
] ]
import threading
from searx import logger from searx import logger
import searx.engines as engines import searx.engines as engines
@ -24,7 +26,7 @@ from .online_currency import OnlineCurrencyProcessor
from .abstract import EngineProcessor from .abstract import EngineProcessor
logger = logger.getChild('search.processors') logger = logger.getChild('search.processors')
processors = {} PROCESSORS = {}
"""Cache request processores, stored by *engine-name* (:py:func:`initialize`)""" """Cache request processores, stored by *engine-name* (:py:func:`initialize`)"""
def get_processor_class(engine_type): def get_processor_class(engine_type):
@ -34,6 +36,7 @@ def get_processor_class(engine_type):
return c return c
return None return None
def get_processor(engine, engine_name): def get_processor(engine, engine_name):
"""Return processor instance that fits to ``engine.engine.type``)""" """Return processor instance that fits to ``engine.engine.type``)"""
engine_type = getattr(engine, 'engine_type', 'online') engine_type = getattr(engine, 'engine_type', 'online')
@ -42,12 +45,26 @@ def get_processor(engine, engine_name):
return processor_class(engine, engine_name) return processor_class(engine, engine_name)
return None return None
def initialize_processor(processor):
"""Initialize one processor
Call the init function of the engine
"""
if processor.has_initialize_function:
t = threading.Thread(target=processor.initialize, daemon=True)
t.start()
def initialize(engine_list): def initialize(engine_list):
"""Initialize all engines and store a processor for each engine in :py:obj:`processors`.""" """Initialize all engines and store a processor for each engine in :py:obj:`PROCESSORS`."""
engines.initialize_engines(engine_list) for engine_data in engine_list:
for engine_name, engine in engines.engines.items(): engine_name = engine_data['name']
processor = get_processor(engine, engine_name) engine = engines.engines.get(engine_name)
if processor is None: if engine:
logger.error('Error get processor for engine %s', engine_name) processor = get_processor(engine, engine_name)
else: initialize_processor(processor)
processors[engine_name] = processor if processor is None:
logger.error('Error get processor for engine %s', engine_name)
else:
PROCESSORS[engine_name] = processor

View file

@ -13,7 +13,8 @@ from searx import logger
from searx.engines import settings from searx.engines import settings
from searx.network import get_time_for_thread, get_network from searx.network import get_time_for_thread, get_network
from searx.metrics import histogram_observe, counter_inc, count_exception, count_error from searx.metrics import histogram_observe, counter_inc, count_exception, count_error
from searx.exceptions import SearxEngineAccessDeniedException from searx.exceptions import SearxEngineAccessDeniedException, SearxEngineResponseException
from searx.utils import get_engine_from_settings
logger = logger.getChild('searx.search.processor') logger = logger.getChild('searx.search.processor')
SUSPENDED_STATUS = {} SUSPENDED_STATUS = {}
@ -66,6 +67,20 @@ class EngineProcessor(ABC):
key = id(key) if key else self.engine_name key = id(key) if key else self.engine_name
self.suspended_status = SUSPENDED_STATUS.setdefault(key, SuspendedStatus()) self.suspended_status = SUSPENDED_STATUS.setdefault(key, SuspendedStatus())
def initialize(self):
try:
self.engine.init(get_engine_from_settings(self.engine_name))
except SearxEngineResponseException as exc:
logger.warn('%s engine: Fail to initialize // %s', self.engine_name, exc)
except Exception: # pylint: disable=broad-except
logger.exception('%s engine: Fail to initialize', self.engine_name)
else:
logger.debug('%s engine: Initialized', self.engine_name)
@property
def has_initialize_function(self):
return hasattr(self.engine, 'init')
def handle_exception(self, result_container, exception_or_message, suspend=False): def handle_exception(self, result_container, exception_or_message, suspend=False):
# update result_container # update result_container
if isinstance(exception_or_message, BaseException): if isinstance(exception_or_message, BaseException):

View file

@ -5,7 +5,7 @@
""" """
from time import time from timeit import default_timer
import asyncio import asyncio
import httpx import httpx
@ -40,6 +40,15 @@ class OnlineProcessor(EngineProcessor):
engine_type = 'online' engine_type = 'online'
def initialize(self):
# set timeout for all HTTP requests
searx.network.set_timeout_for_thread(self.engine.timeout, start_time=default_timer())
# reset the HTTP total time
searx.network.reset_time_for_thread()
# set the network
searx.network.set_context_network_name(self.engine_name)
super().initialize()
def get_params(self, search_query, engine_category): def get_params(self, search_query, engine_category):
params = super().get_params(search_query, engine_category) params = super().get_params(search_query, engine_category)
if params is None: if params is None:
@ -139,7 +148,7 @@ class OnlineProcessor(EngineProcessor):
self.handle_exception(result_container, e, suspend=True) self.handle_exception(result_container, e, suspend=True)
logger.error("engine {0} : HTTP requests timeout" logger.error("engine {0} : HTTP requests timeout"
"(search duration : {1} s, timeout: {2} s) : {3}" "(search duration : {1} s, timeout: {2} s) : {3}"
.format(self.engine_name, time() - start_time, .format(self.engine_name, default_timer() - start_time,
timeout_limit, timeout_limit,
e.__class__.__name__)) e.__class__.__name__))
except (httpx.HTTPError, httpx.StreamError) as e: except (httpx.HTTPError, httpx.StreamError) as e:
@ -147,7 +156,7 @@ class OnlineProcessor(EngineProcessor):
self.handle_exception(result_container, e, suspend=True) self.handle_exception(result_container, e, suspend=True)
logger.exception("engine {0} : requests exception" logger.exception("engine {0} : requests exception"
"(search duration : {1} s, timeout: {2} s) : {3}" "(search duration : {1} s, timeout: {2} s) : {3}"
.format(self.engine_name, time() - start_time, .format(self.engine_name, default_timer() - start_time,
timeout_limit, timeout_limit,
e)) e))
except SearxEngineCaptchaException as e: except SearxEngineCaptchaException as e:

View file

@ -23,7 +23,7 @@ class TestEnginesInit(SearxTestCase):
engine_list = [{'engine': 'dummy', 'name': 'engine1', 'shortcut': 'e1', 'categories': 'general'}, engine_list = [{'engine': 'dummy', 'name': 'engine1', 'shortcut': 'e1', 'categories': 'general'},
{'engine': 'dummy', 'name': 'engine2', 'shortcut': 'e2', 'categories': 'onions'}] {'engine': 'dummy', 'name': 'engine2', 'shortcut': 'e2', 'categories': 'onions'}]
engines.initialize_engines(engine_list) engines.load_engines(engine_list)
self.assertEqual(len(engines.engines), 1) self.assertEqual(len(engines.engines), 1)
self.assertIn('engine1', engines.engines) self.assertIn('engine1', engines.engines)
self.assertNotIn('onions', engines.categories) self.assertNotIn('onions', engines.categories)
@ -35,7 +35,7 @@ class TestEnginesInit(SearxTestCase):
'timeout': 20.0, 'onion_url': 'http://engine1.onion'}, 'timeout': 20.0, 'onion_url': 'http://engine1.onion'},
{'engine': 'dummy', 'name': 'engine2', 'shortcut': 'e2', 'categories': 'onions'}] {'engine': 'dummy', 'name': 'engine2', 'shortcut': 'e2', 'categories': 'onions'}]
engines.initialize_engines(engine_list) engines.load_engines(engine_list)
self.assertEqual(len(engines.engines), 2) self.assertEqual(len(engines.engines), 2)
self.assertIn('engine1', engines.engines) self.assertIn('engine1', engines.engines)
self.assertIn('engine2', engines.engines) self.assertIn('engine2', engines.engines)

View file

@ -1,11 +1,8 @@
from mock import patch from searx import settings
from searx.engines import load_engines
from searx.search import initialize
from searx.query import RawTextQuery from searx.query import RawTextQuery
from searx.testing import SearxTestCase from searx.testing import SearxTestCase
import searx.engines
TEST_ENGINES = [ TEST_ENGINES = [
{ {
@ -241,7 +238,7 @@ class TestBang(SearxTestCase):
THE_QUERY = 'the query' THE_QUERY = 'the query'
def test_bang(self): def test_bang(self):
initialize(TEST_ENGINES) load_engines(TEST_ENGINES)
for bang in TestBang.SPECIFIC_BANGS + TestBang.NOT_SPECIFIC_BANGS: for bang in TestBang.SPECIFIC_BANGS + TestBang.NOT_SPECIFIC_BANGS:
with self.subTest(msg="Check bang", bang=bang): with self.subTest(msg="Check bang", bang=bang):
@ -267,12 +264,12 @@ class TestBang(SearxTestCase):
self.assertFalse(query.specific) self.assertFalse(query.specific)
def test_bang_not_found(self): def test_bang_not_found(self):
initialize(TEST_ENGINES) load_engines(TEST_ENGINES)
query = RawTextQuery('the query !bang_not_found', []) query = RawTextQuery('the query !bang_not_found', [])
self.assertEqual(query.getFullQuery(), 'the query !bang_not_found') self.assertEqual(query.getFullQuery(), 'the query !bang_not_found')
def test_bang_autocomplete(self): def test_bang_autocomplete(self):
initialize(TEST_ENGINES) load_engines(TEST_ENGINES)
query = RawTextQuery('the query !dum', []) query = RawTextQuery('the query !dum', [])
self.assertEqual(query.autocomplete_list, ['!dummy_engine']) self.assertEqual(query.autocomplete_list, ['!dummy_engine'])
@ -281,10 +278,9 @@ class TestBang(SearxTestCase):
self.assertEqual(query.getQuery(), '!dum the query') self.assertEqual(query.getQuery(), '!dum the query')
def test_bang_autocomplete_empty(self): def test_bang_autocomplete_empty(self):
with patch.object(searx.engines, 'initialize_engines', searx.engines.load_engines): load_engines(settings['engines'])
initialize() query = RawTextQuery('the query !', [])
query = RawTextQuery('the query !', []) self.assertEqual(query.autocomplete_list, ['!images', '!wikipedia', '!osm'])
self.assertEqual(query.autocomplete_list, ['!images', '!wikipedia', '!osm'])
query = RawTextQuery('the query ?', ['osm']) query = RawTextQuery('the query ?', ['osm'])
self.assertEqual(query.autocomplete_list, ['?images', '?wikipedia']) self.assertEqual(query.autocomplete_list, ['?images', '?wikipedia'])

View file

@ -5,14 +5,16 @@ from urllib.parse import ParseResult
from mock import Mock from mock import Mock
from searx.testing import SearxTestCase from searx.testing import SearxTestCase
from searx.search import Search from searx.search import Search
import searx.engines import searx.search.processors
class ViewsTestCase(SearxTestCase): class ViewsTestCase(SearxTestCase):
def setUp(self): def setUp(self):
# skip init function (no external HTTP request) # skip init function (no external HTTP request)
self.setattr4test(searx.engines, 'initialize_engines', searx.engines.load_engines) def dummy(*args, **kwargs):
pass
self.setattr4test(searx.search.processors, 'initialize_processor', dummy)
from searx import webapp # pylint disable=import-outside-toplevel from searx import webapp # pylint disable=import-outside-toplevel