ABACUS/src/SCAN/Scan_Thread_Data.cc

290 lines
8.4 KiB
C++

/**********************************************************
This software is part of J.-S. Caux's ABACUS library.
Copyright (c) J.-S. Caux.
-----------------------------------------------------------
File: Scan_Thread_Data.cc
Purpose: defines all functions for Scan_Thread_Data class.
***********************************************************/
#include "ABACUS.h"
using namespace std;
using namespace ABACUS;
namespace ABACUS {
// Default constructor: an unlabelled thread with the sentinel type -1.
Scan_Thread::Scan_Thread() : label(""), type(-1)
{
}
// Copy-assignment: takes over the label and type of RefThread and returns
// a reference to this object so assignments can be chained.
Scan_Thread& Scan_Thread::operator= (const Scan_Thread& RefThread)
{
  this->type = RefThread.type;
  this->label = RefThread.label;
  return (*this);
}
// Default constructor: does no setup of its own (members are only default-constructed).
// NOTE(review): the (thrdir_name, refine) constructor is presumably the one meant for real use.
Scan_Thread_Data::Scan_Thread_Data() {}
// Sets up nlists empty thread lists rooted in directory thrdir_name_ref.
// Each list gets an in-memory buffer of 100 slots and a backing file
// "<thrdir_name>/<il>.thr". Unless refine is true, any existing list files and
// "<thrdir_name>/nthreads.dat" are removed so that leftovers from a previous
// (failed) computation cannot interfere.
Scan_Thread_Data::Scan_Thread_Data(string thrdir_name_ref, bool refine)
{
  thrdir_name = thrdir_name_ref;

  // Per-list counters all start at zero; no list is known to be non-empty yet.
  nthreads_total = Vect<int> (0, nlists);
  nthreads_on_disk = Vect<int> (0, nlists);
  nthreads_in_memory = Vect<int> (0, nlists);
  lowest_il_with_nthreads_neq_0 = nlists - 1;

  dim = Vect<int> (nlists);
  label = Vect<Vect<string> > (nlists);
  type = Vect<Vect<int> > (nlists);
  filename = Vect<string> (nlists);

  for (int list_index = 0; list_index < nlists; ++list_index) {
    // Starting capacity of the in-memory buffers for this list.
    dim[list_index] = 100;
    label[list_index] = Vect<string> (dim[list_index]);
    type[list_index] = Vect<int> (dim[list_index]);

    stringstream path_builder;
    path_builder << thrdir_name << "/" << list_index << ".thr";
    filename[list_index] = path_builder.str();

    // Fresh (non-refine) run: discard any stale list file.
    if (!refine) remove(filename[list_index].c_str());
  }

  if (!refine) {
    // Also discard the stale per-list counts file.
    string datfilename = thrdir_name_ref + "/nthreads.dat";
    remove(datfilename.c_str());
  }
}
// Destructor: performs no explicit cleanup; in particular, thread files
// already written to disk are left in place.
Scan_Thread_Data::~Scan_Thread_Data() {}
bool Scan_Thread_Data::Increase_Memory_Size (int il, int nr_to_add)
{
if (il < 0 || il > nlists) ABACUSerror("ilist out of bounds in Scan_Thread_Data::Increase_Memory_Size");
dim[il] += nr_to_add;
try {
label[il].Increase_Size (nr_to_add, "");
type[il].Increase_Size (nr_to_add);
}
catch (bad_alloc) {
cout << "dim[il] " << dim[il] << "\tnr_to_add " << nr_to_add << endl;
ABACUSerror("Memory allocation failed in Scan_Thread_Data::Increase_Memory_Size.");
}
return(true);
}
// Files a thread under the list selected by its data value:
//   il = int(-log(abs_data_value_ref) / logscale), clamped to [0, nlists - 1],
// then delegates to Include_Thread(il, label_ref, type_ref).
void Scan_Thread_Data::Include_Thread (DP abs_data_value_ref, string label_ref, int type_ref)
{
  // Safety: non-positive (and NaN) values get a tiny positive stand-in so log() is defined.
  if (!(abs_data_value_ref > 0.0)) abs_data_value_ref = 1.0e-200;

  // Clamp in floating point BEFORE converting: converting a value outside the int
  // range (e.g. -inf for abs_data_value_ref = +inf) to int is undefined behaviour.
  DP il_real = -log(abs_data_value_ref)/logscale;
  int il;
  if (il_real < 0.0) il = 0;
  else if (il_real < nlists) il = int(il_real);
  else il = nlists - 1; // also catches a NaN quotient
  Include_Thread (il, label_ref, type_ref);
}
void Scan_Thread_Data::Include_Thread (int il, string label_ref, int type_ref)
{
if (il < 0 || il > nlists - 1)
ABACUSerror("il out of range in Scan_Thread_Data::Include_Thread.");
if (il < lowest_il_with_nthreads_neq_0) lowest_il_with_nthreads_neq_0 = il;
// Keep in memory for now:
if (nthreads_in_memory[il] > dim[il] - 10) {
(*this).Increase_Memory_Size (il, dim[il]);
}
label[il][nthreads_in_memory[il] ] = label_ref;
type[il][nthreads_in_memory[il] ] = type_ref;
nthreads_in_memory[il]++;
nthreads_total[il]++;
// We save the threads to disk if there are sufficiently many:
if (nthreads_in_memory[il] > 1000) {
fstream outfile;
outfile.open(filename[il].c_str(), fstream::out | fstream::app);
for (int it = 0; it < nthreads_in_memory[il]; ++it)
outfile << label[il][it] << "\t" << type[il][it] << endl;
outfile.close();
nthreads_on_disk[il] += nthreads_in_memory[il];
// We then reset these memory buffers
dim[il] = 100;
nthreads_in_memory[il] = 0;
label[il] = Vect<string> (dim[il]);
type[il] = Vect<int> (dim[il]);
}
}
// Returns all threads of the lowest non-empty list (in-memory ones first,
// then those read back from the list's file), marks that list as handled
// (counters zeroed, buffers reset, file removed), and advances
// lowest_il_with_nthreads_neq_0 to the next non-empty list, or to nlists - 1
// if there is none.
Vect<Scan_Thread> Scan_Thread_Data::Extract_Next_Scan_Threads ()
{
  int il_used = lowest_il_with_nthreads_neq_0;

  // The result is sized from nthreads_total; guard against out-of-bounds writes.
  if (nthreads_total[il_used] != nthreads_in_memory[il_used] + nthreads_on_disk[il_used])
    ABACUSerror("nthreads_total neq _in_memory + _on_disk in Scan_Thread_Data::Extract_Next_Scan_Threads");

  Vect<Scan_Thread> next_in_line(nthreads_total[il_used]);

  // Copy the in-memory threads
  for (int it = 0; it < nthreads_in_memory[il_used]; ++it)
    next_in_line[it] = Scan_Thread(label[il_used][it], type[il_used][it]);

  // Copy the on-disk threads (file lines are "label<TAB>type")
  if (nthreads_on_disk[il_used] > 0) {
    ifstream infile(filename[il_used].c_str());
    if (infile.fail())
      ABACUSerror("Could not open threads file in Scan_Thread_Data::Extract_Next_Scan_Threads.");
    string label_read;
    int type_read = 0;
    for (int in = 0; in < nthreads_on_disk[il_used]; ++in) {
      if (!(infile >> label_read >> type_read))
        ABACUSerror("Could not read thread from file in Scan_Thread_Data::Extract_Next_Scan_Threads.");
      next_in_line[nthreads_in_memory[il_used] + in] = Scan_Thread(label_read, type_read);
    }
  } // infile is closed here, before the file is removed below

  // The threads in this list are now considered handled.
  // Clear memory and remove on-disk file:
  nthreads_total[il_used] = 0;
  nthreads_on_disk[il_used] = 0;
  nthreads_in_memory[il_used] = 0;
  label[il_used] = Vect<string> ("", dim[il_used]);
  type[il_used] = Vect<int> (dim[il_used]);
  remove(filename[il_used].c_str());

  // Find the next non-empty list (stop at nlists - 1):
  do {
    lowest_il_with_nthreads_neq_0 += 1;
    if (lowest_il_with_nthreads_neq_0 == nlists) {
      lowest_il_with_nthreads_neq_0 = nlists - 1;
      break;
    }
  } while (nthreads_total[lowest_il_with_nthreads_neq_0] == 0);

  return(next_in_line);
}
// Extracts whole lists, lowest index first, until at least min_nr threads
// have been collected or no non-empty list remains.
Vect<Scan_Thread> Scan_Thread_Data::Extract_Next_Scan_Threads (int min_nr)
{
  Vect<Scan_Thread> threads_to_return = Extract_Next_Scan_Threads();
  // After an extraction, lowest_il_with_nthreads_neq_0 points either to a
  // non-empty list or to nlists - 1, which may itself be empty. Testing the
  // count at that index, instead of comparing the index with nlists - 1,
  // makes sure the last list is drained too when more threads are needed.
  // Each iteration empties a non-empty list, so the loop terminates.
  while (threads_to_return.size() < min_nr
         && nthreads_total[lowest_il_with_nthreads_neq_0] > 0) {
    threads_to_return.Append (Extract_Next_Scan_Threads());
  }
  return(threads_to_return);
}
void Scan_Thread_Data::Flush_to_Disk (int il)
{
if (il < 0 || il > nlists - 1)
ABACUSerror("il out of range in Scan_Thread_Data::Flush_to_Disk.");
if (nthreads_in_memory[il] > 0) {
fstream outfile;
outfile.open(filename[il].c_str(), fstream::out | fstream::app);
for (int it = 0; it < nthreads_in_memory[il]; ++it)
outfile << label[il][it] << "\t" << type[il][it] << endl;
outfile.close();
nthreads_on_disk[il] += nthreads_in_memory[il];
// We then reset these memory buffers
dim[il] = 100;
nthreads_in_memory[il] = 0;
label[il] = Vect<string> (dim[il]);
type[il] = Vect<int> (dim[il]);
}
}
void Scan_Thread_Data::Save()
{
// We save the in-memory threads to disk:
for (int il = 0; il < nlists; ++il) (*this).Flush_to_Disk(il);
ofstream nthreads_outfile;
stringstream nthreads_outfile_strstream;
nthreads_outfile_strstream << thrdir_name << "/nthreads.dat";
string nthreads_outfile_str = nthreads_outfile_strstream.str();
nthreads_outfile.open(nthreads_outfile_str.c_str());
if (nthreads_outfile.fail())
ABACUSerror("Could not open outfile in Scan_Thread_Data::Save... ");
//cout << "Saving threads: nthreads_tot vector is" << endl;
for (int il = 0; il < nlists; ++il) {
if (nthreads_total[il] != nthreads_in_memory[il] + nthreads_on_disk[il])
ABACUSerror("nthreads_total neq _in_memory + _on_disk in Scan_Threads_Data::Save");
if (nthreads_total[il] > 0) nthreads_outfile << endl << il << "\t" << nthreads_total[il];
}
nthreads_outfile.close();
return;
}
void Scan_Thread_Data::Load ()
{
ifstream nthreads_infile;
stringstream nthreads_infile_strstream;
nthreads_infile_strstream << thrdir_name << "/nthreads.dat";
string nthreads_infile_str = nthreads_infile_strstream.str();
nthreads_infile.open(nthreads_infile_str.c_str());
if (nthreads_infile.fail()) ABACUSerror("Could not open infile in Scan_Thread_Data::Load... ");
// Read the number of elements in each list:
Vect<int> nthreads_read(0, nlists);
int il_read;
bool min_il_set = false;
while (nthreads_infile.peek() != EOF) {
nthreads_infile >> il_read;
if (!min_il_set) {
lowest_il_with_nthreads_neq_0 = il_read;
min_il_set = true;
}
nthreads_infile >> nthreads_on_disk[il_read];
nthreads_total[il_read] = nthreads_on_disk[il_read];
}
nthreads_infile.close();
return;
}
} // namespace ABACUS