|
|
/*++
Copyright (c) 1998 Microsoft Corporation
Module Name:
centctrl.cpp
Abstract:
SIS Groveler central controller
Authors:
John Douceur, 1998
Environment:
User Mode
Revision History:
--*/
#include "all.hxx"
const _TCHAR total_processor_time_path[] = _T("\\Processor(_Total#0)\\% Processor Time");
CentralController::CentralController( int num_partitions, Groveler *grovelers, GrovelStatus *groveler_statuses, ReadParameters *read_parameters, WriteParameters *write_parameters, ReadDiskInformation **read_disk_info, WriteDiskInformation **write_disk_info, int *num_excluded_paths, const _TCHAR ***excluded_paths) : hash_match_ratio_filter(read_parameters->sis_efficacy_history_size, write_parameters->hash_match_ratio), compare_match_ratio_filter(read_parameters->sis_efficacy_history_size, write_parameters->compare_match_ratio), dequeue_hash_ratio_filter(read_parameters->log_winnow_history_size, write_parameters->dequeue_hash_ratio) { ASSERT(this != 0); unsigned int current_time = GET_TICK_COUNT(); this->num_partitions = num_partitions; ASSERT(num_partitions > 0); this->grovelers = grovelers; ASSERT(grovelers != 0); base_grovel_interval = read_parameters->base_grovel_interval; ASSERT(base_grovel_interval > 0); max_grovel_interval = read_parameters->max_grovel_interval; ASSERT(max_grovel_interval > 0); ASSERT(max_grovel_interval >= base_grovel_interval); max_response_lag = read_parameters->max_response_lag; ASSERT(max_response_lag > 0); working_grovel_interval = read_parameters->working_grovel_interval; ASSERT(working_grovel_interval > 0); ASSERT(base_grovel_interval >= working_grovel_interval); grovel_duration = read_parameters->grovel_duration; ASSERT(grovel_duration > 0); ASSERT(grovel_duration <= base_grovel_interval); ASSERT(grovel_duration <= max_grovel_interval); ASSERT(grovel_duration <= working_grovel_interval); hash_match_ratio = &write_parameters->hash_match_ratio; ASSERT(hash_match_ratio != 0); ASSERT(*hash_match_ratio >= 0.0); ASSERT(*hash_match_ratio <= 1.0); compare_match_ratio = &write_parameters->compare_match_ratio; ASSERT(compare_match_ratio != 0); ASSERT(*compare_match_ratio >= 0.0); ASSERT(*compare_match_ratio <= 1.0); dequeue_hash_ratio = &write_parameters->dequeue_hash_ratio; ASSERT(dequeue_hash_ratio != 0); ASSERT(*dequeue_hash_ratio >= 0.0); ASSERT(*dequeue_hash_ratio <= 1.0); foreground_partition_index = 0; num_alive_partitions = 0; bool some_groveler_dead = false;
//
// Create the control structure for each partion
//
part_controllers = new PartitionController*[num_partitions]; for (int index = 0; index < num_partitions; index++) { part_controllers[index] = new PartitionController( &grovelers[index], groveler_statuses[index], read_parameters->target_entries_per_extraction, read_parameters->max_extraction_interval, base_grovel_interval, max_grovel_interval, read_parameters->low_confidence_grovel_interval, read_parameters->low_disk_space_grovel_interval, read_parameters->partition_info_update_interval, read_parameters->base_restart_extraction_interval, read_parameters->max_restart_extraction_interval, read_parameters->base_restart_groveling_interval, read_parameters->max_restart_groveling_interval, read_parameters->base_regrovel_interval, read_parameters->max_regrovel_interval, read_parameters->volscan_regrovel_threshold, read_parameters->partition_balance_time_constant, read_parameters->read_time_increase_history_size, read_parameters->read_time_decrease_history_size, read_parameters->file_size_history_size, read_disk_info[index]->error_retry_log_extraction, read_disk_info[index]->error_retry_groveling, read_disk_info[index]->base_usn_log_size, read_disk_info[index]->max_usn_log_size, read_parameters->sample_group_size, read_parameters->acceptance_p_value, read_parameters->rejection_p_value, read_parameters->base_use_multiplier, read_parameters->max_use_multiplier, read_parameters->peak_finder_accuracy, read_parameters->peak_finder_range, read_parameters->base_cpu_load_threshold, read_parameters->max_cpu_load_threshold, hash_match_ratio, compare_match_ratio, dequeue_hash_ratio, &write_disk_info[index]->partition_hash_read_time_estimate, &write_disk_info[index]->partition_compare_read_time_estimate, &write_disk_info[index]->mean_file_size, &write_disk_info[index]->read_time_confidence, &write_disk_info[index]->volume_serial_number, index, read_parameters->read_report_discard_threshold, read_disk_info[index]->min_file_size, read_disk_info[index]->min_file_age, read_disk_info[index]->allow_compressed_files, read_disk_info[index]->allow_encrypted_files, read_disk_info[index]->allow_hidden_files, read_disk_info[index]->allow_offline_files, read_disk_info[index]->allow_temporary_files, num_excluded_paths[index], excluded_paths[index]); if (groveler_statuses[index] != Grovel_disable) { if (groveler_statuses[index] == Grovel_ok || groveler_statuses[index] == Grovel_new || read_disk_info[index]->error_retry_groveling) { num_alive_partitions++; } else { ASSERT(groveler_statuses[index] == Grovel_error); some_groveler_dead = true; } } } if (num_alive_partitions == 0) { if (some_groveler_dead) { eventlog.report_event(GROVMSG_ALL_GROVELERS_DEAD, 0); } else { eventlog.report_event(GROVMSG_ALL_GROVELERS_DISABLED, 0); } return; }
cpu_load_determinable = false; PDH_STATUS status = PdhOpenQuery(0, 0, &qhandle); if (status == ERROR_SUCCESS) { ASSERT(qhandle != 0); status = PdhAddCounter(qhandle, total_processor_time_path, 0, &ctr_handle); if (status == ERROR_SUCCESS) { ASSERT(ctr_handle != 0); cpu_load_determinable = true; get_cpu_load(); } else { PRINT_DEBUG_MSG( (_T("GROVELER: PdhAddCounter returned error. PDH_STATUS = %lx\n"), status)); } } else { PRINT_DEBUG_MSG((_T("GROVELER: PdhOpenQuery returned error. PDH_STATUS = %lx\n"), status)); } last_invokation_time = current_time; event_timer.schedule(current_time + base_grovel_interval, (void *)this, control_groveling); }
CentralController::~CentralController() { ASSERT(this != 0); ASSERT(num_partitions > 0); if (cpu_load_determinable) { ASSERT(qhandle != 0); PDH_STATUS status = PdhCloseQuery(qhandle); if (status != ERROR_SUCCESS) { PRINT_DEBUG_MSG( (_T("GROVELER: PdhCloseQuery returned error. PDH_STATUS = %lx\n"), status)); } qhandle = 0; } for (int index = 0; index < num_partitions; index++) { ASSERT(part_controllers[index] != 0); delete part_controllers[index]; part_controllers[index] = 0; } ASSERT(part_controllers != 0); delete[] part_controllers; part_controllers = 0; }
bool CentralController::any_grovelers_alive() { ASSERT(this != 0); return num_alive_partitions > 0; }
void CentralController::demarcate_foreground_batch( int partition_index) { ASSERT(this != 0); ASSERT(partition_index >= 0); ASSERT(partition_index < num_partitions); part_controllers[partition_index]->demarcate_foreground_batch(); }
void CentralController::command_full_volume_scan( int partition_index) { ASSERT(this != 0); ASSERT(partition_index >= 0); ASSERT(partition_index < num_partitions); part_controllers[partition_index]->command_full_volume_scan(); }
void CentralController::control_groveling( void *context) { ASSERT(context != 0); unsigned int invokation_time = GET_TICK_COUNT(); TRACE_PRINTF(TC_centctrl, 3, (_T("time: %d\n"), invokation_time)); TRACE_PRINTF(TC_centctrl, 3, (_T("\tCCcg -\tcontrolling groveling\n"))); CentralController *me = (CentralController *)context; ASSERT(me->num_partitions > 0); ASSERT(me->num_alive_partitions > 0); if (SERVICE_GROVELING_PAUSED() || SERVICE_FOREGROUND_GROVELING()) { TRACE_PRINTF(TC_centctrl, 1, (_T("\tCCcg -\tsuspending controller\n"))); SERVICE_SUSPENDING_CONTROLLER(); return; // return without scheduling another control_groveling()
} int partition_index = -1; double top_priority = DBL_MAX; unsigned int time_delta = invokation_time - me->last_invokation_time; ASSERT(signed(time_delta) >= 0); for (int index = 0; index < me->num_partitions; index++) { me->part_controllers[index]->advance(time_delta); if (me->part_controllers[index]->wait() == 0) { double partition_priority = me->part_controllers[index]->priority(); if (partition_priority < top_priority) { partition_index = index; top_priority = partition_priority; } } } if (partition_index >= 0) { ASSERT(partition_index < me->num_partitions); TRACE_PRINTF(TC_centctrl, 5, (_T("\tCCcg -\tgroveling partition %d\n"), partition_index)); me->grovel_partition(partition_index); } else { TRACE_PRINTF(TC_centctrl, 4, (_T("\tCCcg -\tno partition controllers ready to grovel.\n"))); } int grovel_interval = me->max_response_lag; ASSERT(grovel_interval > 0); for (index = 0; index < me->num_partitions; index++) { int wait = me->part_controllers[index]->wait(); if (wait < grovel_interval) { grovel_interval = wait; } } if (grovel_interval < me->working_grovel_interval) { grovel_interval = me->working_grovel_interval; } TRACE_PRINTF(TC_centctrl, 5, (_T("\tCCcg -\tgrovel interval = %d\n"), grovel_interval)); me->last_invokation_time = invokation_time; ASSERT(grovel_interval > 0); event_timer.schedule(invokation_time + grovel_interval, context, control_groveling); }
void CentralController::exhort_groveling( void *context) { ASSERT(context != 0); unsigned int invokation_time = GET_TICK_COUNT(); TRACE_PRINTF(TC_centctrl, 3, (_T("time: %d\n"), invokation_time)); TRACE_PRINTF(TC_centctrl, 3, (_T("\tCCcg -\texhorting groveling\n"))); CentralController *me = (CentralController *)context; ASSERT(me->num_partitions > 0); ASSERT(me->num_alive_partitions > 0); if (SERVICE_GROVELING_PAUSED() || !SERVICE_FOREGROUND_GROVELING()) { TRACE_PRINTF(TC_centctrl, 1, (_T("\tCCcg -\tsuspending exhorter\n"))); SERVICE_SUSPENDING_EXHORTER(); return; // return without scheduling another exhort_groveling()
} for (int index = 0; index < me->num_partitions; index++) { me->foreground_partition_index = (me->foreground_partition_index + 1) % me->num_partitions; if (SERVICE_PARTITION_IN_FOREGROUND(me->foreground_partition_index)) { break; } } ASSERT(me->foreground_partition_index >= 0); ASSERT(me->foreground_partition_index < me->num_partitions); ASSERT(SERVICE_PARTITION_IN_FOREGROUND(me->foreground_partition_index)); TRACE_PRINTF(TC_centctrl, 5, (_T("\tCCcg -\tgroveling partition %d\n"), me->foreground_partition_index)); me->grovel_partition(me->foreground_partition_index); event_timer.schedule(invokation_time + me->working_grovel_interval, context, exhort_groveling); }
double CentralController::get_cpu_load() { ASSERT(this != 0); if (!cpu_load_determinable) { return 0.0; } ASSERT(qhandle != 0); ASSERT(ctr_handle != 0); PDH_STATUS status = PdhCollectQueryData(qhandle); if (status != ERROR_SUCCESS) { PRINT_DEBUG_MSG( (_T("GROVELER: PdhCollectQueryData returned error. PDH_STATUS = %lx\n"), status)); return 0.0; } PDH_FMT_COUNTERVALUE pdh_value; status = PdhGetFormattedCounterValue(ctr_handle, PDH_FMT_LONG, 0, &pdh_value); if (status != ERROR_SUCCESS) { PRINT_DEBUG_MSG( (_T("GROVELER: PdhGetFormattedCounterValue returned error. PDH_STATUS = %lx\n"), status)); return 0.0; } if (pdh_value.CStatus != ERROR_SUCCESS) { PRINT_DEBUG_MSG((_T("GROVELER: PDH_FMT_COUNTERVALUE::CStatus = %lx\n"), status)); return 0.0; } double cpu_load = double(pdh_value.longValue) / 100.0; if (cpu_load < 0.0) { cpu_load = 0.0; } if (cpu_load > 1.0) { cpu_load = 1.0; } return cpu_load; }
void CentralController::grovel_partition( int partition_index) { ASSERT(this != 0); DWORD count_of_files_hashed; DWORDLONG bytes_of_files_hashed; DWORD count_of_files_matching; DWORDLONG bytes_of_files_matching; DWORD count_of_files_compared; DWORDLONG bytes_of_files_compared; DWORD count_of_files_merged; DWORDLONG bytes_of_files_merged; DWORD count_of_files_enqueued; DWORD count_of_files_dequeued; double cpu_load = get_cpu_load(); TRACE_PRINTF(TC_centctrl, 4, (_T("\tCCgp -\tcpu load = %f\n"), cpu_load)); ASSERT(cpu_load >= 0.0); ASSERT(cpu_load <= 1.0); bool ok = part_controllers[partition_index]-> control_operation(grovel_duration, &count_of_files_hashed, &bytes_of_files_hashed, &count_of_files_matching, &bytes_of_files_matching, &count_of_files_compared, &bytes_of_files_compared, &count_of_files_merged, &bytes_of_files_merged, &count_of_files_enqueued, &count_of_files_dequeued, cpu_load); if (ok) { ASSERT(bytes_of_files_hashed >= count_of_files_hashed); ASSERT(bytes_of_files_matching >= count_of_files_matching); ASSERT(bytes_of_files_compared >= count_of_files_compared); ASSERT(bytes_of_files_merged >= count_of_files_merged); ASSERT(count_of_files_hashed >= count_of_files_matching); ASSERT(bytes_of_files_hashed >= bytes_of_files_matching); ASSERT(count_of_files_compared >= count_of_files_merged); ASSERT(bytes_of_files_compared >= bytes_of_files_merged); ASSERT(count_of_files_dequeued >= count_of_files_hashed); TRACE_PRINTF(TC_centctrl, 3, (_T("\tCCgp -\tpartition %d groveled.\n"), partition_index)); TRACE_PRINTF(TC_centctrl, 4, (_T("\tCCgp -\tfiles hashed = %d\n"), count_of_files_hashed)); TRACE_PRINTF(TC_centctrl, 4, (_T("\tCCgp -\tbytes hashed = %I64d\n"), bytes_of_files_hashed)); TRACE_PRINTF(TC_centctrl, 4, (_T("\tCCgp -\tfiles matching = %d\n"), count_of_files_matching)); TRACE_PRINTF(TC_centctrl, 4, (_T("\tCCgp -\tbytes matching = %I64d\n"), bytes_of_files_matching)); TRACE_PRINTF(TC_centctrl, 4, (_T("\tCCgp -\tfiles compared = %d\n"), count_of_files_matching)); TRACE_PRINTF(TC_centctrl, 4, (_T("\tCCgp -\tbytes compared = %I64d\n"), bytes_of_files_matching)); TRACE_PRINTF(TC_centctrl, 4, (_T("\tCCgp -\tfiles merged = %d\n"), count_of_files_merged)); TRACE_PRINTF(TC_centctrl, 4, (_T("\tCCgp -\tbytes merged = %I64d\n"), bytes_of_files_merged)); TRACE_PRINTF(TC_centctrl, 4, (_T("\tCCgp -\tfiles enqueued = %d\n"), count_of_files_enqueued)); TRACE_PRINTF(TC_centctrl, 4, (_T("\tCCgp -\tfiles dequeued = %d\n"), count_of_files_dequeued)); if (bytes_of_files_hashed > 0) { double sample_hash_match_ratio = double(__int64(bytes_of_files_matching)) / double(__int64(bytes_of_files_hashed)); ASSERT(sample_hash_match_ratio >= 0.0); ASSERT(sample_hash_match_ratio <= 1.0); TRACE_PRINTF(TC_centctrl, 4, (_T("\tCCgp -\tsample hash match ratio = %f\n"), sample_hash_match_ratio)); hash_match_ratio_filter.update_value( sample_hash_match_ratio, count_of_files_hashed); *hash_match_ratio = hash_match_ratio_filter.retrieve_value(); ASSERT(*hash_match_ratio >= 0.0); ASSERT(*hash_match_ratio <= 1.0); TRACE_PRINTF(TC_centctrl, 3, (_T("\tCCgp -\tfiltered hash match ratio = %f\n"), *hash_match_ratio)); } if (bytes_of_files_compared > 0) { double sample_compare_match_ratio = double(__int64(bytes_of_files_merged)) / double(__int64(bytes_of_files_compared)); ASSERT(sample_compare_match_ratio >= 0.0); ASSERT(sample_compare_match_ratio <= 1.0); TRACE_PRINTF(TC_centctrl, 4, (_T("\tCCgp -\tsample compare match ratio = %f\n"), sample_compare_match_ratio)); compare_match_ratio_filter.update_value( sample_compare_match_ratio, count_of_files_compared); *compare_match_ratio = compare_match_ratio_filter.retrieve_value(); ASSERT(*compare_match_ratio >= 0.0); ASSERT(*compare_match_ratio <= 1.0); TRACE_PRINTF(TC_centctrl, 3, (_T("\tCCgp -\tfiltered compare match ratio = %f\n"), *compare_match_ratio)); } if (count_of_files_dequeued > 0) { double sample_dequeue_hash_ratio = double(__int64(count_of_files_hashed)) / double(__int64(count_of_files_dequeued)); ASSERT(sample_dequeue_hash_ratio >= 0.0); ASSERT(sample_dequeue_hash_ratio <= 1.0); TRACE_PRINTF(TC_centctrl, 4, (_T("\tCCgp -\tsample dequeue hash ratio = %f\n"), sample_dequeue_hash_ratio)); dequeue_hash_ratio_filter.update_value( sample_dequeue_hash_ratio, count_of_files_dequeued); *dequeue_hash_ratio = dequeue_hash_ratio_filter.retrieve_value(); ASSERT(*dequeue_hash_ratio >= 0.0); ASSERT(*dequeue_hash_ratio <= 1.0); TRACE_PRINTF(TC_centctrl, 3, (_T("\tCCgp -\tfiltered dequeue hash ratio = %f\n"), *dequeue_hash_ratio)); } } else { TRACE_PRINTF(TC_centctrl, 1, (_T("\tCCgp -\tpartition %d groveler dead.\n"), partition_index)); num_alive_partitions--; if (num_alive_partitions <= 0) { ASSERT(num_alive_partitions == 0); eventlog.report_event(GROVMSG_ALL_GROVELERS_DEAD, 0); event_timer.halt(); return; } } // get_cpu_load();
}
|