Simplify search.py, basically updated PR #518

The timeouts in settings.yml is about the total time (not only the HTTP request but also the prepare the request and parsing the response)
It was more or less the case before since the threaded_requests function ignores the thread after the timeout even the HTTP request is ended.

New / changed stats :
* page_load_time : record the HTTP request time
* page_load_count: the number of HTTP request
* engine_time : the execution total time of an engine
* engine_time_count : the number of "engine_time" measure

The avg response times in the preferences are the engine response time (engine_load_time / engine_load_count)

To sum up :
* Search.search() filters the engines that can't process the request
* Search.search() call search_multiple_requests function
* search_multiple_requests creates one thread per engine, each thread runs the search_one_request function
* search_one_request calls the request function, make the HTTP request, calls the response function, extends the result_container
* search_multiple_requests waits for the the thread to finish (or timeout)
This commit is contained in:
Alexandre Flament 2016-11-05 13:45:20 +01:00
parent 51eafdd471
commit 01e2648e93
3 changed files with 153 additions and 131 deletions

View file

@ -99,6 +99,9 @@ def load_engine(engine_data):
'result_count': 0,
'search_count': 0,
'page_load_time': 0,
'page_load_count': 0,
'engine_time': 0,
'engine_time_count': 0,
'score_count': 0,
'errors': 0
}
@ -115,32 +118,56 @@ def load_engine(engine_data):
return engine
def to_percentage(stats, maxvalue):
for engine_stat in stats:
if maxvalue:
engine_stat['percentage'] = int(engine_stat['avg'] / maxvalue * 100)
else:
engine_stat['percentage'] = 0
return stats
def get_engines_stats():
# TODO refactor
pageloads = []
engine_times = []
results = []
scores = []
errors = []
scores_per_result = []
max_pageload = max_results = max_score = max_errors = max_score_per_result = 0 # noqa
max_pageload = max_engine_times = max_results = max_score = max_errors = max_score_per_result = 0 # noqa
for engine in engines.values():
if engine.stats['search_count'] == 0:
continue
results_num = \
engine.stats['result_count'] / float(engine.stats['search_count'])
load_times = engine.stats['page_load_time'] / float(engine.stats['search_count']) # noqa
if engine.stats['page_load_count'] != 0:
load_times = engine.stats['page_load_time'] / float(engine.stats['page_load_count']) # noqa
else:
load_times = 0
if engine.stats['engine_time_count'] != 0:
this_engine_time = engine.stats['engine_time'] / float(engine.stats['engine_time_count']) # noqa
else:
this_engine_time = 0
if results_num:
score = engine.stats['score_count'] / float(engine.stats['search_count']) # noqa
score_per_result = score / results_num
else:
score = score_per_result = 0.0
max_results = max(results_num, max_results)
max_pageload = max(load_times, max_pageload)
max_engine_times = max(this_engine_time, max_engine_times)
max_results = max(results_num, max_results)
max_score = max(score, max_score)
max_score_per_result = max(score_per_result, max_score_per_result)
max_errors = max(max_errors, engine.stats['errors'])
pageloads.append({'avg': load_times, 'name': engine.name})
engine_times.append({'avg': this_engine_time, 'name': engine.name})
results.append({'avg': results_num, 'name': engine.name})
scores.append({'avg': score, 'name': engine.name})
errors.append({'avg': engine.stats['errors'], 'name': engine.name})
@ -149,38 +176,18 @@ def get_engines_stats():
'name': engine.name
})
for engine in pageloads:
if max_pageload:
engine['percentage'] = int(engine['avg'] / max_pageload * 100)
else:
engine['percentage'] = 0
for engine in results:
if max_results:
engine['percentage'] = int(engine['avg'] / max_results * 100)
else:
engine['percentage'] = 0
for engine in scores:
if max_score:
engine['percentage'] = int(engine['avg'] / max_score * 100)
else:
engine['percentage'] = 0
for engine in scores_per_result:
if max_score_per_result:
engine['percentage'] = int(engine['avg']
/ max_score_per_result * 100)
else:
engine['percentage'] = 0
for engine in errors:
if max_errors:
engine['percentage'] = int(float(engine['avg']) / max_errors * 100)
else:
engine['percentage'] = 0
pageloads = to_percentage(pageloads, max_pageload)
engine_times = to_percentage(engine_times, max_engine_times)
results = to_percentage(results, max_results)
scores = to_percentage(scores, max_score)
scores_per_result = to_percentage(scores_per_result, max_score_per_result)
erros = to_percentage(errors, max_errors)
return [
(
gettext('Engine time (sec)'),
sorted(engine_times, key=itemgetter('avg'))
),
(
gettext('Page loads (sec)'),
sorted(pageloads, key=itemgetter('avg'))

View file

@ -35,14 +35,53 @@ logger = logger.getChild('search')
number_of_searches = 0
def search_request_wrapper(fn, url, engine_name, **kwargs):
ret = None
engine = engines[engine_name]
def send_http_request(engine, request_params, timeout_limit):
response = None
try:
ret = fn(url, **kwargs)
# create dictionary which contain all
# informations about the request
request_args = dict(
headers=request_params['headers'],
cookies=request_params['cookies'],
timeout=timeout_limit,
verify=request_params['verify']
)
# specific type of request (GET or POST)
if request_params['method'] == 'GET':
req = requests_lib.get
else:
req = requests_lib.post
request_args['data'] = request_params['data']
# for page_load_time stats
time_before_request = time()
# send the request
response = req(request_params['url'], **request_args)
with threading.RLock():
# no error : reset the suspend variables
engine.continuous_errors = 0
engine.suspend_end_time = 0
# update stats with current page-load-time
# only the HTTP request
engine.stats['page_load_time'] += time() - time_before_request
engine.stats['page_load_count'] += 1
# is there a timeout (no parsing in this case)
timeout_overhead = 0.2 # seconds
search_duration = time() - request_params['started']
if search_duration > timeout_limit + timeout_overhead:
logger.exception('engine timeout on HTTP request:'
'{0} (search duration : {1} ms, time-out: {2} )'
.format(engine.name, search_duration, timeout_limit))
with threading.RLock():
engine.stats['errors'] += 1
return False
# everything is ok : return the response
return response
except:
# increase errors stats
with threading.RLock():
@ -51,20 +90,62 @@ def search_request_wrapper(fn, url, engine_name, **kwargs):
engine.suspend_end_time = time() + min(60, engine.continuous_errors)
# print engine name and specific error message
logger.exception('engine crash: {0}'.format(engine_name))
return ret
logger.exception('engine crash: {0}'.format(engine.name))
return False
def threaded_requests(requests):
timeout_limit = max(r[2]['timeout'] for r in requests)
search_start = time()
def search_one_request(engine_name, query, request_params, result_container, timeout_limit):
engine = engines[engine_name]
# update request parameters dependent on
# search-engine (contained in engines folder)
engine.request(query, request_params)
# TODO add support of offline engines
if request_params['url'] is None:
return False
# ignoring empty urls
if not request_params['url']:
return False
# send request
response = send_http_request(engine, request_params, timeout_limit)
# parse response
success = None
if response:
# parse the response
response.search_params = request_params
search_results = engine.response(response)
# add results
for result in search_results:
result['engine'] = engine.name
result_container.extend(engine.name, search_results)
success = True
else:
success = False
with threading.RLock():
# update stats : total time
engine.stats['engine_time'] += time() - request_params['started']
engine.stats['engine_time_count'] += 1
#
return success
def search_multiple_requests(requests, result_container, timeout_limit):
start_time = time()
search_id = uuid4().__str__()
for fn, url, request_args, engine_name in requests:
request_args['timeout'] = timeout_limit
for engine_name, query, request_params in requests:
th = threading.Thread(
target=search_request_wrapper,
args=(fn, url, engine_name),
kwargs=request_args,
target=search_one_request,
args=(engine_name, query, request_params, result_container, timeout_limit),
name=search_id,
)
th._engine_name = engine_name
@ -72,7 +153,7 @@ def threaded_requests(requests):
for th in threading.enumerate():
if th.name == search_id:
remaining_time = max(0.0, timeout_limit - (time() - search_start))
remaining_time = max(0.0, timeout_limit - (time() - start_time))
th.join(remaining_time)
if th.isAlive():
logger.warning('engine timeout: {0}'.format(th._engine_name))
@ -90,44 +171,6 @@ def default_request_params():
}
# create a callback wrapper for the search engine results
def make_callback(engine_name, callback, params, result_container):
# creating a callback wrapper for the search engine results
def process_callback(response, **kwargs):
# check if redirect comparing to the True value,
# because resp can be a Mock object, and any attribut name returns something.
if response.is_redirect is True:
logger.debug('{0} redirect on: {1}'.format(engine_name, response))
return
response.search_params = params
search_duration = time() - params['started']
# update stats with current page-load-time
with threading.RLock():
engines[engine_name].stats['page_load_time'] += search_duration
timeout_overhead = 0.2 # seconds
timeout_limit = engines[engine_name].timeout + timeout_overhead
if search_duration > timeout_limit:
with threading.RLock():
engines[engine_name].stats['errors'] += 1
return
# callback
search_results = callback(response)
# add results
for result in search_results:
result['engine'] = engine_name
result_container.extend(engine_name, search_results)
return process_callback
def get_search_query_from_webapp(preferences, form):
query = None
query_engines = []
@ -254,6 +297,9 @@ class Search(object):
def search(self):
global number_of_searches
# start time
start_time = time()
# init vars
requests = []
@ -266,6 +312,9 @@ class Search(object):
search_query = self.search_query
# max of all selected engine timeout
timeout_limit = 0
# start search-reqest for all selected engines
for selected_engine in search_query.engines:
if selected_engine['name'] not in engines:
@ -294,7 +343,7 @@ class Search(object):
request_params = default_request_params()
request_params['headers']['User-Agent'] = user_agent
request_params['category'] = selected_engine['category']
request_params['started'] = time()
request_params['started'] = start_time
request_params['pageno'] = search_query.pageno
if hasattr(engine, 'language') and engine.language:
@ -306,52 +355,16 @@ class Search(object):
request_params['safesearch'] = search_query.safesearch
request_params['time_range'] = search_query.time_range
# update request parameters dependent on
# search-engine (contained in engines folder)
engine.request(search_query.query.encode('utf-8'), request_params)
if request_params['url'] is None:
# TODO add support of offline engines
pass
# create a callback wrapper for the search engine results
callback = make_callback(
selected_engine['name'],
engine.response,
request_params,
self.result_container)
# create dictionary which contain all
# informations about the request
request_args = dict(
headers=request_params['headers'],
hooks=dict(response=callback),
cookies=request_params['cookies'],
timeout=engine.timeout,
verify=request_params['verify']
)
# specific type of request (GET or POST)
if request_params['method'] == 'GET':
req = requests_lib.get
else:
req = requests_lib.post
request_args['data'] = request_params['data']
# ignoring empty urls
if not request_params['url']:
continue
# append request to list
requests.append((req, request_params['url'],
request_args,
selected_engine['name']))
requests.append((selected_engine['name'], search_query.query.encode('utf-8'), request_params))
if not requests:
return self.result_container
# send all search-request
threaded_requests(requests)
start_new_thread(gc.collect, tuple())
# update timeout_limit
timeout_limit = max(timeout_limit, engine.timeout)
if requests:
# send all search-request
search_multiple_requests(requests, self.result_container, timeout_limit - (time() - start_time))
start_new_thread(gc.collect, tuple())
# return results, suggestions, answers and infoboxes
return self.result_container

View file

@ -593,6 +593,8 @@ def preferences():
if e.timeout > settings['outgoing']['request_timeout']:
stats[e.name]['warn_timeout'] = True
# get first element [0], the engine time,
# and then the second element [1] : the time (the first one is the label)
for engine_stat in get_engines_stats()[0][1]:
stats[engine_stat.get('name')]['time'] = round(engine_stat.get('avg'), 3)
if engine_stat.get('avg') > settings['outgoing']['request_timeout']: