//
// $Id: WriteEvent_lib.cpp,v 1.11.4.2 2006/08/29 18:31:51 greenlee Exp $
//
// File: WriteEvent_lib.cpp
// Purpose: Implementation for WriteEvent package class.
// Created: 09-APR-1998 Herb Greenlee
//
// $Revision: 1.11.4.2 $
//

// Include files

// System headers.  (The original system header names were lost; the four
// below are inferred from what this file uses.)

#include <iostream>
#include <sstream>
#include <cassert>
#include <cstdlib>

#include "edm/Event.hpp"
#include "edm/TKey.hpp"
#include "rcp/XRCP.hpp"
#include "framework/Result.hpp"
#include "framework/Registry.hpp"
#include "framework/CmdLineOptions.hpp"
#include "io_packages/WriteEvent.hpp"
#include "d0stream/d0StreamFactory.hpp"
#include "io_packages/ChunkOutputFilter.hpp"
#include "d0om/d0_Output_Filter.hpp"
#include "d0om/d0om_Options.hpp"
#include "io_packages/global.hpp"
#include "io_packages/IOstatus.hpp"
#include "ErrorLogger/ErrorLog.h"
#include "name_translation/AllNameExpander.hpp"
#include "io_packages/WriteRemoteExpander.hpp"
#include "run_config_fwk/TMBTriggerChunk.hpp"

// Physical coupling to DSPACK and EVPACK d0om back ends.

#include "d0om_ds/d0StreamDSPACK.hpp"
#include "d0om_ds/d0StreamEVPACK.hpp"

using edm::Event;
using edm::TKey;
using edm::THandle;
using edm::CollisionID;
using edm::XRCPNotFound;
using std::string;
using std::cerr;
using std::endl;
using std::ios;
using std::ostringstream;
using std::list;
using std::map;
using evpack::user_header_t;
using thumbnail::Global;
using namespace fwk;

// Framework registration

FWK_REGISTRY_IMPL(WriteEvent, "$Name: p17-br-04 $")

// Static package count.

int WriteEvent::_package_count = 0;

// Methods.

WriteEvent::WriteEvent(Context* context):
  Package(context),
  Generator(context),
  GenerateUserHeader(context),
  Output(context),
  JobSummary(context),
  FileOpen(context),
  FileClose(context),
  _max_per_file(0),
  _sync(false),
  _input_files_per_file(1),
  _input_files_remaining(0),
  _more_files(true),
  _status_level(1),
  _write_user_header(true),
  _stream(0),
  _num_written(0),
  _num_written_file(0),
  _num_files(0),
  _pwq(0)
//
// Purpose: Constructor
//
// Arguments: context - Context variable.  Must be passed to all
//                      base class constructors.
//
{
  // Increment package count.

  ++_package_count;

  // Scan command line options.  Each "-option" is collected together with
  // its (possibly multiple) values.
  // (The loop header and the leading "-option" test were lost in extraction;
  //  they are reconstructed here from the surviving loop body.)

  map<string, list<string> > args;
  const CmdLineOptions opts;
  int argc = opts.getArgc();
  char** argv = opts.getArgv();
  for(int i=0; i<argc; ++i) {
    string opt(argv[i]);
    if(opt.size() == 0 || opt[0] != '-')
      continue;
    list<string> value;
    while(i < argc-1 && *argv[i+1] != '-') {
      list<string> a(string_list(argv[++i]));
      value.splice(value.end(), a);
    }
    if(value.size() == 0)
      value.push_back("");
    args[opt] = value;
  }

  // WriteRemoteExpander -- Fetch rcp configuration parameters and register.

  string copy_command("rcp");
  string staging_dir(".");
  try {
    copy_command = packageRCP().getString("RemoteCopyCommand");
  }
  catch(XRCPNotFound x) {}
  try {
    staging_dir = packageRCP().getString("StagingDirectory");
  }
  catch(XRCPNotFound x) {}
  if(args.count("-stager") > 0) {
    string stager = args["-stager"].front();
    if(stager.size() > 0)
      copy_command = stager;
  }
  WriteRemoteExpander* wrex = new WriteRemoteExpander(copy_command, staging_dir);
  bool ok = AllNameExpander::add_to_list(wrex);
  if(!ok)
    delete wrex;

  // Get RCP parameters.

  _output_file = rcp_string_list(packageRCP(), "OutputFile");
  _output_format = packageRCP().getString("OutputFormat");
  _compression_level = packageRCP().getInt("CompressionLevel");
  string appname = packageRCP().getString("ApplicationName");
  _output_filter = packageRCP().getBool("UseOutputFilter");

  // Get action id's.

  Action* pa = Action::instance();
  assert(pa != 0);
  _file_open_id = pa->mapNameToID("fileOpen");
  _file_close_id = pa->mapNameToID("fileClose");

  // Optional user header parameter (default true).
  try {
    _write_user_header = packageRCP().getBool("WriteUserHeader");
  }
  catch(XRCPNotFound x) {}

  // Optionally set status file information.

  try {
    _status_file = packageRCP().getString("StatusFile");
  }
  catch(XRCPNotFound x) {}
  try {
    _status_level = packageRCP().getInt("StatusLevel");
  }
  catch(XRCPNotFound x) {}

  // Output filter parameters.

  if(_output_filter) {
    try {
      _write_chunk_list = rcp_string_list(packageRCP(), "WriteChunkList");
    }
    catch(XRCPNotFound x) {}
    try {
      _write_list = rcp_string_list(packageRCP(), "WriteClassList");
    }
    catch(XRCPNotFound x) {}
    try {
      _veto_list = rcp_string_list(packageRCP(), "VetoClassList");
    }
    catch(XRCPNotFound x) {}
  }

  // Always keep certain basic infrastructure classes.

  if(_write_list.size() > 0) {
    _write_list.push_back("edm::Event");
    _write_list.push_back("edm::EventValue");
  }

  // Set d0om unknown object options.

  try {
    bool copy_unknown = packageRCP().getBool("CopyUnknown");
    if(copy_unknown)
      d0om_Options::make_unknowns_on_write(copy_unknown);
  }
  catch(XRCPNotFound x) {}

  // Maximum number of events per file.

  try {
    _max_per_file = packageRCP().getInt("MaxEventsPerFile");
  }
  catch(XRCPNotFound x) {}

  // Synchronize file advance with ReadEvent.

  try {
    _sync = packageRCP().getBool("Synchronize");
  }
  catch(XRCPNotFound x) {}

  // Number of input files per output file.

  try {
    _input_files_per_file = packageRCP().getInt("InputFilesPerFile");
  }
  catch(XRCPNotFound x) {}

  // Embedded definitions option.

  bool embed_defs = false;
  try {
    embed_defs = packageRCP().getBool("EmbedDefs");
  }
  catch(XRCPNotFound x) {}

  // Tag parameters.  The write tag lists default to "*" (match everything)
  // if the corresponding rcp parameter is absent.

  try {
    _write_event_tags.push_back("*");
    _write_event_tags = rcp_string_list(packageRCP(), "WriteEventTags");
  }
  catch(XRCPNotFound x) {}
  try {
    _veto_event_tags = rcp_string_list(packageRCP(), "VetoEventTags");
  }
  catch(XRCPNotFound x) {}
  try {
    _write_chunk_tags.push_back("*");
    _write_chunk_tags = rcp_string_list(packageRCP(), "WriteChunkTags");
  }
  catch(XRCPNotFound x) {}
  _write_all_events = _write_event_tags.size() == 0 ||
    (_write_event_tags.size() == 1 && _write_event_tags.front() == "*");
  _write_all_chunks = _write_chunk_tags.size() == 0 ||
    (_write_chunk_tags.size() == 1 && _write_chunk_tags.front() == "*");

  if(_package_count == 1) {

    // Check for overridden rcp parameters.

    if(args.count("-output_file") > 0)
      _output_file = args["-output_file"];
    if(args.count("-max_per_file") > 0)
      _max_per_file = atoi(args["-max_per_file"].front().c_str());
  }

  // Report on package initialization.

  out() << "Initializing WriteEvent package" << endl;
  out() << "Output file: " << _output_file << endl;
  out() << "Maximum Events Per File: " << _max_per_file << endl;
  out() << "Output format: " << _output_format << endl;
  if(_output_filter) {
    out() << "Output filter installed" << endl;
    out() << "Write chunk list: " << string_list(_write_chunk_list) << endl;
    out() << "Write class list: " << string_list(_write_list) << endl;
    out() << "Veto class list: " << string_list(_veto_list) << endl;
  }
  else
    out() << "No output filter" << endl;
  out() << "Write event tags: " << string_list(_write_event_tags) << endl;
  out() << "Veto event tags: " << string_list(_veto_event_tags) << endl;
  out() << "Write chunk tags: " << string_list(_write_chunk_tags) << endl;
  if(embed_defs)
    out() << "Embedding a dspack dictionary within each evpack record." << endl;

  // [The remainder of the constructor was lost in extraction.  It most
  //  likely built the stream open mode and option string (_mode, _optstr)
  //  used by file_open() below.]
}

// [Also lost in extraction: the destructor, if any, and the original
//  declaration and doc comment of the event-writing output method.  The
//  signature and the first few statements below are reconstructions from the
//  surviving method body and should be treated as assumptions.]

Result WriteEvent::outputEvent(Event& event, WorkQueue& wq)
//
// Purpose: Write event.
//
// Arguments: event - An edm Event.
//            wq    - Framework work queue.
//
// Returns: success/fail.
//
{
  // Remember address of work queue.  (Reconstructed.)

  _pwq = &wq;

  // Open the output file if it is not already open.  (Reconstructed.)

  if(_stream == 0)
    file_open();

  // Decide whether this event should be written.  (The original guard was
  // lost in extraction; selection based on _write_event_tags and
  // _veto_event_tags is the likely candidate.  A pass-through guard is used
  // here so the surviving body below remains intact.)

  bool write_event = true;
  if(write_event) {

    // If only tagged chunks are to be written, copy the tagged chunks into a
    // temporary event and write that instead.  (Partly reconstructed.)

    const Event* pevent = &event;
    Event event1;
    if(!_write_all_chunks) {
      event1 = event.copyTagged(_write_chunk_tags);
      pevent = &event1;
    }

    // See if we have reached the maximum number of events for this file.

    if(_num_written_file >= _max_per_file && _max_per_file > 0) {
      file_close();
      file_open();
    }

    // Construct a key using the collision id.
    string skey = pevent->collisionID().toString();
    d0Key key(skey.c_str());

    // Associate key with unwritten object.

    bool bindok = _stream->bind(*pevent, key);
    if(!bindok && _output_format != "DSPACK" && _output_format != "EVPACK") {
      update_status(true, true);
      ERRLOGTO(log(), ELerror, "Bind Failed")
        << '\n'
        << "Bind failed for key " << key << '\n'
        << endmsg;
    }

    // Get user header.

    user_header_t uh;
    if(_write_user_header) {

      // Find TMBTriggerChunk.  (The template arguments below were lost in
      // extraction; TMBTriggerChunk is inferred from the surrounding code.)

      TKey<TMBTriggerChunk> tkey;
      list<THandle<TMBTriggerChunk> > chunk_list = tkey.findAll(event);

      // Don't know what to do if there is more than one TMBTriggerChunk.

      if(chunk_list.size() > 1) {
        cerr << "WriteEvent: More than one TMBTriggerChunk" << endl;
        assert(false);
        abort();
      }
      else if(chunk_list.size() > 0) {
        const THandle<TMBTriggerChunk>& handle = chunk_list.front();
        assert(handle.isValid());
        const Global& global = handle->global();
        uh = global.get_event_tag_words();
      }
    }

    // If it's the first event, try to install a chunk filter for just
    // the chunks we're writing, to reduce dictionary bloating.
    // This is pretty gross --- the marking code in d0om_ds works
    // off of type filtering in an output filter, but for thumbnails,
    // the chunks to output are specified with the tagging mechanism.
    // Thus, we muck about here to bridge that gap.

    bool saved_filter = false;
    if (_num_written_file == 0) {
      saved_filter = filter_hack (*pevent);
    }

    // Write event.

    _stream->write(*pevent, skey, uh);
    if(_num_written_file == 0)
      _first_written_file = pevent->collisionID();
    _last_written_file = pevent->collisionID();
    ++_num_written_file;
    ++_num_written;

    // If filter_hack installed a temporary chunk filter, restore the
    // original (daisy-chained) filter now that the event has been written.

    if (saved_filter) {
      d0_Output_Filter_Base* bof = 0;
      bool released = _stream->release_output_filter (bof);
      assert (released);
      ChunkOutputFilter* of = dynamic_cast<ChunkOutputFilter*> (bof);
      assert (of != 0);
      _stream->set_output_filter (of->release_daisy_chain());
      delete of;
    }

    // Update status.

    update_status(true, false);

    // Forget address of work queue.

    // _pwq = 0;
  }
  return(Result::success);
}

Result WriteEvent::outputEvent(WorkQueue& wq)
//
// Purpose: Write event.  This no event version does nothing except
//          remember a pointer to the work queue.
//
// Arguments: wq - Framework work queue.
//
// Returns: success/fail.
//
{
  return(Result::success);
}

Result WriteEvent::generateEvent(WorkQueue& wq)
//
// Purpose: Remember pointer to work queue.
//
// Arguments: wq - Framework work queue.
//
// Returns: success/fail.
//
{
  // Remember address of work queue.

  _pwq = &wq;
  return(Result::success);
}

Result WriteEvent::generateUserHeader(WorkQueue& wq, Action::Id next_in_flow)
//
// Purpose: Remember pointer to work queue.
//
// Arguments: wq - Work queue.
//
// Returns: Success.
//
{
  // Remember address of work queue.

  _pwq = &wq;
  return(Result::success);
}

Result WriteEvent::addDataToEvent(Event& event)
//
// Purpose: Do nothing and return failure.
//
// Arguments: event - Event
//
// Returns: Success/fail.
//
{
  return Result::failure;
}

string WriteEvent::location()
//
// Purpose: Return the current run/event.
//
// Returns: String containing run and event number.
//
{
  return string();
}

Result WriteEvent::jobSummary()
{
  file_close();
  out() << "Total events written: " << _num_written << endl;
  return(Result::success);
}

void WriteEvent::file_open()
//
// Purpose: Open the current _output_file.
//
{
  static d0StreamFactory* factory = 0;
  if(!factory)
    factory = d0StreamFactory::locateStreamFactory();

  // Reinitialize the output file name if this is the first file we are
  // opening.  This is necessary to properly handle certain cases of nested
  // deferred expanders.
  if(_num_files == 0)
    _output_file.start();
  ++_num_files;
  assert(_stream == 0);
  if(!_more_files) {
    ERRLOGTO(log(), ELfatal, "Open failure")
      << '\n'
      << "No more files to open" << '\n'
      << endmsg;
    abort();
  }

  // Print message.

  out() << "WriteEvent: Opening " << _output_file.str() << endl;

  // Zero statistics.

  _num_written_file = 0;
  _first_written_file = CollisionID();
  _last_written_file = CollisionID();
  _input_files_remaining = _input_files_per_file;

  // Open file.

  _stream = factory->make_d0Stream(_output_file, _output_format.c_str(),
                                   _mode, _optstr);
  if(!_stream) {
    update_status(false, true);
    ERRLOGTO(log(), ELfatal, "Open failure")
      << '\n'
      << "Failed to open stream " << string(_output_file)
      << " using format " << _output_format << '\n'
      << endmsg;
  }

  // Install the output filter(s), if any.

  if(_output_filter) {
    ChunkOutputFilter* pof1 = 0;
    d0_Output_Filter* pof2 = 0;

    // Add write chunks to filter.

    if(_write_chunk_list.size() > 0) {
      pof1 = new ChunkOutputFilter;
      for(std::list<std::string>::const_iterator i = _write_chunk_list.begin();
          i != _write_chunk_list.end(); ++i) {
        pof1->add_write_chunk((*i).c_str());
      }
    }

    // Add write classes to filter.

    if(_write_list.size() > 0) {
      pof2 = new d0_Output_Filter;
      for(std::list<std::string>::const_iterator i = _write_list.begin();
          i != _write_list.end(); ++i) {
        pof2->add_write_class((*i).c_str());
      }
    }

    // Add veto classes to filter.

    if(_veto_list.size() > 0) {
      if(!pof2)
        pof2 = new d0_Output_Filter;
      for(std::list<std::string>::const_iterator i = _veto_list.begin();
          i != _veto_list.end(); ++i) {
        pof2->add_veto_class((*i).c_str());
      }
    }

    // Add filter(s) to stream.  If both a chunk filter and a class filter
    // were built, daisy-chain the class filter behind the chunk filter.

    if(pof1 != 0 && pof2 != 0) {
      pof1->daisy_chain(pof2);
      pof2 = 0;
    }
    if(pof1 != 0)
      _stream->set_output_filter(pof1);
    if(pof2 != 0)
      _stream->set_output_filter(pof2);
  }

  // Push fileOpen event onto framework work queue.

  assert(_pwq != 0);
  Ref<FileInfo> file_info(new FileInfo(_output_file.str(),
                                       FileInfo::OPEN,
                                       FileInfo::WRITE));
  pushQueueFront(*_pwq, _file_open_id, file_info);

  // Update status.

  update_status(true, false);
}

void WriteEvent::file_close()
//
// Purpose: Close the currently open file and print file statistics.
//
{
  if(_stream) {
    out() << "WriteEvent: Closing " << _output_file.str()
          << "\nFirst event written: " << _first_written_file
          << "\nLast event written: " << _last_written_file
          << "\nEvents written: " << _num_written_file << endl;
    _stream->close();
    delete _stream;
    _stream = 0;

    // Push fileClose event onto framework work queue.
    // _pwq will be zero if this method is called during the JobSummary hook.

    if(_pwq != 0) {
      Ref<FileInfo> file_info(new FileInfo(_output_file.str(),
                                           FileInfo::CLOSE,
                                           FileInfo::WRITE,
                                           _num_written_file,
                                           _first_written_file,
                                           _last_written_file));
      pushQueueFront(*_pwq, _file_close_id, file_info);
    }

    // Update status.

    update_status(false, false);

    // Advance to the next output file.

    _more_files = !_output_file.next();
  }
}

Result WriteEvent::fileOpen(const FileInfo& fileinfo)
//
// Purpose: Called when a file is opened for reading.
//
// Arguments: fileinfo - File data.
//
// Returns: Success = true.
//
{
  if(fileinfo.access() == FileInfo::READ) {
    assert(fileinfo.event_type() == FileInfo::OPEN);
    if(_sync && _input_files_remaining == 0)
      file_open();
  }
  return Result::success;
}

Result WriteEvent::fileClose(const FileInfo& fileinfo)
//
// Purpose: Called when a file is closed for reading.
//
// Arguments: fileinfo - File data.
//
// Returns: Success = true.
//
{
  if(fileinfo.access() == FileInfo::READ) {
    assert(fileinfo.event_type() == FileInfo::CLOSE);
    if(_sync && --_input_files_remaining == 0)
      file_close();
  }
  return Result::success;
}

void WriteEvent::update_status(bool open, bool error)
//
// Purpose: Update status file.
//
// Arguments: open  - Is data file open?
//            error - Did the data file get an i/o error?
//
{
  if(_status_file.size() > 0 && _status_level > 0) {
    IOstatus status(_status_file, log());
    status.set_output_file(_output_file.str());
    status.set_first_event(_first_written_file);
    status.set_last_event(_last_written_file);
    if(_status_level > 1)
      status.add_event(_last_written_file);
    status.set_total_events(_num_written_file);
    status.set_open_flag(open);
    status.set_error_flag(error);
    status.update();
  }
}

bool WriteEvent::filter_hack (const edm::Event& event)
//
// Purpose: Look at the chunks in EVENT.
//          Create a new ChunkOutputFilter passing only those chunks.
//          Install the new output filter, with any existing
//          one daisy-chained.
//
// Inputs:
//   event - The event to process.
//
// Returns:
//   True if we installed a new filter.
//   False if we failed to do so.
//
{
  // Get the old filter, if any.

  d0_Output_Filter_Base* old_of = 0;
  if (!_stream->release_output_filter (old_of)) {
    // The stream doesn't implement release_output_filter.
    // Give up.
    return false;
  }

  // Make the new filter.

  ChunkOutputFilter* new_of = new ChunkOutputFilter;

  // Get the list of chunks in the event.
  // Be careful not to dereference them.
  // (The handle element type below was lost in extraction; edm::AbsChunk is
  //  an assumption.)

  std::vector<THandle<edm::AbsChunk> > chunks;
  event.findAll (chunks);

  // Add these chunks to the filter.

  for (int i=0; i < chunks.size(); i++) {
    if (chunks[i])
      new_of->add_write_chunk (chunks[i].d0om_type()->name());
  }

  // Install the new filter, with the old one daisy-chained.

  new_of->daisy_chain (old_of);
  _stream->set_output_filter (new_of);

  // Done!
  return true;
}
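
// ---------------------------------------------------------------------------
// Quick reference (added for convenience): the rcp parameters and command
// line options this package recognizes, as read in the WriteEvent constructor
// above.  The parameter names come from this file; the defaults listed are
// those implied by the constructor and should be checked against the
// package's .rcp documentation before being relied on.
//
//   Read unconditionally (missing parameters raise XRCPNotFound):
//     OutputFile        (string list) - output file name(s)
//     OutputFormat      (string)      - d0om back end, e.g. "DSPACK", "EVPACK"
//     CompressionLevel  (int)
//     ApplicationName   (string)
//     UseOutputFilter   (bool)
//
//   Optional, with defaults:
//     WriteUserHeader   (bool,   default true)
//     StatusFile        (string, default none - status file disabled)
//     StatusLevel       (int,    default 1)
//     CopyUnknown       (bool,   default unset - d0om default behavior)
//     MaxEventsPerFile  (int,    default 0 = no per-file limit)
//     Synchronize       (bool,   default false)
//     InputFilesPerFile (int,    default 1)
//     EmbedDefs         (bool,   default false)
//     RemoteCopyCommand (string, default "rcp")
//     StagingDirectory  (string, default ".")
//     WriteChunkList, WriteClassList, VetoClassList
//                       (string lists, used only when UseOutputFilter is true)
//     WriteEventTags    (string list, default "*")
//     VetoEventTags     (string list, default empty)
//     WriteChunkTags    (string list, default "*")
//
//   Command line overrides: -output_file and -max_per_file (honored by the
//   first package instance only), -stager (remote copy command, honored by
//   every instance).
// ---------------------------------------------------------------------------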