Main Page | Modules | Namespace List | Class Hierarchy | Alphabetical List | Class List | File List | Namespace Members | Class Members | File Members | Related Pages

RotoServer.cxx

Go to the documentation of this file.
00001 
00002 // $Id: RotoServer.cxx,v 1.36 2005/09/07 19:41:58 rhatcher Exp $
00003 //
00004 // Communications and Control of Roto-rooter for receiving raw blocks,
00005 // turning them into objects and writing them to files
00006 //
00007 // rhatcher@fnal.gov
00009 
00010 #include "Rotorooter/RotoServer.h"
00011 #include "Rotorooter/RotoSocket.h"
00012 #include "Rotorooter/RotoObjectifier.h"
00013 
00014 #include "OnlineUtil/msgLogLib/msgLog.h"
00015 
00016 #include "TSystem.h"
00017 #include "TServerSocket.h"
00018 
00019 // for function: char* Form(const char *fmt, ...)
00020 #include "TString.h"
00021 #include "TSystem.h"
00022 
00023 #include "RawData/RawRecord.h"
00024 #include "RawData/RawDaqHeader.h"
00025 #include "RawData/RawBeamMonHeader.h"
00026 
00027 #include "RawData/RawLIAdcSummaryBlock.h"
00028 #include "RawData/RawLITimingSummaryBlock.h"
00029 #include "RawData/RawSnarlHeaderBlock.h"
00030 #include "RawData/RawDigitDataBlock.h"
00031 
00032 #include "RawData/RawDcsAlarmBlock.h"
00033 #include "RawData/RawDcsMonitorBlock.h"
00034 #include "RawData/RawBeamMonHeaderBlock.h"
00035 
00036 #include "Persistency/PerOutputStream.h"
00037 #include "Persistency/PerOutputStreamManager.h"
00038 
00039 const UInt_t dbg_ReportCmd    = 0x0001;
00040 const UInt_t dbg_ReportReply  = 0x0002;
00041 const UInt_t dbg_ReportHex    = 0x0004;
00042 const UInt_t dbg_ReportNotTo  = 0x0008;
00043 
00044 UInt_t RotoServer::fgDebugFlags = 0;
00045 //  dbg_ReportCmd | dbg_ReportReply | dbg_ReportNotTo;
00046 
00047 static string reportOpt = 
00048   (RotoServer::GetDebugFlags() & dbg_ReportHex) ? "b" : " ";
00049 
00050 #include "MinosObjectMap/MomNavigator.h"
00051 #include "MessageService/MsgService.h"
00052 
00053 ClassImp(RotoServer)
00054 CVSID("$Id: RotoServer.cxx,v 1.36 2005/09/07 19:41:58 rhatcher Exp $");
00055 
// Failure bits returned by the RotoServer command handling methods.
// Each failure has its own bit so that several can be OR'ed into one
// status word; kSuccess (no bits set) means everything went fine.
enum RotoFailureMode {
   kSuccess         = 0,
   kFailRecvCmd     = 1 <<  0,   // 0x00000001
   kFailRecvByteCnt = 1 <<  1,   // 0x00000002
   kFailRecvData    = 1 <<  2,   // 0x00000004
   kFailBuffResize  = 1 <<  3,   // 0x00000008
   kFailNotToMe     = 1 <<  4,   // 0x00000010
   kFailCmdBadFrom  = 1 <<  5,   // 0x00000020
   kFailCmdUnknown  = 1 <<  6,   // 0x00000040
   kFailReqUnknown  = 1 <<  7,   // 0x00000080
   kFailBadRcType   = 1 <<  8,   // 0x00000100
   kFailOpenFile    = 1 << 12,   // 0x00001000
   kFailCloseFile   = 1 << 16,   // 0x00010000
   kFailRecBuffer   = 1 << 20,   // 0x00100000
   kFailInflateBuff = 1 << 21,   // 0x00200000
   kFailPartialRec  = 1 << 22,   // 0x00400000
   kFailWriteRec    = 1 << 24,   // 0x01000000

   kFailNoCodeYet   = 1 << 30,   // 0x40000000
   kFailWhoops      = 1u << 31   // 0x80000000
};
00077 
00078 //_____________________________________________________________________________
00079 RotoServer::RotoServer(Int_t port, Int_t nwords, Bool_t allow_overwrite, 
00080                        Int_t tcp_nodelay_flag)
00081   : fPort(port), fTCP_NODELAY_flag(tcp_nodelay_flag),
00082     fFlatBinaryName(""), fFlatBinaryOutputFile(0), fSymlink("")
00083 {
00084 //
00085 //  Purpose:  Create a RotoServer
00086 //            Open the server socket looking for connections on the port
00087 //            Allocate buffer to receive data
00088 //
00089 //  Argument: port     TCP/IP port #
00090 //            nwords   number of words for initial data recv buffer
00091 //            allow_overwrite   all server to overwrite output files
00092 //            tcp_nodelay_flag  set socket to use TCP_NODELAY
00093 //
00094 //  Return:   (none)
00095 //
00096 //  Contact:  R. Hatcher
00097 //
00098 
00099    Init(nwords, allow_overwrite);
00100 
00101    fServerSocket = new TServerSocket(port, kTRUE);
00102    fServerSocket->SetOption(kNoDelay,fTCP_NODELAY_flag);
00103 }
00104 
00105 //_____________________________________________________________________________
00106 RotoServer::RotoServer(const char* fileName, Int_t nwords, 
00107                        Bool_t allow_overwrite)
00108    : fFlatBinaryName(""), fFlatBinaryOutputFile(0), fSymlink("")
00109 {
00110 //
00111 //  Purpose:  Create a RotoServer
00112 //            Open the RotoSocket (file emulation of a TSocket)
00113 //            Allocate buffer to receive data
00114 //
00115 //  Argument: fileName File name for RotoSocket.
00116 //            nwords   number of words for initial data recv buffer
00117 //            allow_overwrite kTRUE if allowed to overwite output file.
00118 //
00119 //  Return:   (none)
00120 //
00121 //  Contact:  R. Hatcher
00122 //
00123 
00124    Init(nwords, allow_overwrite);
00125 
00126    fSocket = new RotoSocket(fileName);
00127 
00128 }
00129 //_____________________________________________________________________________
00130 RotoServer::~RotoServer() 
00131 {
00132 //
00133 //  Purpose:  Destroy a RotoServer
00134 //            Close the server socket looking for connections on the port
00135 //            Close any open sockets
00136 //            Deallocate buffer to receive data
00137 //
00138 //  Argument: (none)
00139 //
00140 //  Return:   (none)
00141 //
00142 //  Contact:  R. Hatcher
00143 //
00144 
00145    fCurrentState = MINOS_ROOTER_STATE_STOPPING;
00146 
00147    if (fServerSocket) {
00148       fServerSocket->Close();
00149       delete fServerSocket;
00150       fServerSocket = 0;
00151    }
00152 
00153    CloseAndDeleteSocket(fSocket);
00154 
00155    if (fOutputStreamManager) {
00156       // make sure *everything* has been flushed
00157       fOutputStreamManager->Write();
00158       fOutputStreamManager->CloseStream();
00159       //CloseStream() makes this redundant: fOutputStreamManager->CloseFile();
00160      
00161       delete fOutputStreamManager;
00162       fOutputStreamManager = 0;
00163    }   
00164 
00165    SetFlatBinaryOutputFile("");
00166 
00167    fCurrentState = MINOS_ROOTER_STATE_UNKNOWN;
00168 
00169    logNotice("~RotoServer");
00170 }
00171 
00172 //_____________________________________________________________________________
00173 void RotoServer::CloseAndDeleteSocket(TSocket*& socket)
00174 {
00175 //
00176 //  Purpose:  Close and delete socket; zero pointer
00177 //
00178 //  Argument: socket   reference to pointer to socket      
00179 //
00180 //  Return:   (none)
00181 //
00182 //  Contact:  R. Hatcher
00183 //
00184    if (socket) {
00185       if (socket->IsValid()) socket->Close();
00186       delete socket;
00187       socket = 0;
00188    }
00189 }
00190 
00191 //_____________________________________________________________________________
00192 void RotoServer::Init(Int_t nwords, Bool_t allow_overwrite)
00193 {
00194 //
00195 //  Purpose:  Common initialisation of a RotoServer
00196 //            Allocate buffer to receive data
00197 //
00198 //  Argument: nwords          - number of words for initial data recv buffer
00199 //            allow_overwrite - allow any existing file to be overwritten
00200 //
00201 //  Return:   (none)
00202 //
00203 //  Contact:  R. Hatcher
00204 //
00205 
00206    fServerSocket         = 0;
00207    fSocket               = 0;
00208    fCurrentState         = MINOS_ROOTER_STATE_UNKNOWN;
00209    fNackMessage          = ""; 
00210    fStateReport          = "";
00211    fAccessMode           = allow_overwrite ? Per::kRecreate : Per::kNew;
00212    fOutputStreamManager  = 0;
00213    fFakeDAQFileName      = "";
00214 
00215    ResizeBuffer(fRecvBuffer,nwords*sizeof(Int_t)/sizeof(Char_t),"Recv");
00216    ResizeBuffer(fReplyBuffer,4096*sizeof(Char_t),"Reply");
00217 
00218    fOutputStreamManager = new PerOutputStreamManager();
00219 
00220    fCurrentState = MINOS_ROOTER_STATE_UNCONNECTED;
00221    RotoObjectifier::SysLogRawBlockRegistry();
00222 
00223    logInfo("Default RotoServer configuration");
00224 
00225    // Set default AutoSave parameters
00226    // all streams: 1000 records or 10 seconds
00227    SetAutoSaveConfig("*",1000,10);
00228    // Set default Compression to use file defaults
00229    SetCompressConfig("*",-1);
00230    // Set default BasketSize to a uniform 64000
00231    SetBasketSizeConfig("*",64000);
00232 
00233 }
00234 
00235 //_____________________________________________________________________________
00236 Int_t RotoServer::SetAutoSaveConfig(const string& stream, UInt_t nrec, UInt_t nsec) 
00237 {
00238 //
00239 //  Purpose:  Set AutoSave configuration values in <map>s
00240 //
00241 //  Argument: stream    - name of stream ("*" = all)
00242 //            nrec      - record frequency
00243 //            nsec      - max time duration
00244 //
00245 //  Return:   number of stream configs modified
00246 //
00247 //  Contact:  R. Hatcher
00248 //
00249 
00250    int nmod = 0;
00251 
00252    int ibeg = Per::kDaqSnarl;
00253    int iend = Per::kBeamMon;
00254    for (int i = ibeg; i<=iend; i++ ) {
00255       Per::EStreamType stype = (Per::EStreamType) i;
00256       string treeName = Per::AsString(stype);
00257       if (treeName == stream || stream == "*") {
00258          logInfo("AutoSave is %d records or %d seconds for %s stream",
00259                  nrec,nsec,treeName.c_str());
00260          fAutoSaveIntMap[stype]  = nrec;
00261          fAutoSaveTimeMap[stype] = nsec;
00262          nmod++;
00263       }
00264    }
00265 
00266    return nmod;
00267 }
00268 
00269 //_____________________________________________________________________________
00270 Int_t RotoServer::SetCompressConfig(const string& stream, Int_t level)
00271 {
00272 //
00273 //  Purpose:  Set Compression configuration values in <map>s
00274 //
00275 //  Argument: stream    - name of stream ("*" = all)
00276 //            level     - compression level
00277 //                          -1 : use file default
00278 //                           0 : no compression
00279 //                           1 : default ROOT compression
00280 //                        ...9 : maximum compression
00281 //
00282 //  Return:   number of stream configs modified
00283 //
00284 //  Contact:  R. Hatcher
00285 //
00286 
00287    int nmod = 0;
00288 
00289    int ibeg = Per::kDaqSnarl;
00290    int iend = Per::kBeamMon;
00291    for (int i = ibeg; i<=iend; i++ ) {
00292       Per::EStreamType stype = (Per::EStreamType) i;
00293       string treeName = Per::AsString(stype);
00294       if (treeName == stream || stream == "*") {
00295          logInfo("Compression level is %d for %s stream",
00296                  level,treeName.c_str());
00297          fCompressionMap[stype]  = level;
00298          nmod++;
00299       }
00300    }
00301 
00302    return nmod;
00303 }
00304 
00305 //_____________________________________________________________________________
00306 Int_t RotoServer::SetBasketSizeConfig(const string& stream, UInt_t basketsize)
00307 {
00308 //
00309 //  Purpose:  Set BasketSize configuration values in <map>s
00310 //
00311 //  Argument: stream     - name of stream ("*" = all)
00312 //            basketsize - size of basket
00313 //
00314 //  Return:   number of stream configs modified
00315 //
00316 //  Contact:  R. Hatcher
00317 //
00318 
00319    int nmod = 0;
00320 
00321    int ibeg = Per::kDaqSnarl;
00322    int iend = Per::kBeamMon;
00323    for (int i = ibeg; i<=iend; i++ ) {
00324       Per::EStreamType stype = (Per::EStreamType) i;
00325       string treeName = Per::AsString(stype);
00326       if (treeName == stream || stream == "*") {
00327          logInfo("BasketSize is %d for %s stream",
00328                  basketsize,treeName.c_str());
00329          fBasketSizeMap[stype]  = basketsize;
00330          nmod++;
00331       }
00332    }
00333 
00334    return nmod;
00335 }
00336 
00337 //_____________________________________________________________________________
00338 Int_t RotoServer::SetFlatBinaryOutputFile(const string& filename)
00339 {
00340 //
00341 //  Purpose:  Set name of flat binary output file
00342 //            Open file if necessary
00343 //
00344 //  Argument: filename   - name of file
00345 //
00346 //  Return:   success
00347 //
00348 //  Contact:  R. Hatcher
00349 //
00350 
00351    // if the filename is unchanged do nothing
00352    if (filename == fFlatBinaryName) return 1;
00353 
00354    if (fFlatBinaryOutputFile) {
00355      // close currently open file
00356      logNotice("close flat binary output file: %s",fFlatBinaryName.c_str());
00357      fclose(fFlatBinaryOutputFile);
00358      fFlatBinaryOutputFile = 0;
00359      fFlatBinaryName = "";
00360    }
00361 
00362    if (filename != "") {
00363      // open the requested file
00364      logNotice("open flat binary output file: %s",filename.c_str());
00365      fFlatBinaryOutputFile = fopen(filename.c_str(),"wb");
00366      fFlatBinaryName = filename;
00367    }
00368 
00369    return 1;
00370 }
00371 
00372 //_____________________________________________________________________________
00373 Int_t RotoServer::SetSymlink(const char* symlink_name)
00374 {
00375     fSymlink = "";
00376     if (symlink_name) fSymlink = symlink_name;
00377     return 1;
00378 }
00379 
00380 
00381 //_____________________________________________________________________________
00382 void RotoServer::Run() 
00383 {
00384 //
00385 //  Purpose:  Main loop
00386 //            Test validity of server socket, wait for connection attempt
00387 //            accept commands on open socket, process and reply
00388 //
00389 //  Argument: (none)
00390 //
00391 //  Return:   (none)
00392 //
00393 //  Contact:  R. Hatcher
00394 //
00395 
00396    //
00397    // is this server socket okay? If missing the socket must be 
00398    // a valid RotoSocket.
00399    if (!fServerSocket ) { 
00400      if ( ! fSocket->IsValid() ) {
00401         logCritical("Unable to open file as RotoSocket");
00402         return;
00403      }
00404    }
00405    else if (!fServerSocket->IsValid()) {
00406       string why;
00407       switch (fServerSocket->GetErrorCode()) {
00408       case  0: why = "no error, socket should be valid"; break;
00409       case -1: why = "low level socket() call failed";   break;
00410       case -2: why = "low level bind() call failed - ";
00411                why += "port may already be in use";
00412                break;
00413       case -3: why = "low level listen() call failed";   break;
00414       default: why = "unknown error";                    break;
00415       }
00416       // MSG("Roto",Msg::kFatal) 
00417       //   << endl
00418       //   << "  server socket failed on port "
00419       //   << fServerSocket->GetLocalPort()
00420       //   << " was invalid \n  "
00421       //   << why
00422       //   << endl;
00423       logCritical("server socket on port %d failed, why=%s",
00424                   fServerSocket->GetLocalPort(),why.c_str());
00425       return;
00426    }
00427 
00428    fCurrentState = MINOS_ROOTER_STATE_UNCONNECTED;
00429    Int_t ncmd = 0;
00430 
00431    RotoRcCmd cmd;
00432 
00433    //
00434    // main loop, while shutdown hasn't been requested
00435    //
00436    while (fCurrentState != MINOS_ROOTER_STATE_SHUTDOWN_REQ) {
00437 
00438       // Accept a connection and return a full-duplex communication socket.
00439       if (!fSocket) {
00440          fSocket = fServerSocket->Accept();
00441          fSocket->SetOption(kNoDelay,fTCP_NODELAY_flag);
00442          //fSocket->SetOption(kKeepAlive,1);
00443          if (fCurrentState == MINOS_ROOTER_STATE_UNCONNECTED)
00444              fCurrentState = MINOS_ROOTER_STATE_CONNECTED;
00445       }
00446 
00447       Int_t status = kSuccess;
00448       fNackMessage = "";
00449 
00450       // accept commands on open socket
00451       
00452       status |= RecvCommand(fSocket,cmd);
00453       // failure to get command+data correctly is indicative
00454       // of a broken socket
00455       if (status & (kFailRecvCmd|kFailRecvByteCnt|kFailRecvData) ) {
00456          logError("socket from '%s' abruptly broken",
00457                   fSocket->GetInetAddress().GetHostName());
00458 
00459          // if (fgDebugFlags & dbg_ReportCmd) 
00460          //    MSG("Roto", Msg::kInfo) 
00461          //      << "RotoServer::Run socket appears broken"  << endl;
00462 
00463          // should here cleanly close all files associated with
00464          // this socket, for now (since we're single socketed)
00465          // just close *everything*
00466          fOutputStreamManager->Write();
00467          fOutputStreamManager->CloseStream();
00468          //CloseStream() makes this redundant: fOutputStreamManager->CloseFile();
00469          
00470          CloseAndDeleteSocket(fSocket);
00471          fCurrentState = MINOS_ROOTER_STATE_UNCONNECTED;
00472          continue;
00473       }
00474       ncmd++;
00475 
00476       if (fgDebugFlags & dbg_ReportCmd) 
00477          logDebug(3,"msg %d status %d",ncmd,status);
00478 
00479       //if (fgDebugFlags & dbg_ReportCmd) 
00480       //   MSG("Roto", Msg::kInfo) 
00481       //      << "RotoServer::Run msg#" 
00482       //      << ncmd << " status=" << status
00483       //      << " " << cmd.AsStlString(reportOpt)
00484       //      << endl;
00485 
00486       status |= CheckTo(cmd);
00487       if (status & kFailNotToMe) continue;
00488 
00489       status |= ProcessCommand(fSocket,cmd);
00490 
00491       ReplyToCommand(fSocket,cmd,status);
00492 
00493       if (fCurrentState == MINOS_ROOTER_STATE_DISCONNECTING) {
00494          CloseAndDeleteSocket(fSocket);
00495          fCurrentState = MINOS_ROOTER_STATE_UNCONNECTED;
00496       }
00497    }
00498 }
00499 
00500 //_____________________________________________________________________________
00501 void RotoServer::ResizeBuffer(TArray& buffer, Int_t size, const Char_t* name) 
00502 {
00503 //
00504 //  Purpose:  Attempt to resize receive/reply buffer size
00505 //
00506 //  Argument: buffer   TArray to be resized
00507 //            size     desire size in nominal units
00508 //            name     name for log messages
00509 //
00510 //  Return:   (none)
00511 //
00512 //  Contact:  R. Hatcher
00513 //
00514 
00515    int oldsize = buffer.GetSize();
00516    buffer.Set(0);  // toss old array so we don't copy contents
00517 
00518    Int_t request = size;
00519    Int_t failures = 0;
00520    while ( buffer.GetSize() != request ) {
00521       buffer.Set(request);
00522       if (buffer.GetSize() == request) break; // success!
00523       failures++;
00524       request -= 1024*sizeof(Int_t);  // failed, try smaller size
00525    }
00526 
00527    if (failures) {
00528          logError("resize %s buffer from %d to %d only got %d",
00529                   name,oldsize,size,request);
00530    } else {
00531          logNotice("resize %s buffer from %d to %d",
00532                    name,oldsize,size);
00533    }
00534 }
00535 
00536 //_____________________________________________________________________________
00537 Int_t RotoServer::RecvCommand(TSocket* socket, RotoRcCmd& command)
00538 {
00539 //
00540 //  Purpose:  Receive RunControl-protocol-like commands from socket
00541 //
00542 //  Argument: socket   ptr to socket on which command is to be read
00543 //            command  RunControl encoded command
00544 //
00545 //  Return:   kSuccess (0)     = success
00546 //            kFailRecvCmd     = error retrieving 4-byte command
00547 //            kFailRecvByteCnt = error retrieving followup byte count
00548 //            kFailRecvData    = error retrieving followup data
00549 //            kFailBuffResize  = could not resize buffer to hold entire record
00550 //                               RecvBuffer holds only partial record
00551 //
00552 //  Contact:  R. Hatcher
00553 //
00554    Int_t  lCommand, ltouse, lBuffer;
00555 
00556    lCommand = fRecvBufferUsed = ltouse = lBuffer = 0;
00557 
00558    //
00559    // Retrieve "command" (4bytes of packed info)
00560    //
00561    lCommand = socket->RecvRaw(&command.fEncoded,sizeof(Int_t));
00562    if (lCommand != sizeof(Int_t)) {
00563       logError("RecvRaw command got %d expect %d",
00564                lCommand,sizeof(Int_t));
00565       return kFailRecvCmd;
00566    }
00567    //
00568    // if DATATOFOLLOW bit is set get the size and the data
00569    //
00570    if (command.HasDataToFollow()) {
00571       // byte count includes its own 4 bytes
00572       Int_t extra_bytes; 
00573       ltouse = socket->RecvRaw(&extra_bytes,sizeof(Int_t));
00574       fRecvBufferUsed = extra_bytes - 4;
00575       if (ltouse != sizeof(Int_t)) {
00576       logError("RecvRaw nbytes got %d expect %d",
00577                ltouse,sizeof(Int_t));
00578          return kFailRecvByteCnt;
00579       }
00580       if (fRecvBufferUsed>0) {
00581          // there be data in dat' d'er socket
00582 
00583          // if the buffer isn't big enough resize it (up only)
00584          if (fRecvBufferUsed>fRecvBuffer.GetSize()) 
00585             ResizeBuffer(fRecvBuffer,fRecvBufferUsed,"Recv");
00586 
00587          Bool_t partial = false;
00588          if (fRecvBufferUsed>fRecvBuffer.GetSize()) {
00589             // couldn't resize buffer large enough
00590             fNackMessage += 
00591                Form("Buffer could only hold %d of %d;",
00592                     fRecvBuffer.GetSize(),fRecvBufferUsed);
00593             partial = true;
00594             fRecvBufferUsed = fRecvBuffer.GetSize();
00595          }
00596 
00597          lBuffer = socket->RecvRaw(fRecvBuffer.GetArray(),fRecvBufferUsed);
00598          if (lBuffer != fRecvBufferUsed) {
00599             logError("RecvRaw buffer got %d expect %d",
00600                      lBuffer,fRecvBufferUsed);
00601             fNackMessage += 
00602                Form("Buffer expected %d, saw %d;",
00603                     fRecvBufferUsed,lBuffer);
00604             return kFailRecvData;
00605          }
00606          if (partial) return kFailBuffResize;
00607 
00608       }
00609    }
00610 
00611    return kSuccess;
00612 }
00613    
00614 //_____________________________________________________________________________
00615 Int_t RotoServer::CheckTo(RotoRcCmd& command) 
00616 {
00617 //
00618 //  Purpose:  Check if this command was directed to the right place
00619 //
00620 //  Argument: command   Run Control command
00621 //
00622 //  Return:   kSuccess      if okay
00623 //            kFailNotToMe  if not intended for RotoRooter
00624 //
00625 //  Contact:  R. Hatcher
00626 //
00627 
00628    int to = command.GetTo();
00629    if (to == MINOS_ROOTER_ROOTER) return kSuccess;
00630 
00631    logError("I am 0x%2.2x, got message for 0x%2.2x (cmd 0x%8.8x)",
00632             MINOS_ROOTER_ROOTER,to,command.fEncoded);
00633 
00634    // if (fgDebugFlags & dbg_ReportNotTo) 
00635    //   MSG("Roto", Msg::kInfo) 
00636    //      << "RotoServer::Run received message meant for 0x" 
00637    //      << hex << to << dec
00638    //      << ",  I am 0x" 
00639    //      << hex << MINOS_ROOTER_ROOTER << dec 
00640    //      << endl;
00641 
00642    return kFailNotToMe;
00643 }
00644 //_____________________________________________________________________________
00645 Int_t RotoServer::ProcessCommand(TSocket* socket, RotoRcCmd command)
00646 {
00647 //
00648 //  Purpose:  Process the command received on socket
00649 //
00650 //  Argument: socket      ptr to socket on which command is to be read
00651 //            command     RunControl encoded command
00652 //
00653 //  Return:   kSuccess (0)  if successful (yields ACK)
00654 //            various       if there was a problem (yields UNABLE_TO)
00655 //
00656 //  Contact:  R. Hatcher
00657 //
00658    int from  = command.GetFrom();
00659    int type  = command.GetType();
00660    int instr = command.GetInstr();
00661 
00662    switch (type) {
00663    case MINOS_ROOTER_COMMAND:
00664       if (MINOS_ROOTER_DCP     != from && 
00665           MINOS_ROOTER_DCS     != from && 
00666           MINOS_ROOTER_BEAMMON != from ) {
00667          logError("MinosEntity 0x%2.2x on %s attempted COMMAND",
00668                   from,socket->GetInetAddress().GetHostName());
00669 
00670          fNackMessage += 
00671             Form("Not from DCP, DCS or BEAMMON but %s (0x%x);",
00672                  RotoRcCmd::ElementAsStlString(from).c_str(),from);
00673          return kFailCmdBadFrom;
00674          break;
00675       }         
00676       switch (instr) {
00677       case MINOS_ROOTER_OPENSOCKET: 
00678          return kSuccess;
00679          break;
00680       case MINOS_ROOTER_CLOSESOCKET: 
00681          // we can't do this immediately here because we need to reply first
00682          fCurrentState = MINOS_ROOTER_STATE_DISCONNECTING;
00683          return kSuccess;
00684          break;
00685       case MINOS_ROOTER_OPENFILE: 
00686          return OpenFile();
00687          break;
00688       case MINOS_ROOTER_CLOSEFILE: 
00689          return CloseFile();
00690          break;
00691       case MINOS_ROOTER_RECBUFFER: 
00692          return ProcessBuffer();
00693          break;
00694       case MINOS_ROOTER_SHUTDOWN: 
00695          logInfo("MinosEntity 0x%2.2x on %s requested shutdown",
00696                  from,socket->GetInetAddress().GetHostName());
00697          // MSG("Roto",Msg::kInfo)
00698          //    << " Roto SHUTDOWN command " << endl;
00699          fCurrentState = MINOS_ROOTER_STATE_SHUTDOWN_REQ;
00700          return kSuccess;
00701          break;
00702       case MINOS_ROOTER_CONFIG:
00703          return ProcessConfig();
00704          break;
00705       default:
00706          logError("MinosEntity 0x%2.2x on %s bad cmd 0x%8.8x",
00707                   from,socket->GetInetAddress().GetHostName(),
00708                   command.fEncoded);
00709 
00710          fNackMessage += 
00711             Form("Unmodelled %s %s;",
00712                  RotoRcCmd::TypeAsStlString(type).c_str(),
00713                  RotoRcCmd::InstrAsStlString(type,instr).c_str());
00714          return kFailCmdUnknown;
00715          break;
00716       }
00717       break;
00718    case MINOS_ROOTER_REQUEST:
00719       switch (instr) {
00720       case MINOS_ROOTER_REQ_CURRENT_STATE: 
00721          return kSuccess;
00722          break;
00723       case MINOS_ROOTER_REQ_STATUS_REPORT: 
00724          BuildStateReport();
00725          return kSuccess;
00726          break;
00727       default:
00728          logError("MinosEntity 0x%2.2x on %s bad cmd 0x%8.8x",
00729                 from,socket->GetInetAddress().GetHostName(),
00730                 command.fEncoded);
00731 
00732          fNackMessage += 
00733             Form("Unmodelled %s %s;",
00734                  RotoRcCmd::TypeAsStlString(type).c_str(),
00735                  RotoRcCmd::InstrAsStlString(type,instr).c_str());
00736          return kFailReqUnknown;
00737          break;
00738       }
00739       break;
00740    default:
00741          logError("MinosEntity 0x%2.2x on %s bad cmd 0x%8.8x",
00742                   from,socket->GetInetAddress().GetHostName(),
00743                   command.fEncoded);
00744 
00745          fNackMessage += 
00746             Form("Unmodelled %s %s;",
00747                  RotoRcCmd::TypeAsStlString(type).c_str(),
00748                  RotoRcCmd::InstrAsStlString(type,instr).c_str());
00749       return kFailBadRcType;
00750       break;
00751    }
00752    return kFailWhoops;
00753 }
00754 
00755 //_____________________________________________________________________________
00756 void RotoServer::ReplyToCommand(TSocket* socket, RotoRcCmd command,
00757                                 Int_t status)
00758 {
00759 //
00760 //  Purpose:  Reply to the command received on socket
00761 //            Special commands send more than just ack/nack
00762 //
00763 //  Argument: socket   ptr to socket on which command is to be read
00764 //            command  RunControl encoded command
00765 //            ack      whether to send ACKNOWLEDGE or UNABLE_TO
00766 //
00767 //  Return:   (none)
00768 //
00769 //  Contact:  R. Hatcher
00770 //
00771    static Int_t nreply = 0;
00772    nreply++;
00773 
00774    Int_t type  = command.GetType();
00775    Int_t instr = command.GetInstr();
00776    Bool_t ack  = ( kSuccess == status );
00777 
00778    RotoRcCmd reply = command;
00779    reply.SwapFromTo();
00780 
00781    switch (type) {
00782    case MINOS_ROOTER_COMMAND: {
00783       //
00784       // send ACK/UNABLE_TO for COMMAND
00785       //
00786 
00787       // construct the entirety of what we're sending first
00788       // rather than sending the individual bits
00789       // combine ACK with STATE_REPORT into single socket->SendRaw
00790 
00791       Int_t  ackfield   = 
00792          (ack) ? MINOS_ROOTER_ACKNOWLEDGE : MINOS_ROOTER_UNABLE_TO;
00793       Bool_t nackmsg    = ( ! ack && fNackMessage.length()>0);
00794 
00795       int totmsgsize = sizeof(Int_t);
00796       if (nackmsg) totmsgsize += sizeof(Int_t) + fNackMessage.length();
00797       totmsgsize += sizeof(Int_t);
00798       if (totmsgsize>fReplyBuffer.GetSize())
00799          ResizeBuffer(fReplyBuffer,totmsgsize,"Reply");
00800       Char_t* p = fReplyBuffer.GetArray();
00801 
00802       reply.SetType(ackfield);
00803       reply.SetDataToFollow(nackmsg);
00804 //      socket->SendRaw(&reply.fEncoded,sizeof(Int_t));
00805       memcpy(p,&reply.fEncoded,sizeof(Int_t));
00806       p += sizeof(Int_t);
00807 
00808       if (!ack) {
00809          logError("sent NACK 0x%8.8x to MinosEntity 0x%2.2x on %s",
00810                   reply.fEncoded,reply.GetTo(),
00811                   socket->GetInetAddress().GetHostName());
00812          logError(reply.AsStlString().c_str());
00813          if (fNackMessage.length()>0) 
00814             logError("  %s",fNackMessage.c_str());
00815       }
00816 #ifdef VERBOSE_SYSLOG
00817       else {
00818 //rwh: very verbose
00819          logInfo("ACK to %s",command.AsStlString().c_str());
00820       }
00821 #endif
00822 
00823       if (nackmsg) {
00824          // byte count includes its own 4 bytes
00825          Int_t nackmsg_length = fNackMessage.length() + 4;
00826 //         socket->SendRaw(&nackmsg_length,sizeof(Int_t));
00827 //         socket->SendRaw(fNackMessage.c_str(),nackmsg_length-4);
00828          memcpy(p,&nackmsg_length,sizeof(Int_t));
00829          p += sizeof(Int_t);
00830          memcpy(p,fNackMessage.c_str(),nackmsg_length-4);
00831          p += nackmsg_length-4;
00832       }
00833       //
00834       // protocol says COMMAND also gets a STATE_REPORT reply
00835       //
00836       reply.SetType(MINOS_ROOTER_STATE_REPORT);
00837       reply.SetInstr(fCurrentState);
00838       reply.SetDataToFollow(false);
00839 //      socket->SendRaw(&reply.fEncoded,sizeof(Int_t));
00840       memcpy(p,&reply.fEncoded,sizeof(Int_t));
00841       p += sizeof(Int_t);
00842       socket->SendRaw(fReplyBuffer.GetArray(),totmsgsize);                 
00843         
00844       //if (fgDebugFlags & dbg_ReportReply) {
00845       //   string status_msg = "";
00846       //   if (status != 0) status_msg += Form("\n  status 0x%x",status);
00847       //   MSG("Roto", Msg::kInfo) 
00848       //      << "RotoServer::ReplyToCommand reply#" 
00849       //      << nreply 
00850       //      << " " << reply.AsStlString(reportOpt)
00851       //      << status_msg
00852       //      << endl;
00853 
00854       break;
00855    }
00856    case MINOS_ROOTER_REQUEST:
00857       switch (instr) {
00858       case MINOS_ROOTER_REQ_CURRENT_STATE:
00859          reply.SetType(MINOS_ROOTER_STATE_REPORT);
00860          reply.SetInstr(fCurrentState);
00861          reply.SetDataToFollow(false);
00862          socket->SendRaw(&reply.fEncoded,sizeof(Int_t));
00863          break;
00864       case MINOS_ROOTER_REQ_STATUS_REPORT: {
00865          reply.SetType(MINOS_ROOTER_REPORT);
00866          reply.SetInstr(MINOS_ROOTER_REQ_STATUS_REPORT);
00867          reply.SetDataToFollow(true);
00868          socket->SendRaw(&reply.fEncoded,sizeof(Int_t));
00869          // byte count includes its own 4 bytes
00870          Int_t report_length = fStateReport.length() + 4;
00871          socket->SendRaw(&report_length,sizeof(Int_t));
00872          socket->SendRaw(fStateReport.c_str(),report_length-4);
00873 
00874          //if (fgDebugFlags & dbg_ReportReply) 
00875          //   MSG("Roto", Msg::kInfo) 
00876          //      << "RotoServer::ReplyToCommand reply#" 
00877          //      << nreply 
00878          //      << " " << reply.AsStlString(reportOpt)
00879          //      << endl
00880          //      << "  \"" << fStateReport << "\" (" << report_length << ")"
00881          //      << endl;
00882          break;
00883       }
00884 
00885       default:
00886          break;
00887       }
00888       break;
00889    default:
00890       break;
00891    }      
00892           
00893 }
00894 
00895 //_____________________________________________________________________________
00896 Int_t RotoServer::OpenFile()
00897 {
00898 //
00899 //  Purpose:  Open a file, use buffer to determine parameters.
00900 //            Word 0 is the "from" code: DCP/DCS/BEAMMON pass words
00901 //            1-3 on to the matching OpenXxxFile(); any other value
00902 //            means the bytes after word 0 are a file base name.
00903 //  Argument: (none)
00904 //  Return:   if 0 then file creation was okay
00905 //  Contact:  R. Hatcher
00906 //
00907    int    nwords = fRecvBufferUsed/sizeof(Int_t);
00908    if (nwords<1) {
00909       logError("OpenFile() too few words (%d)",nwords);
00910       return kFailOpenFile;
00911    }
00912    Int_t* ibuffer = (Int_t*) fRecvBuffer.GetArray();
00913    Int_t from = ibuffer[0];
00914
00915    if (fFakeDAQFileName != "") {
00916       fNackMessage += "OpenFile not possible, already in FakeDaqFile mode ;";
00917       logNotice("OpenFile() already in fake mode");
00918       return kFailCmdBadFrom;
00919    }
00920
00921    switch (from) {
00922    case MINOS_ROOTER_DCP:
00923       if (nwords<4) {
00924          logError("OpenFile() from DCP too few words (%d)",nwords);
00925          return kFailOpenFile;
00926       }
00927       return OpenDaqFile(ibuffer[1],ibuffer[2],ibuffer[3]);
00928    case MINOS_ROOTER_DCS:
00929       if (nwords<4) {
00930          logError("OpenFile() from DCS too few words (%d)",nwords);
00931          return kFailOpenFile;
00932       }
00933       return OpenDcsFile(ibuffer[1],ibuffer[2],ibuffer[3]);
00934    case MINOS_ROOTER_BEAMMON:
00935       if (nwords<4) {
00936          logError("OpenFile() from BEAMMON too few words (%d)",nwords);
00937          return kFailOpenFile;
00938       }
00939       return OpenBeamMonFile(ibuffer[1],ibuffer[2],ibuffer[3]);
00940    default: {
00941       // a fake "from" in the buffer is a hack to support binary files
00942       // without proper header blocks: 4 bytes of "fake from" + the name.
00943       // Copy only the bytes actually received, then cut at the first
00944       // NUL, so a missing terminator can't run past the used buffer.
00945       const char* pname = fRecvBuffer.GetArray()+sizeof(Int_t);
00946       string::size_type maxlen = fRecvBufferUsed - sizeof(Int_t);
00947       string fname = string(pname,maxlen).c_str();
00948       if (!fname.empty()) {
00949          Int_t err = OpenFile(fname.c_str(),kDaqFile);
00950          if (!err) fFakeDAQFileName = fname.c_str();
00951          return err;
00952       }
00953       break; // no name given: answer as for an unusable "from"
00954       }
00955    }
00956    fNackMessage += "OpenFile not possible from ";
00957    fNackMessage += RotoRcCmd::ElementAsStlString(from);
00958    fNackMessage += ";";
00959    return kFailCmdBadFrom;
00960 }
00961 
00962 //_____________________________________________________________________________
00963 
00964 Int_t RotoServer::CloseFile()
00965 {
00966 //
00967 //  Purpose:  Close a file, use buffer to determine parameters.
00968 //            Word 0 is the "from" code: DCP/DCS/BEAMMON pass words
00969 //            1-3 on to the matching CloseXxxFile() (refused while in
00970 //            FakeDaqFile mode); any other value means the bytes after
00971 //            word 0 are the base name of the fake DAQ file to close.
00972 //
00973 //  Argument: (none)
00974 //
00975 //  Return:   if 0 then file closure was okay
00976 //
00977 //  Contact:  R. Hatcher
00978 //
00979    int    nwords = fRecvBufferUsed/sizeof(Int_t);
00980    if (nwords<1) {
00981       logError("CloseFile() too few words (%d)",nwords);
00982       return kFailOpenFile;
00983    }
00984    Int_t* ibuffer = (Int_t*) fRecvBuffer.GetArray();
00985    Int_t from = ibuffer[0];
00986
00987    // the three real sources share one refusal while in fake mode
00988    const bool realSource = (MINOS_ROOTER_DCP     == from ||
00989                             MINOS_ROOTER_DCS     == from ||
00990                             MINOS_ROOTER_BEAMMON == from);
00991    if (realSource && fFakeDAQFileName != "") {
00992       fNackMessage += "CloseFile not possible, in FakeDaqFile mode ;";
00993       return kFailCmdBadFrom;
00994    }
00995
00996    switch (from) {
00997    case MINOS_ROOTER_DCP:
00998       if (nwords<4) {
00999          logError("CloseFile() from DCP too few words (%d)",nwords);
01000          return kFailOpenFile;
01001       }
01002       return CloseDaqFile(ibuffer[1],ibuffer[2],ibuffer[3]);
01003    case MINOS_ROOTER_DCS:
01004       if (nwords<4) {
01005          logError("CloseFile() from DCS too few words (%d)",nwords);
01006          return kFailOpenFile;
01007       }
01008       return CloseDcsFile(ibuffer[1],ibuffer[2],ibuffer[3]);
01009    case MINOS_ROOTER_BEAMMON:
01010       if (nwords<4) {
01011          logError("CloseFile() from BEAMMON too few words (%d)",nwords);
01012          return kFailOpenFile;
01013       }
01014       return CloseBeamMonFile(ibuffer[1],ibuffer[2],ibuffer[3]);
01015    default: {
01016       // a fake "from" in the buffer is a hack to support binary files
01017       // without proper header blocks: 4 bytes of "fake from" + the name.
01018       // Copy only the bytes actually received, then cut at the first
01019       // NUL, so a missing terminator can't run past the used buffer.
01020       // (an empty name is passed on unchanged, as it always was)
01021       const char* pname = fRecvBuffer.GetArray()+sizeof(Int_t);
01022       string::size_type maxlen = fRecvBufferUsed - sizeof(Int_t);
01023       string fname = string(pname,maxlen).c_str();
01024       Int_t err = CloseFile(fname.c_str(),kDaqFile);
01025       if (!err) fFakeDAQFileName = "";
01026       return err;
01027       }
01028    }
01029 }
01037 
01038 //_____________________________________________________________________________
01039 string RotoServer::FileExtName(EFileType ftype)
01040 {
01041 //
01042 //  Purpose:  map a file type onto its standard file name extension
01043 //
01044 //  Argument: ftype      enum of file types
01045 //
01046 //  Return:   the extension (no leading "."); a value that is not one
01047 //            of the known types is logged and gets "generic.root"
01048 //
01049 //  Contact:  R. Hatcher
01050 //
01051    const char* ext = 0;
01052    switch (ftype) {
01053    case kDaqFile:        ext = "mdaq.root";    break;
01054    case kDcsFile:        ext = "mdcs.root";    break;
01055    case kBeamMonFile:    ext = "mbeam.root";   break;
01056    case kBogusFile:      ext = "mall.root";    break;
01057    default:
01058       logError("bad FileExtName request %d",(int)ftype);
01059       ext = "generic.root";
01060       break;
01061    }
01062    return string(ext);
01063 }
01065 
01066 //_____________________________________________________________________________
01067 string RotoServer::BogusFileName()
01068 {
01069 //
01070 //  Purpose:  make up the name of the "bogus" file
01071 //
01072 //  Argument: (none)
01073 //
01074 //  Return:   base name of the file that takes records whose real
01075 //            destination couldn't be determined; it is tagged with
01076 //            the host name, fPort and the date/time of fStartTime
01077 //
01078 //  Contact:  R. Hatcher
01079 //
01080    Int_t dateField = fStartTime.GetDate();
01081    if (dateField>19700000) dateField %= 1000000; // keep last six digits
01082    const Int_t timeField = fStartTime.GetTime();
01083    string bogusName = Form("BOGUS_%s_p%d_%6.6d_%6.6d",
01084                            gSystem->HostName(),fPort,dateField,timeField);
01085    // no blanks allowed in the name: each one becomes a "_"
01086    for (string::size_type pos = bogusName.find(" ");
01087         string::npos != pos; pos = bogusName.find(" ")) {
01088       bogusName.replace(pos,1,"_");
01089    }
01090    return bogusName;
01091 }
01092 
01093 //_____________________________________________________________________________
01094 Int_t RotoServer::OpenDaqFile(Int_t detector, Int_t run, Int_t subrun)
01095 {
01096 //
01097 //  Purpose:  Open the DAQ file that goes with a detector/run/subrun
01098 //
01099 //  Argument: detector   detector type (near,far,caldet)
01100 //            run        run #
01101 //            subrun     subrun #
01102 //
01103 //  Return:   result of OpenFile(); 0 means file creation was okay
01104 //
01105 //  Contact:  R. Hatcher
01106 //
01107    const string daqBase = BuildDaqBaseName(detector,run,subrun);
01108    const Int_t  outcome = OpenFile(daqBase.c_str(),kDaqFile);
01109    return outcome;
01110 }
01111 
01112 //_____________________________________________________________________________
01113 Int_t RotoServer::OpenDcsFile(Int_t detector, Int_t sec, Int_t nanosec)
01114 {
01115 //
01116 //  Purpose:  Open the DCS file named after a detector and start time
01117 //
01118 //  Argument: detector   detector type (near,far,caldet)
01119 //            sec        seconds when file was started
01120 //            nanosec    nano when file was started
01121 //
01122 //  Return:   result of OpenFile(); 0 means file creation was okay
01123 //
01124 //  Contact:  R. Hatcher
01125 //
01126    const string dcsBase = BuildDcsBaseName(detector,sec,nanosec);
01127    const Int_t  outcome = OpenFile(dcsBase.c_str(),kDcsFile);
01128    return outcome;
01129 }
01130 
01131 //_____________________________________________________________________________
01132 Int_t RotoServer::OpenBeamMonFile(Int_t detector, Int_t sec, Int_t nanosec)
01133 {
01134 //
01135 //  Purpose:  Open the BEAMMON file named after a start time
01136 //
01137 //  Argument: detector   detector type (near,far,caldet)
01138 //            sec        seconds when file was started
01139 //            nanosec    nano when file was started
01140 //
01141 //  Return:   result of OpenFile(); 0 means file creation was okay
01142 //
01143 //  Contact:  R. Hatcher
01144 //
01145    const string beamBase = BuildBeamMonBaseName(detector,sec,nanosec);
01146    const Int_t  outcome  = OpenFile(beamBase.c_str(),kBeamMonFile);
01147    return outcome;
01148 }
01149 
01150 //_____________________________________________________________________________
01151 Int_t RotoServer::OpenFile(const Char_t* fnamebase, EFileType ftype)
01152 {
01153 //
01154 //  Purpose:  Open a DAQ/DCS/BEAMMON file: open every stream that goes
01155 //            with the file type and attach it to the file.  When the
01156 //            file already exists a version number (".1", ".2", ...) is
01157 //            worked into the file name, giving up after version 100.
01158 //
01159 //  Argument: fnamebase  file name without extension
01160 //            ftype      kDaqFile, kDcsFile, kBeamMonFile or kBogusFile
01161 //
01162 //  Return:   kSuccess if all streams opened okay, else kFailWhoops
01163 //
01164 //  Contact:  R. Hatcher
01165 //
01166    const char* pbasedir = gSystem->Getenv("DAQ_DATA_DIR");
01167    if (kDcsFile == ftype) {
01168      // if DCS file and DCS_DATA_DIR is defined use that
01169      const char* pdcsdir = gSystem->Getenv("DCS_DATA_DIR");
01170      if (pdcsdir) pbasedir = pdcsdir;
01171    }
01172    if (kBeamMonFile == ftype) {
01173      // if Beam Monitor file and BEAM_DATA_DIR is defined use that
01174      const char* dir = gSystem->Getenv("BEAM_DATA_DIR");
01175      if (dir) pbasedir = dir;
01176    }
01177    // if no base dir use current working directory
01178    if (!pbasedir) pbasedir = ".";
01179    string filename = string(pbasedir)  + "/" + 
01180                      string(fnamebase) + "." + 
01181                      FileExtName(ftype);
01182    logInfo("OpenFile '%s'",filename.c_str());
01183
01184    bool openok = true;
01185
01186    // create possible streams, attach each to the file
01187    // !!!!!! needs more protection against failures
01188
01189    int ibeg = Per::kDaqSnarl;
01190    int iend = Per::kDcsMonitor;
01191    if (kDaqFile == ftype) {
01192       ibeg = Per::kDaqSnarl;
01193       iend = Per::kLightInjection;
01194    }
01195    else
01196    if (kDcsFile == ftype) {
01197       ibeg = Per::kDcsAlarm;
01198       iend = Per::kDcsMonitor;
01199    }
01200    else
01201    if (kBeamMonFile == ftype) {
01202       ibeg = Per::kBeamMon;
01203       iend = Per::kBeamMon;
01204    }
01205
01206    string fbase = fnamebase;
01207    // if the file is a dcs, beammon or bogus one, then the stream names
01208    // only have "dcs", "beam" or "bogus" and not the full file name
01209    // as there can be only one of each
01210    switch (ftype) {
01211    case kDcsFile:     fbase = "dcs";   break;
01212    case kBeamMonFile: fbase = "beam";  break;
01213    case kBogusFile:   fbase = "bogus"; break;
01214    default:
01215      // do nothing
01216      break;
01217    }
01218    int fversion = 0;  // shared by all streams, so it only ever grows
01219
01220    // if running from RotoSocket (ie. connected directly to input
01221    // binary file) then don't bother with AutoSave   
01222    bool fromFile = fSocket->InheritsFrom("RotoSocket");
01223    if (fromFile) MSG("Roto",Msg::kInfo)
01224          << "Reading from RotoSocket - turn off autosave"
01225          << endl;
01226
01227    for (int i = ibeg; i<=iend; i++ ) {
01228       Per::EStreamType stype = (Per::EStreamType) i;
01229       string treeName = Per::AsString(stype);
01230       string rotoStrmName = fbase + "." + treeName;
01231
01232       Int_t  compressLevel = fCompressionMap[stype];
01233       UInt_t basketSize    = fBasketSizeMap[stype];
01234
01235       PerOutputStream* astream =
01236          fOutputStreamManager->OpenStream(rotoStrmName,treeName,
01237                                           "RawRecord","","",Per::kRecSplit,
01238                                           basketSize,compressLevel);
01239       if (astream) {
01240          // possible tweaks
01241          //   autosave (interval: 0, setbasket = true)
01242          //   basketsaveint (interval: 0)
01243          if (fromFile) astream->SetAutoSave(0,0);
01244          else {
01245             UInt_t autoSaveInt  = fAutoSaveIntMap[stype];
01246             UInt_t autoSaveTime = fAutoSaveTimeMap[stype];
01247             astream->SetAutoSave(autoSaveInt,autoSaveTime);
01248          }
01249          bool openok_astream = 
01250             fOutputStreamManager->SetFile(rotoStrmName,filename,fAccessMode);
01251
01252          // handle potential case where file already exists
01253          // and the rotorooter is in kNew not kRecreate mode
01254          Per::EErrorCode ecode = astream->GetErrorCode();
01255          if (Per::kErrFileExists == ecode) {
01256             string basefilename = filename;
01257             string::size_type where = filename.rfind(".m");
01258             if (string::npos == where) where = filename.rfind(".root");
01259             if (string::npos == where) where = filename.length();
01260             bool keep_trying = true;
01261             // start adding version numbers
01262             do {
01263                ++fversion;
01264                filename = basefilename;  // without any attempt at versioning
01265                filename.insert(where,Form(".%d",fversion));
01266                openok_astream = 
01267                   fOutputStreamManager->SetFile(rotoStrmName,filename,fAccessMode);
01268                // fetch the outcome of *this* attempt before reporting it
01269                ecode = astream->GetErrorCode();
01270                if (Per::kErrSuccess != ecode) {
01271                   logWarn("open versioned file saw Per error: (%d) %s",
01272                           (int)ecode,Per::AsString(ecode));
01273                }
01274                keep_trying = (fversion<100) &&
01275                   (Per::kErrFileExists == ecode);
01276             } while (keep_trying);
01277             // report if there was success
01278             if (Per::kErrSuccess == ecode) 
01279                logInfo("open stream using file %s",filename.c_str());
01280
01281          } // file exists error
01282
01283          if (Per::kErrSuccess != ecode) {
01284             logWarn("open stream saw Per error: (%d) %s",
01285                     (int)ecode,Per::AsString(ecode));
01286          }
01287
01288          openok = openok && openok_astream;
01289          
01290          if (openok_astream) 
01291             logInfo("open stream %s as %s",
01292                     treeName.c_str(),rotoStrmName.c_str());
01293       }
01294       else {
01295          openok = false;
01296       }
01297    }
01298
01299    // point the symlink (if one is configured) at the file just opened
01300    if (openok && fSymlink.size()) {
01301        gSystem->Unlink(fSymlink.c_str());
01302        gSystem->Symlink(filename.c_str(),fSymlink.c_str());
01303    }
01304
01305    return (openok) ? kSuccess : kFailWhoops;
01306 }
01309 
01310 //_____________________________________________________________________________
01311 Int_t RotoServer::CloseDaqFile(Int_t detector, Int_t run, Int_t subrun)
01312 {
01313 //
01314 //  Purpose:  Close the DAQ file that goes with a detector/run/subrun
01315 //
01316 //  Argument: detector   detector type (near,far,caldet)
01317 //            run        run #
01318 //            subrun     subrun #
01319 //
01320 //  Return:   result of CloseFile(); 0 means file closure was okay
01321 //
01322 //  Contact:  R. Hatcher
01323 //
01324    const string daqBase = BuildDaqBaseName(detector,run,subrun);
01325    const Int_t  outcome = CloseFile(daqBase.c_str(),kDaqFile);
01326    return outcome;
01327 }
01328 
01329 //_____________________________________________________________________________
01330 Int_t RotoServer::CloseDcsFile(Int_t detector, Int_t sec, Int_t nanosec)
01331 {
01332 //
01333 //  Purpose:  Close the DCS file named after a detector and start time
01334 //
01335 //  Argument: detector   detector type (near,far,caldet)
01336 //            sec        seconds when file was started
01337 //            nanosec    nano when file was started
01338 //
01339 //  Return:   result of CloseFile(); 0 means file closure was okay
01340 //
01341 //  Contact:  R. Hatcher
01342 //
01343    const string dcsBase = BuildDcsBaseName(detector,sec,nanosec);
01344    const Int_t  outcome = CloseFile(dcsBase.c_str(),kDcsFile);
01345    return outcome;
01346 }
01347 
01348 //_____________________________________________________________________________
01349 Int_t RotoServer::CloseBeamMonFile(Int_t detector, Int_t sec, Int_t nanosec)
01350 {
01351 //
01352 //  Purpose:  Close the BEAMMON file named after a start time
01353 //
01354 //  Argument: detector   detector type (near,far,caldet)
01355 //            sec        seconds when file was started
01356 //            nanosec    nano when file was started
01357 //
01358 //  Return:   result of CloseFile(); 0 means file closure was okay
01359 //
01360 //  Contact:  R. Hatcher
01361 //
01362    const string beamBase = BuildBeamMonBaseName(detector,sec,nanosec);
01363    const Int_t  outcome  = CloseFile(beamBase.c_str(),kBeamMonFile);
01364    return outcome;
01365 }
01366 
01367 //_____________________________________________________________________________
01368 Int_t RotoServer::CloseFile(const Char_t* fnamebase, EFileType ftype)
01369 {
01370 //
01371 //  Purpose:  Close a DAQ/DCS/BEAMMON file: write out and close every
01372 //            stream that goes with the file type.  The file name is
01373 //            only rebuilt here for the log message.
01374 //
01375 //  Argument: fnamebase  file name without extension
01376 //            ftype      kDaqFile, kDcsFile, kBeamMonFile or kBogusFile
01377 //
01378 //  Return:   kSuccess (nothing here ever clears closeok)
01379 //
01380 //  Contact:  R. Hatcher
01381 //
01382    const char* pbasedir = gSystem->Getenv("DAQ_DATA_DIR");
01383    if (kDcsFile == ftype) {
01384      // if DCS file and DCS_DATA_DIR is defined use that
01385      const char* pdcsdir = gSystem->Getenv("DCS_DATA_DIR");
01386      if (pdcsdir) pbasedir = pdcsdir;
01387    }
01388    if (kBeamMonFile == ftype) {
01389      // if Beam Monitor file and BEAM_DATA_DIR is defined use that
01390      const char* dir = gSystem->Getenv("BEAM_DATA_DIR");
01391      if (dir) pbasedir = dir;
01392    }
01393    // if no base dir use current working directory
01394    if (!pbasedir) pbasedir = ".";
01395    string filename = string(pbasedir)  + "/" + 
01396                      string(fnamebase) + "." + 
01397                      FileExtName(ftype);
01398    logInfo("CloseFile '%s'",filename.c_str());
01399
01400    bool closeok = true;
01401
01402    // close possible streams
01403    // !!!!!! needs more protection against failures
01404    int ibeg = Per::kDaqSnarl;
01405    int iend = Per::kDcsMonitor;
01406    if (kDaqFile == ftype) {
01407       ibeg = Per::kDaqSnarl;
01408       iend = Per::kLightInjection;
01409    }
01410    else
01411    if (kDcsFile == ftype) {
01412       ibeg = Per::kDcsAlarm;
01413       iend = Per::kDcsMonitor;
01414    }
01415    else
01416    if (kBeamMonFile == ftype) {
01417       ibeg = Per::kBeamMon;
01418       iend = Per::kBeamMon;
01419    }
01420
01421    string fbase = fnamebase;
01422    // if the file is a dcs, beammon or bogus one, then the stream names
01423    // only have "dcs", "beam" or "bogus" and not the full file name
01424    // as there can be only one of each
01425    switch (ftype) {
01426    case kDcsFile:     fbase = "dcs";   break;
01427    case kBeamMonFile: fbase = "beam";  break;
01428    case kBogusFile:   fbase = "bogus"; break;
01429    default:                            break;
01430    }
01431
01432    for (int i = ibeg; i<=iend; i++ ) {
01433       Per::EStreamType stype = (Per::EStreamType) i;
01434       string treeName = Per::AsString(stype);
01435       string rotoStrmName = fbase + "." + treeName;
01436       fOutputStreamManager->Write(rotoStrmName);
01437       fOutputStreamManager->CloseStream(rotoStrmName);
01438    }
01439    // CloseStream does the file close for us, so no CloseFile(filename)
01440    return (closeok) ? kSuccess : kFailWhoops;
01441 }
01441 
01442 //_____________________________________________________________________________
01443 string RotoServer::BuildDaqBaseName(Int_t detector, Int_t run, Int_t subrun)
01444 {
01445 //
01446 //  Purpose:  Construct a standardized DAQ file/stream name base:
01447 //            first character of the detector name, the run as
01448 //            eight digits, "_", the subrun as four digits
01449 //
01450 //  Argument: detector   detector type (near,far,caldet)
01451 //            run        run #
01452 //            subrun     subrun #
01453 //
01454 //  Return:   the base file/stream name
01455 //
01456 //  Contact:  R. Hatcher
01457 //
01458    const Detector::Detector_t det = (Detector::Detector_t)(detector);
01459    const Char_t detLetter = Detector::AsString(det)[0];
01460    return string(Form("%c%8.8d_%4.4d",detLetter,run,subrun));
01461 }
01463 
01464 //_____________________________________________________________________________
01465 string RotoServer::BuildDcsBaseName(Int_t detector, Int_t sec, Int_t nanosec)
01466 {
01467 //
01468 //  Purpose:  Construct a standardized DCS file/stream name:
01469 //            first character of the detector name, then the date
01470 //            and time of the timestamp as two six-digit fields
01471 //
01472 //  Argument: detector   detector type (near,far,caldet)
01473 //            sec        seconds of timestamp
01474 //            nanosec    nanoseconds
01475 //
01476 //  Return:   the base file/stream name
01477 //
01478 //  Contact:  R. Hatcher
01479 //
01480    const Detector::Detector_t det = (Detector::Detector_t)(detector);
01481    const Char_t detLetter = Detector::AsString(det)[0];
01482
01483    VldTimeStamp fileStart(sec,nanosec);
01484    Int_t dateField = fileStart.GetDate();
01485    if (dateField>19700000) dateField %= 1000000; // keep last six digits
01486    const Int_t timeField = fileStart.GetTime();
01487    return string(Form("%c%6.6d_%6.6d",detLetter,dateField,timeField));
01488 }
01489 
01490 //_____________________________________________________________________________
01491 string RotoServer::BuildBeamMonBaseName(Int_t /* detector */, 
01492                                         Int_t sec, Int_t nanosec)
01493 {
01494 //
01495 //  Purpose:  Construct a standardized BEAMMON file/stream name:
01496 //            a "B", then the date and time of the timestamp as
01497 //            two six-digit fields
01498 //
01499 //  Argument: detector   detector type (near,far,caldet); not used
01500 //            sec        seconds of timestamp
01501 //            nanosec    nanoseconds
01502 //
01503 //  Return:   the base file/stream name
01504 //
01505 //  Contact:  R. Hatcher
01506 //
01507    VldTimeStamp fileStart(sec,nanosec);
01508    Int_t dateField = fileStart.GetDate();
01509    if (dateField>19700000) dateField %= 1000000; // keep last six digits
01510    const Int_t timeField = fileStart.GetTime();
01511    return string(Form("B%6.6d_%6.6d",dateField,timeField));
01512 }
01516 
01517 //_____________________________________________________________________________
01518 Int_t RotoServer::ProcessBuffer() 
01519 {
01520 //
01521 //  Purpose:  Handle the raw record sitting in the receive buffer:
01522 //            copy it to the flat binary file if there is one, inflate
01523 //            it into RawRecord objects and write each of those out
01524 //
01525 //  Argument: (none)
01526 //
01527 //  Return:   if 0 then conversion and write were successful;
01528 //            kFailInflateBuff if no MomNavigator came back, otherwise
01529 //            the OR of kFailPartialRec and the WriteRawRecord results
01530 //
01531 //  Contact:  R. Hatcher
01532 //
01533    static Int_t nrec = 0;  // records handed to WriteRawRecord so far
01534
01535    logDebug(5,"process buffer %d",nrec);
01536
01537    if (fFlatBinaryOutputFile) {
01538      // each flat record is a length word (counting itself) + the bytes
01539      Int_t recLength = fRecvBufferUsed + sizeof(recLength);
01540      fwrite(&recLength,sizeof(recLength),1,fFlatBinaryOutputFile);
01541      fwrite(fRecvBuffer.GetArray(),fRecvBufferUsed,1,fFlatBinaryOutputFile);
01542    }
01543
01544    // turn buffer into record(s)
01545    Int_t inflateCode = 0;
01546    MomNavigator* mom = 
01547       RotoObjectifier::BufferInflate(fRecvBuffer.GetArray(),
01548                                      fRecvBufferUsed,inflateCode);
01549    if (!mom) {
01550       logError("buffer inflate returned no MOM, REC %d ERR %d",
01551                nrec,inflateCode);
01552       return kFailInflateBuff;
01553    }
01554
01555    Int_t status = kSuccess;
01556    const bool partialFailure = (4 == inflateCode) || (8 == inflateCode);
01557    if (partialFailure) {
01558       logError("partial buffer failure rec %d err %d",nrec,inflateCode);
01559       status |= kFailPartialRec;
01560       fNackMessage += "Portion of record could not be converted to RawDataBlocks;";
01561    }
01562
01563    // write records out; the loop ends at the first fragment that
01564    // is not a RawRecord (or when the iterator runs dry)
01565    TIter nextFragment = mom->FragmentIter();
01566    for (;;) {
01567       const RawRecord* rec = dynamic_cast<const RawRecord*>(nextFragment());
01568       if (!rec) break;
01569       status |= WriteRawRecord(rec);
01570       ++nrec;
01571    }
01572
01573    delete mom; // clean up
01574    return status;
01575 }
01577 
01578 //_____________________________________________________________________________
01579 Int_t RotoServer::WriteRawRecord(const RawRecord* rawrec) 
01580 {
01581 //
01582 //  Purpose:  Store one inflated RawRecord on its output stream.
01583 //            If the stream picked by ChooseStreamName isn't open the
01584 //            record goes to the matching "bogus." stream instead,
01585 //            opening the bogus file first if need be.
01586 //
01587 //  Argument: rawrec     the record to store
01588 //
01589 //  Return:   kSuccess, or kFailWriteRec when there is no output
01590 //            stream manager or no stream could be found
01591 //
01592 //  Contact:  R. Hatcher
01593 //
01594    Int_t status = kSuccess;
01595
01596    string strmName = ChooseStreamName(rawrec);
01597
01598    if (!fOutputStreamManager) {
01599       logError("process buffer had no output stream manager");
01600       return status | kFailWriteRec;
01601    }
01602
01603    PerOutputStream* stream = dynamic_cast<PerOutputStream*>
01604       (fOutputStreamManager->GetOpenedStream(strmName));
01605
01606    if (!stream) {
01607       // no open stream with the right name ...
01608       // openfile wasn't called ?? or perhaps the data is
01609       // corrupted.  we want to protect this data so
01610       // put it in a file of bogus data
01611       logError("process buffer had no stream '%s'",
01612                strmName.c_str());
01613       // keep what follows the final "." and hang it on "bogus."
01614       const string::size_type lastDot = strmName.find_last_of(".");
01615       strmName = "bogus." + strmName.substr(lastDot+1,string::npos);
01616       stream = dynamic_cast<PerOutputStream*>
01617          (fOutputStreamManager->GetOpenedStream(strmName));
01618    }
01619
01620    if (!stream) {
01621       // perhaps the bogus file isn't open ...
01622       const string bogusFile = BogusFileName();
01623       if (OpenFile(bogusFile.c_str(),kBogusFile)) {
01624          logError("process buffer could not open 'bogus' file '%s'",
01625                   bogusFile.c_str());
01626       }
01627       // last try at getting the stream
01628       stream = dynamic_cast<PerOutputStream*>
01629          (fOutputStreamManager->GetOpenedStream(strmName));
01630    }
01631
01632    if (!stream) {
01633       logError("process buffer still had no stream '%s'",
01634                strmName.c_str());
01635       return status | kFailWriteRec;
01636    }
01637
01638    // remove const for PerOutputStream::SetObject
01639    stream->SetObject(const_cast<RawRecord *>(rawrec));
01640    stream->Store();
01641    return status;
01642 }
01654 
01655 //_____________________________________________________________________________
01656 string RotoServer::ChooseStreamName(const RawRecord* rawrec) 
01657 {
01658 //
01659 //  Purpose:  Pick which stream to put this record into.
01660 //            The header type decides the family: a RawDaqHeader means
01661 //            DAQ, a RawBeamMonHeader means BEAMMON, anything else is
01662 //            taken to be DCS.  Within DAQ and DCS the blocks in the
01663 //            record decide the stream type.
01664 //
01665 //  Argument: rawrec     the record to find a stream for
01666 //
01667 //  Return:   name of stream: "<base>.<type>" for DAQ (base is the
01668 //            fake DAQ file name if set, else BuildDaqBaseName),
01669 //            "beam.<type>" for BEAMMON, "dcs.<type>" for DCS
01670 //
01671 //  Contact:  R. Hatcher
01672 //
01673    VldContext vldc = rawrec->GetRawHeader()->GetVldContext(); // always okay
01674
01675    const RawDaqHeader* daqHead =
01676       dynamic_cast<const RawDaqHeader *>(rawrec->GetRawHeader());
01677    const RawBeamMonHeader* beamHead =
01678       dynamic_cast<const RawBeamMonHeader *>(rawrec->GetRawHeader());
01679
01680    if (!daqHead && beamHead) {
01681       // this came from the BEAMMON
01682       string streamType = Per::AsString(Per::kBeamMon);
01683       return "beam." + streamType;
01684    }
01685
01686    if (!daqHead) {
01687       // assume it came from the DCS
01688       // an alarm block anywhere in the record --> DcsAlarm
01689       // otherwise --> DcsMonitor
01690       Bool_t sawAlarm = false;
01691       TIter nextBlock =
01692          (const_cast<RawRecord*>(rawrec))->GetRawBlockIter();
01693       while (TObject* obj = nextBlock()) {
01694          const RawDataBlock* blk = dynamic_cast<RawDataBlock*>(obj);
01695          if ((dynamic_cast<const RawDcsAlarmBlock*>(blk)))
01696             sawAlarm = true;
01697       }
01698       string streamType =
01699          Per::AsString(sawAlarm ? Per::kDcsAlarm : Per::kDcsMonitor);
01700       return "dcs." + streamType;
01701    }
01702
01703    // this came from the DAQ
01704    Int_t run    = daqHead->GetRun();
01705    Int_t subrun = daqHead->GetSubRun();
01706
01707    Bool_t sawLISummary   = false;
01708    Bool_t sawSnarlHeader = false;
01709    Bool_t sawDigitBlock  = false;
01710
01711    TIter nextBlock =
01712       (const_cast<RawRecord*>(rawrec))->GetRawBlockIter();
01713    while (TObject* obj = nextBlock()) {
01714       const RawDataBlock* blk = dynamic_cast<RawDataBlock*>(obj);
01715
01716       if ((dynamic_cast<const RawLIAdcSummaryBlock*>(blk)) ||
01717           (dynamic_cast<const RawLITimingSummaryBlock*>(blk)))
01718          sawLISummary = true;
01719
01720       if ((dynamic_cast<const RawSnarlHeaderBlock*>(blk)))
01721          sawSnarlHeader = true;
01722
01723       if ((dynamic_cast<const RawDigitDataBlock*>(blk)))
01724          sawDigitBlock = true;
01725    }
01726
01727    // decide on which stream
01728    // if has LI blocks then --> LightInjection
01729    // else if had Snarl header or digits then --> DaqSnarl
01730    // otherwise --> DaqMonitor
01731    Per::EStreamType stype = Per::kDaqMonitor;
01732    if (sawLISummary) {
01733       stype = Per::kLightInjection;
01734    }
01735    else if (sawSnarlHeader || sawDigitBlock ) {
01736       stype = Per::kDaqSnarl;
01737       // a snarl should carry both blocks; complain if only one did
01738       if (!(sawSnarlHeader && sawDigitBlock)) {
01739          logWarn("XOR RawSnarlHeaderBlock %d RawDgitiDataBlock %d",
01740                 sawSnarlHeader,sawDigitBlock);
01741       }
01742    }
01743
01744    // if we're in "one file" mode then the stream naming
01745    // doesn't depend on run/subrun
01746    string baseName;
01747    if (fFakeDAQFileName != "") {
01748       baseName = fFakeDAQFileName;
01749    } else {
01750       baseName = BuildDaqBaseName(vldc.GetDetector(),run,subrun);
01751    }
01752    string streamType = Per::AsString(stype);
01753    return baseName + "." + streamType;
01754 }
01776 
01777 //_____________________________________________________________________________
01778 Int_t RotoServer::ProcessConfig() 
01779 {
01780 //
01781 //  Purpose:  Process the config that is in the buffer
01782 //            
01783 //  Argument: (none)
01784 //
01785 //  Return:   if 0 then config processing was uneventful
01786 //
01787 //  Contact:  R. Hatcher
01788 //   
01789 
01790    char config[1024];
01791    char stream[1024];
01792    int  nItems = 0;
01793    bool ok = true;
01794 
01795    //   nItems = sscanf(fRecvBuffer.GetArray(),
01796    //                   "config autosave: %s = %d rec %d sec",
01797    //                   stream,&nrec,&nsec);
01798 
01799    nItems = sscanf(fRecvBuffer.GetArray(),
01800                    "config %s:",config);
01801 
01802    if (!strncmp(config,"autosave",8)) {
01803      unsigned int nrec, nsec;
01804      nItems = sscanf(fRecvBuffer.GetArray(),
01805                      "config autosave: %s = %u rec %u sec",
01806                      stream,&nrec,&nsec);
01807 
01808      if (nItems != 3) ok = false;
01809      else {
01810        int nmod = SetAutoSaveConfig(stream,nrec,nsec);
01811        if (nmod < 1) {
01812          logWarn("ProcessConfig stream \"%s\" unknown\n",stream);
01813        }
01814      }
01815 
01816    }
01817    else if (!strncmp(config,"compress",8)) {
01818      int level;
01819      nItems = sscanf(fRecvBuffer.GetArray(),
01820                      "config compress: %s = %d",
01821                      stream,&level);
01822 
01823      if (nItems != 2) ok = false;
01824      else {
01825        int nmod = SetCompressConfig(stream,level);
01826        if (nmod < 1) {
01827          logWarn("ProcessConfig stream \"%s\" unknown\n",stream);
01828        }
01829      }
01830    }
01831    else if (!strncmp(config,"basketsize",10)) {
01832      int basketsize;
01833      nItems = sscanf(fRecvBuffer.GetArray(),
01834                      "config basketsize: %s = %d",
01835                      stream,&basketsize);
01836 
01837      if (nItems != 2) ok = false;
01838      else {
01839        int nmod = SetBasketSizeConfig(stream,basketsize);
01840        if (nmod < 1) {
01841          logWarn("ProcessConfig stream \"%s\" unknown\n",stream);
01842        }
01843      }
01844 
01845    }
01846    else ok = false;
01847 
01848    if (!ok) 
01849      logWarn("ProcessConfig failed to interpret string: \"%s\"\n",
01850              fRecvBuffer.GetArray());
01851 
01852    return 0;
01853 }
01854 
01855 //_____________________________________________________________________________
01856 void RotoServer::BuildStateReport() 
01857 {
01858 //
01859 //  Purpose:  Update state report string
01860 //
01861 //  Argument: (none)
01862 //
01863 //  Return:   (none) [fStateReport is modified]
01864 //
01865 //  Contact:  R. Hatcher
01866 //   
01867    fStateReport  = RotoRcCmd::ElementAsStlString(MINOS_ROOTER_ROOTER);
01868    fStateReport += "; ";
01869    fStateReport += 
01870       RotoRcCmd::InstrAsStlString(MINOS_ROOTER_STATE_REPORT,fCurrentState);
01871    fStateReport += "; ";
01872 }
01873 
01874 //_____________________________________________________________________________

Generated on Thu Feb 5 23:43:33 2009 for loon by doxygen 1.3.5