Main Page   Namespace List   Class Hierarchy   Alphabetical List   Compound List   File List   Namespace Members   Compound Members   File Members  

RotoServer.cxx

Go to the documentation of this file.
00001 
00002 // $Id: RotoServer.cxx,v 1.23 2002/08/29 18:51:10 rhatcher Exp $
00003 //
00004 // Communications and Control of Roto-rooter for receiving raw blocks,
00005 // turning them into objects and writing them to files
00006 //
00007 // rhatcher@fnal.gov
00009 
00010 #include "Rotorooter/RotoServer.h"
00011 #include "Rotorooter/RotoSocket.h"
00012 #include "Rotorooter/RotoObjectifier.h"
00013 
00014 #include <syslog.h>
00015 
00016 #include "TSystem.h"
00017 #include "TServerSocket.h"
00018 
00019 // for function: char* Form(const char *fmt, ...)
00020 #include "TString.h"
00021 
00022 #include "RawData/RawRecord.h"
00023 #include "RawData/RawDaqHeader.h"
00024 
00025 #include "RawData/RawLIAdcSummaryBlock.h"
00026 #include "RawData/RawLITimingSummaryBlock.h"
00027 #include "RawData/RawSnarlHeaderBlock.h"
00028 #include "RawData/RawDigitDataBlock.h"
00029 
00030 #include "Persistency/PerOutputStream.h"
00031 #include "Persistency/PerOutputStreamManager.h"
00032 
00033 const UInt_t dbg_ReportCmd    = 0x0001;
00034 const UInt_t dbg_ReportReply  = 0x0002;
00035 const UInt_t dbg_ReportHex    = 0x0004;
00036 const UInt_t dbg_ReportNotTo  = 0x0008;
00037 
00038 UInt_t RotoServer::fgDebugFlags = 0;
00039 //  dbg_ReportCmd | dbg_ReportReply | dbg_ReportNotTo;
00040 
00041 static string reportOpt = 
00042   (RotoServer::GetDebugFlags() & dbg_ReportHex) ? "b" : " ";
00043 
00044 #include "MinosObjectMap/MomNavigator.h"
00045 #include "MessageService/MsgService.h"
00046 
00047 ClassImp(RotoServer)
00048 CVSID("$Id: RotoServer.cxx,v 1.23 2002/08/29 18:51:10 rhatcher Exp $");
00049 
// Status bits returned by the RotoServer command handling methods.
// Values are distinct bits so several failures can be OR-ed into one
// status word; kSuccess (0) means nothing went wrong.
enum RotoFailureMode {
   kSuccess         = 0x00000000,  // no failure
   kFailRecvCmd     = 0x00000001,  // RecvCommand: command word not received
   kFailRecvByteCnt = 0x00000002,  // RecvCommand: follow-up byte count not received
   kFailRecvData    = 0x00000004,  // RecvCommand: follow-up data not received
   kFailBuffResize  = 0x00000008,  // RecvCommand: buffer too small, record is partial
   kFailNotToMe     = 0x00000010,  // CheckTo: command addressed to another entity
   kFailCmdBadFrom  = 0x00000020,  // command came from a sender not allowed to issue it
   kFailCmdUnknown  = 0x00000040,  // ProcessCommand: unrecognised COMMAND instruction
   kFailReqUnknown  = 0x00000080,  // ProcessCommand: unrecognised REQUEST instruction
   kFailBadRcType   = 0x00000100,  // ProcessCommand: neither COMMAND nor REQUEST
   kFailOpenFile    = 0x00001000,  // OpenFile: bad arguments / open failed
   kFailCloseFile   = 0x00010000,
   kFailRecBuffer   = 0x00100000,
   kFailInflateBuff = 0x00200000,
   kFailPartialRec  = 0x00400000,
   kFailWriteRec    = 0x01000000,

   kFailNoCodeYet   = 0x40000000,
   kFailWhoops      = 0x80000000   // ProcessCommand: fell out of the dispatch
};
00071 
00072 //_____________________________________________________________________________
00073 RotoServer::RotoServer(Int_t port, Int_t nwords, Bool_t allow_overwrite, Int_t tcp_nodelay_flag)
00074 : fPort(port), fTCP_NODELAY_flag(tcp_nodelay_flag)
00075 {
00076 //
00077 //  Purpose:  Create a RotoServer
00078 //            Open the server socket looking for connections on the port
00079 //            Allocate buffer to receive data
00080 //
00081 //  Argument: port     TCP/IP port #
00082 //            nwords   number of words for initial data recv buffer
00083 //            allow_overwrite   all server to overwrite output files
00084 //            tcp_nodelay_flag  set socket to use TCP_NODELAY
00085 //
00086 //  Return:   (none)
00087 //
00088 //  Contact:  R. Hatcher
00089 //
00090 
00091    Init(nwords, allow_overwrite);
00092 
00093    fServerSocket = new TServerSocket(port, kTRUE);
00094    fServerSocket->SetOption(kNoDelay,fTCP_NODELAY_flag);
00095 }
00096 
00097 //_____________________________________________________________________________
00098 RotoServer::RotoServer(const char* fileName, Int_t nwords, Bool_t allow_overwrite)
00099 {
00100 //
00101 //  Purpose:  Create a RotoServer
00102 //            Open the RotoSocket (file emulation of a TSocket)
00103 //            Allocate buffer to receive data
00104 //
00105 //  Argument: fileName File name for RotoSocket.
00106 //            nwords   number of words for initial data recv buffer
00107 //            allow_overwrite kTRUE if allowed to overwite output file.
00108 //
00109 //  Return:   (none)
00110 //
00111 //  Contact:  R. Hatcher
00112 //
00113 
00114    Init(nwords, allow_overwrite);
00115 
00116    fSocket = new RotoSocket(fileName);
00117 
00118 }
00119 //_____________________________________________________________________________
00120 RotoServer::~RotoServer() 
00121 {
00122 //
00123 //  Purpose:  Destroy a RotoServer
00124 //            Close the server socket looking for connections on the port
00125 //            Close any open sockets
00126 //            Deallocate buffer to receive data
00127 //
00128 //  Argument: (none)
00129 //
00130 //  Return:   (none)
00131 //
00132 //  Contact:  R. Hatcher
00133 //
00134 
00135    fCurrentState = ROOTER_STATE_STOPPING;
00136 
00137    if (fServerSocket) {
00138       fServerSocket->Close();
00139       delete fServerSocket;
00140       fServerSocket = 0;
00141    }
00142 
00143    CloseAndDeleteSocket(fSocket);
00144 
00145    if (fOutputStreamManager) {
00146       // make sure *everything* has been flushed
00147       fOutputStreamManager->Write();
00148       fOutputStreamManager->CloseStream();
00149       //CloseStream() makes this redundant: fOutputStreamManager->CloseFile();
00150      
00151       delete fOutputStreamManager;
00152       fOutputStreamManager = 0;
00153    }   
00154 
00155    fCurrentState = ROOTER_STATE_UNKNOWN;
00156 
00157    syslog(LOG_INFO,"~RotoServer");
00158 }
00159 
00160 //_____________________________________________________________________________
00161 void RotoServer::CloseAndDeleteSocket(TSocket*& socket)
00162 {
00163 //
00164 //  Purpose:  Close and delete socket; zero pointer
00165 //
00166 //  Argument: socket   reference to pointer to socket      
00167 //
00168 //  Return:   (none)
00169 //
00170 //  Contact:  R. Hatcher
00171 //
00172    if (socket) {
00173       if (socket->IsValid()) socket->Close();
00174       delete socket;
00175       socket = 0;
00176    }
00177 }
00178 
00179 //_____________________________________________________________________________
00180 void RotoServer::Init(Int_t nwords, Bool_t allow_overwrite)
00181 {
00182 //
00183 //  Purpose:  Common initialisation of a RotoServer
00184 //            Allocate buffer to receive data
00185 //
00186 //  Argument: nwords   number of words for initial data recv buffer
00187 //            whoami   MinosOnlineEntity (ROTO_DCP or ROTO_DCS)
00188 //
00189 //  Return:   (none)
00190 //
00191 //  Contact:  R. Hatcher
00192 //
00193 
00194    fServerSocket         = 0;
00195    fSocket               = 0;
00196    fCurrentState         = ROOTER_STATE_UNKNOWN;
00197    fNackMessage          = ""; 
00198    fStateReport          = "";
00199    fAccessMode           = allow_overwrite ? Per::kRecreate : Per::kNew;
00200    fOutputStreamManager  = 0;
00201    fFakeDAQFileName      = "";
00202 
00203    ResizeBuffer(fRecvBuffer,nwords*sizeof(Int_t)/sizeof(Char_t),"Recv");
00204    ResizeBuffer(fReplyBuffer,4096*sizeof(Char_t),"Reply");
00205 
00206    fOutputStreamManager = new PerOutputStreamManager();
00207    fCurrentState = ROOTER_STATE_UNCONNECTED;
00208    RotoObjectifier::SysLogRawBlockRegistry();
00209 
00210    syslog(LOG_INFO,"Default RotoServer configuration\n");
00211    // Set default AutoSave parameters
00212    // all streams: 1000 records or 10 seconds
00213    SetAutoSaveConfig("*",1000,10);
00214    // Set default Compression to use file defaults
00215    SetCompressConfig("*",-1);
00216    // Set default BasketSize to a uniform 64000
00217    SetBasketSizeConfig("*",64000);
00218 
00219 }
00220 
00221 //_____________________________________________________________________________
00222 Int_t RotoServer::SetAutoSaveConfig(const string& stream, UInt_t nrec, UInt_t nsec) 
00223 {
00224 //
00225 //  Purpose:  Set AutoSave configuration values in <map>s
00226 //
00227 //  Argument: stream    - name of stream ("*" = all)
00228 //            nrec      - record frequency
00229 //            nsec      - max time duration
00230 //
00231 //  Return:   number of stream configs modified
00232 //
00233 //  Contact:  R. Hatcher
00234 //
00235 
00236    int nmod = 0;
00237 
00238    int ibeg = Per::kDaqSnarl;
00239    int iend = Per::kDcsMonitor;
00240    for (int i = ibeg; i<=iend; i++ ) {
00241       Per::EStreamType stype = (Per::EStreamType) i;
00242       string treeName = Per::AsString(stype);
00243       if (treeName == stream || stream == "*") {
00244          syslog(LOG_INFO,
00245                 "AutoSave is %d records or %d seconds for %s stream\n",
00246                 nrec,nsec,treeName.c_str());
00247          fAutoSaveIntMap[stype]  = nrec;
00248          fAutoSaveTimeMap[stype] = nsec;
00249          nmod++;
00250       }
00251    }
00252 
00253    return nmod;
00254 }
00255 
00256 //_____________________________________________________________________________
00257 Int_t RotoServer::SetCompressConfig(const string& stream, Int_t level)
00258 {
00259 //
00260 //  Purpose:  Set Compression configuration values in <map>s
00261 //
00262 //  Argument: stream    - name of stream ("*" = all)
00263 //            level     - compression level
00264 //                          -1 : use file default
00265 //                           0 : no compression
00266 //                           1 : default ROOT compression
00267 //                        ...9 : maximum compression
00268 //
00269 //  Return:   number of stream configs modified
00270 //
00271 //  Contact:  R. Hatcher
00272 //
00273 
00274    int nmod = 0;
00275 
00276    int ibeg = Per::kDaqSnarl;
00277    int iend = Per::kDcsMonitor;
00278    for (int i = ibeg; i<=iend; i++ ) {
00279       Per::EStreamType stype = (Per::EStreamType) i;
00280       string treeName = Per::AsString(stype);
00281       if (treeName == stream || stream == "*") {
00282          syslog(LOG_INFO,
00283                 "Compression level is %d for %s stream\n",
00284                 level,treeName.c_str());
00285          fCompressionMap[stype]  = level;
00286          nmod++;
00287       }
00288    }
00289 
00290    return nmod;
00291 }
00292 
00293 //_____________________________________________________________________________
00294 Int_t RotoServer::SetBasketSizeConfig(const string& stream, UInt_t basketsize)
00295 {
00296 //
00297 //  Purpose:  Set BasketSize configuration values in <map>s
00298 //
00299 //  Argument: stream     - name of stream ("*" = all)
00300 //            basketsize - size of basket
00301 //
00302 //  Return:   number of stream configs modified
00303 //
00304 //  Contact:  R. Hatcher
00305 //
00306 
00307    int nmod = 0;
00308 
00309    int ibeg = Per::kDaqSnarl;
00310    int iend = Per::kDcsMonitor;
00311    for (int i = ibeg; i<=iend; i++ ) {
00312       Per::EStreamType stype = (Per::EStreamType) i;
00313       string treeName = Per::AsString(stype);
00314       if (treeName == stream || stream == "*") {
00315          syslog(LOG_INFO,
00316                 "BasketSize is %d for %s stream\n",
00317                 basketsize,treeName.c_str());
00318          fBasketSizeMap[stype]  = basketsize;
00319          nmod++;
00320       }
00321    }
00322 
00323    return nmod;
00324 }
00325 
00326 //_____________________________________________________________________________
00327 void RotoServer::Run() 
00328 {
00329 //
00330 //  Purpose:  Main loop
00331 //            Test validity of server socket, wait for connection attempt
00332 //            accept commands on open socket, process and reply
00333 //
00334 //  Argument: (none)
00335 //
00336 //  Return:   (none)
00337 //
00338 //  Contact:  R. Hatcher
00339 //
00340 
00341    //
00342    // is this server socket okay? If missing the socket must be valid
00343    // RotoSocket.
00344    if (!fServerSocket ) { 
00345      if ( ! fSocket->IsValid() ) {
00346         syslog(LOG_ALERT," Unable to open file as RotoSocket");
00347         return;
00348      }
00349    }
00350    else if (!fServerSocket->IsValid()) {
00351       string why;
00352       switch (fServerSocket->GetErrorCode()) {
00353       case  0: why = "no error, socket should be valid"; break;
00354       case -1: why = "low level socket() call failed";   break;
00355       case -2: why = "low level bind() call failed - ";
00356                why += "port may already be in use";
00357                break;
00358       case -3: why = "low level listen() call failed";   break;
00359       default: why = "unknown error";                    break;
00360       }
00361       // MSG("Roto",Msg::kFatal) 
00362       //   << endl
00363       //   << "  server socket failed on port "
00364       //   << fServerSocket->GetLocalPort()
00365       //   << " was invalid \n  "
00366       //   << why
00367       //   << endl;
00368       syslog(LOG_ALERT," server socket on port %d failed, why=%s",
00369              fServerSocket->GetLocalPort(),why.c_str());
00370       return;
00371    }
00372 
00373    fCurrentState = ROOTER_STATE_UNCONNECTED;
00374    Int_t ncmd = 0;
00375 
00376    RotoRcCmd cmd;
00377 
00378    //
00379    // main loop, while shutdown hasn't been requested
00380    //
00381    while (fCurrentState != ROOTER_STATE_SHUTDOWN_REQ) {
00382 
00383       // Accept a connection and return a full-duplex communication socket.
00384       if (!fSocket) {
00385          fSocket = fServerSocket->Accept();
00386          fSocket->SetOption(kNoDelay,fTCP_NODELAY_flag);
00387          //fSocket->SetOption(kKeepAlive,1);
00388          if (fCurrentState == ROOTER_STATE_UNCONNECTED)
00389              fCurrentState = ROOTER_STATE_CONNECTED;
00390       }
00391 
00392       Int_t status = kSuccess;
00393       fNackMessage = "";
00394 
00395       // accept commands on open socket
00396       
00397       status |= RecvCommand(fSocket,cmd);
00398       // failure to get command+data correctly is indicative
00399       // of a broken socket
00400       if (status & (kFailRecvCmd|kFailRecvByteCnt|kFailRecvData) ) {
00401          syslog(LOG_CRIT,"socket from '%s' abruptly broken",
00402                 fSocket->GetInetAddress().GetHostName());
00403 
00404          // if (fgDebugFlags & dbg_ReportCmd) 
00405          //    MSG("Roto", Msg::kInfo) 
00406          //      << "RotoServer::Run socket appears broken"  << endl;
00407 
00408          // should here cleanly close all files associated with
00409          // this socket, for now (since we're single socketed)
00410          // just close *everything*
00411          fOutputStreamManager->Write();
00412          fOutputStreamManager->CloseStream();
00413          //CloseStream() makes this redundant: fOutputStreamManager->CloseFile();
00414          
00415          CloseAndDeleteSocket(fSocket);
00416          fCurrentState = ROOTER_STATE_UNCONNECTED;
00417          continue;
00418       }
00419       ncmd++;
00420 
00421       if (fgDebugFlags & dbg_ReportCmd) 
00422          syslog(LOG_DEBUG,"msg %d status %d",ncmd,status);
00423 
00424       //if (fgDebugFlags & dbg_ReportCmd) 
00425       //   MSG("Roto", Msg::kInfo) 
00426       //      << "RotoServer::Run msg#" 
00427       //      << ncmd << " status=" << status
00428       //      << " " << cmd.AsStlString(reportOpt)
00429       //      << endl;
00430 
00431       status |= CheckTo(cmd);
00432       if (status & kFailNotToMe) continue;
00433 
00434       status |= ProcessCommand(fSocket,cmd);
00435 
00436       ReplyToCommand(fSocket,cmd,status);
00437 
00438       if (fCurrentState == ROOTER_STATE_DISCONNECTING) {
00439          CloseAndDeleteSocket(fSocket);
00440          fCurrentState = ROOTER_STATE_UNCONNECTED;
00441       }
00442    }
00443 }
00444 
00445 //_____________________________________________________________________________
00446 void RotoServer::ResizeBuffer(TArray& buffer, Int_t size, const Char_t* name) 
00447 {
00448 //
00449 //  Purpose:  Attempt to resize receive/reply buffer size
00450 //
00451 //  Argument: buffer   TArray to be resized
00452 //            size     desire size in nominal units
00453 //            name     name for log messages
00454 //
00455 //  Return:   (none)
00456 //
00457 //  Contact:  R. Hatcher
00458 //
00459 
00460    int oldsize = buffer.GetSize();
00461    buffer.Set(0);  // toss old array so we don't copy contents
00462 
00463    Int_t request = size;
00464    Int_t failures = 0;
00465    while ( buffer.GetSize() != request ) {
00466       buffer.Set(request);
00467       if (buffer.GetSize() == request) break; // success!
00468       failures++;
00469       request -= 1024*sizeof(Int_t);  // failed, try smaller size
00470    }
00471 
00472    if (failures) {
00473          syslog(LOG_ERR,"resize %s buffer from %d to %d only got %d",
00474                 name,oldsize,size,request);
00475    } else {
00476          syslog(LOG_NOTICE,"resize %s buffer from %d to %d",
00477                 name,oldsize,size);
00478    }
00479 }
00480 
00481 //_____________________________________________________________________________
00482 Int_t RotoServer::RecvCommand(TSocket* socket, RotoRcCmd& command)
00483 {
00484 //
00485 //  Purpose:  Receive RunControl-protocol-like commands from socket
00486 //
00487 //  Argument: socket   ptr to socket on which command is to be read
00488 //            command  RunControl encoded command
00489 //
00490 //  Return:   kSuccess (0)     = success
00491 //            kFailRecvCmd     = error retrieving 4-byte command
00492 //            kFailRecvByteCnt = error retrieving followup byte count
00493 //            kFailRecvData    = error retrieving followup data
00494 //            kFailBuffResize  = could not resize buffer to hold entire record
00495 //                               RecvBuffer holds only partial record
00496 //
00497 //  Contact:  R. Hatcher
00498 //
00499    Int_t  lCommand, ltouse, lBuffer;
00500 
00501    lCommand = fRecvBufferUsed = ltouse = lBuffer = 0;
00502 
00503    //
00504    // Retrieve "command" (4bytes of packed info)
00505    //
00506    lCommand = socket->RecvRaw(&command.fEncoded,sizeof(Int_t));
00507    if (lCommand != sizeof(Int_t)) {
00508       syslog(LOG_ERR,"RecvRaw command got %d expect %d",
00509              lCommand,sizeof(Int_t));
00510       return kFailRecvCmd;
00511    }
00512    //
00513    // if DATATOFOLLOW bit is set get the size and the data
00514    //
00515    if (command.HasDataToFollow()) {
00516       // byte count includes its own 4 bytes
00517       Int_t extra_bytes; 
00518       ltouse = socket->RecvRaw(&extra_bytes,sizeof(Int_t));
00519       fRecvBufferUsed = extra_bytes - 4;
00520       if (ltouse != sizeof(Int_t)) {
00521       syslog(LOG_ERR,"RecvRaw nbytes got %d expect %d",
00522              ltouse,sizeof(Int_t));
00523          return kFailRecvByteCnt;
00524       }
00525       if (fRecvBufferUsed>0) {
00526          // there be data in dat' d'er socket
00527 
00528          // if the buffer isn't big enough resize it (up only)
00529          if (fRecvBufferUsed>fRecvBuffer.GetSize()) 
00530             ResizeBuffer(fRecvBuffer,fRecvBufferUsed,"Recv");
00531 
00532          Bool_t partial = false;
00533          if (fRecvBufferUsed>fRecvBuffer.GetSize()) {
00534             // couldn't resize buffer large enough
00535             fNackMessage += 
00536                Form("Buffer could only hold %d of %d;",
00537                     fRecvBuffer.GetSize(),fRecvBufferUsed);
00538             partial = true;
00539             fRecvBufferUsed = fRecvBuffer.GetSize();
00540          }
00541 
00542          lBuffer = socket->RecvRaw(fRecvBuffer.GetArray(),fRecvBufferUsed);
00543          if (lBuffer != fRecvBufferUsed) {
00544             syslog(LOG_ERR,"RecvRaw buffer got %d expect %d",
00545                    lBuffer,fRecvBufferUsed);
00546             fNackMessage += 
00547                Form("Buffer expected %d, saw %d;",
00548                     fRecvBufferUsed,lBuffer);
00549             return kFailRecvData;
00550          }
00551          if (partial) return kFailBuffResize;
00552 
00553       }
00554    }
00555 
00556    return kSuccess;
00557 }
00558    
00559 //_____________________________________________________________________________
00560 Int_t RotoServer::CheckTo(RotoRcCmd& command) 
00561 {
00562 //
00563 //  Purpose:  Check if this command was directed to the right place
00564 //
00565 //  Argument: command   Run Control command
00566 //
00567 //  Return:   kSuccess      if okay
00568 //            kFailNotToMe  if not intended for RotoRooter
00569 //
00570 //  Contact:  R. Hatcher
00571 //
00572 
00573    int to = command.GetTo();
00574    if (to == MINOS_ROOTER) return kSuccess;
00575 
00576    syslog(LOG_ERR,"I am 0x%2.2x, got message for 0x%2.2x (cmd 0x%8.8x)",
00577           MINOS_ROOTER,to,command.fEncoded);
00578 
00579    // if (fgDebugFlags & dbg_ReportNotTo) 
00580    //   MSG("Roto", Msg::kInfo) 
00581    //      << "RotoServer::Run received message meant for 0x" 
00582    //      << hex << to << dec
00583    //      << ",  I am 0x" 
00584    //      << hex << MINOS_ROOTER << dec 
00585    //      << endl;
00586 
00587    return kFailNotToMe;
00588 }
00589 //_____________________________________________________________________________
00590 Int_t RotoServer::ProcessCommand(TSocket* socket, RotoRcCmd command)
00591 {
00592 //
00593 //  Purpose:  Process the command received on socket
00594 //
00595 //  Argument: socket      ptr to socket on which command is to be read
00596 //            command     RunControl encoded command
00597 //
00598 //  Return:   kSuccess (0)  if successful (yields ACK)
00599 //            various       if there was a problem (yields UNABLE_TO)
00600 //
00601 //  Contact:  R. Hatcher
00602 //
00603    int from  = command.GetFrom();
00604    int type  = command.GetType();
00605    int instr = command.GetInstr();
00606 
00607    switch (type) {
00608    case MINOS_ROOTER_COMMAND:
00609       if (MINOS_DCP != from && MINOS_DCS != from) {
00610          syslog(LOG_ERR,"MinosEntity 0x%2.2x on %s attempted COMMAND",
00611                 from,socket->GetInetAddress().GetHostName());
00612 
00613          fNackMessage += 
00614             Form("Not from DCP or DCS but %s (0x%x);",
00615                  RotoRcCmd::ElementAsStlString(from).c_str(),from);
00616          return kFailCmdBadFrom;
00617          break;
00618       }         
00619       switch (instr) {
00620       case ROOTER_OPENSOCKET: 
00621          return kSuccess;
00622          break;
00623       case ROOTER_CLOSESOCKET: 
00624          // we can't do this immediately here because we need to reply first
00625          fCurrentState = ROOTER_STATE_DISCONNECTING;
00626          return kSuccess;
00627          break;
00628       case ROOTER_OPENFILE: 
00629          return OpenFile();
00630          break;
00631       case ROOTER_CLOSEFILE: 
00632          return CloseFile();
00633          break;
00634       case ROOTER_RECBUFFER: 
00635          return ProcessBuffer();
00636          break;
00637       case ROOTER_SHUTDOWN: 
00638          syslog(LOG_INFO,"MinosEntity 0x%2.2x on %s requested shutdown",
00639                 from,socket->GetInetAddress().GetHostName());
00640          // MSG("Roto",Msg::kInfo)
00641          //    << " Roto SHUTDOWN command " << endl;
00642          fCurrentState = ROOTER_STATE_SHUTDOWN_REQ;
00643          return kSuccess;
00644          break;
00645       case ROOTER_CONFIG:
00646          return ProcessConfig();
00647          break;
00648       default:
00649          syslog(LOG_ERR,"MinosEntity 0x%2.2x on %s bad cmd 0x%8.8x",
00650                 from,socket->GetInetAddress().GetHostName(),
00651                 command.fEncoded);
00652 
00653          fNackMessage += 
00654             Form("Unmodelled %s %s;",
00655                  RotoRcCmd::TypeAsStlString(type).c_str(),
00656                  RotoRcCmd::InstrAsStlString(type,instr).c_str());
00657          return kFailCmdUnknown;
00658          break;
00659       }
00660       break;
00661    case MINOS_ROOTER_REQUEST:
00662       switch (instr) {
00663       case ROOTER_CURRENT_STATE: 
00664          return kSuccess;
00665          break;
00666       case ROOTER_STATUS_REPORT: 
00667          BuildStateReport();
00668          return kSuccess;
00669          break;
00670       default:
00671          syslog(LOG_ERR,"MinosEntity 0x%2.2x on %s bad cmd 0x%8.8x",
00672                 from,socket->GetInetAddress().GetHostName(),
00673                 command.fEncoded);
00674 
00675          fNackMessage += 
00676             Form("Unmodelled %s %s;",
00677                  RotoRcCmd::TypeAsStlString(type).c_str(),
00678                  RotoRcCmd::InstrAsStlString(type,instr).c_str());
00679          return kFailReqUnknown;
00680          break;
00681       }
00682       break;
00683    default:
00684          syslog(LOG_ERR,"MinosEntity 0x%2.2x on %s bad cmd 0x%8.8x",
00685                 from,socket->GetInetAddress().GetHostName(),
00686                 command.fEncoded);
00687 
00688          fNackMessage += 
00689             Form("Unmodelled %s %s;",
00690                  RotoRcCmd::TypeAsStlString(type).c_str(),
00691                  RotoRcCmd::InstrAsStlString(type,instr).c_str());
00692       return kFailBadRcType;
00693       break;
00694    }
00695    return kFailWhoops;
00696 }
00697 
00698 //_____________________________________________________________________________
00699 void RotoServer::ReplyToCommand(TSocket* socket, RotoRcCmd command,
00700                                 Int_t status)
00701 {
00702 //
00703 //  Purpose:  Reply to the command received on socket
00704 //            Special commands send more than just ack/nack
00705 //
00706 //  Argument: socket   ptr to socket on which command is to be read
00707 //            command  RunControl encoded command
00708 //            ack      whether to send ACKNOWLEDGE or UNABLE_TO
00709 //
00710 //  Return:   (none)
00711 //
00712 //  Contact:  R. Hatcher
00713 //
00714    static Int_t nreply = 0;
00715    nreply++;
00716 
00717    Int_t type  = command.GetType();
00718    Int_t instr = command.GetInstr();
00719    Bool_t ack  = ( kSuccess == status );
00720 
00721    RotoRcCmd reply = command;
00722    reply.SwapFromTo();
00723 
00724    switch (type) {
00725    case MINOS_ROOTER_COMMAND: {
00726       //
00727       // send ACK/UNABLE_TO for COMMAND
00728       //
00729 
00730       // construct the entirety of what we're sending first
00731       // rather than sending the individual bits
00732       // combine ACK with STATE_REPORT into single socket->SendRaw
00733 
00734       Int_t  ackfield   = 
00735          (ack) ? MINOS_ROOTER_ACKNOWLEDGE : MINOS_ROOTER_UNABLE_TO;
00736       Bool_t nackmsg    = ( ! ack && fNackMessage.length()>0);
00737 
00738       int totmsgsize = sizeof(Int_t);
00739       if (nackmsg) totmsgsize += sizeof(Int_t) + fNackMessage.length();
00740       totmsgsize += sizeof(Int_t);
00741       if (totmsgsize>fReplyBuffer.GetSize())
00742          ResizeBuffer(fReplyBuffer,totmsgsize,"Reply");
00743       Char_t* p = fReplyBuffer.GetArray();
00744 
00745       reply.SetType(ackfield);
00746       reply.SetDataToFollow(nackmsg);
00747 //      socket->SendRaw(&reply.fEncoded,sizeof(Int_t));
00748       memcpy(p,&reply.fEncoded,sizeof(Int_t));
00749       p += sizeof(Int_t);
00750 
00751       if (!ack) {
00752          syslog(LOG_ERR,"sent NACK 0x%8.8x to MinosEntity 0x%2.2x on %s",
00753                 reply.fEncoded,reply.GetTo(),
00754                 socket->GetInetAddress().GetHostName());
00755          syslog(LOG_ERR,"%s",reply.AsStlString().c_str());
00756          if (fNackMessage.length()>0) 
00757             syslog(LOG_ERR,"  %s",fNackMessage.c_str());
00758       }
00759 #ifdef VERBOSE_SYSLOG
00760       else {
00761 //rwh: very verbose
00762          syslog(LOG_ERR,"ACK to %s",command.AsStlString().c_str());
00763       }
00764 #endif
00765 
00766       if (nackmsg) {
00767          // byte count includes its own 4 bytes
00768          Int_t nackmsg_length = fNackMessage.length() + 4;
00769 //         socket->SendRaw(&nackmsg_length,sizeof(Int_t));
00770 //         socket->SendRaw(fNackMessage.c_str(),nackmsg_length-4);
00771          memcpy(p,&nackmsg_length,sizeof(Int_t));
00772          p += sizeof(Int_t);
00773          memcpy(p,fNackMessage.c_str(),nackmsg_length-4);
00774          p += nackmsg_length-4;
00775       }
00776       //
00777       // protocol says COMMAND also gets a STATE_REPORT reply
00778       //
00779       reply.SetType(MINOS_ROOTER_STATE_REPORT);
00780       reply.SetInstr(fCurrentState);
00781       reply.SetDataToFollow(false);
00782 //      socket->SendRaw(&reply.fEncoded,sizeof(Int_t));
00783       memcpy(p,&reply.fEncoded,sizeof(Int_t));
00784       p += sizeof(Int_t);
00785       socket->SendRaw(fReplyBuffer.GetArray(),totmsgsize);                 
00786         
00787       //if (fgDebugFlags & dbg_ReportReply) {
00788       //   string status_msg = "";
00789       //   if (status != 0) status_msg += Form("\n  status 0x%x",status);
00790       //   MSG("Roto", Msg::kInfo) 
00791       //      << "RotoServer::ReplyToCommand reply#" 
00792       //      << nreply 
00793       //      << " " << reply.AsStlString(reportOpt)
00794       //      << status_msg
00795       //      << endl;
00796 
00797       break;
00798    }
00799    case MINOS_ROOTER_REQUEST:
00800       switch (instr) {
00801       case ROOTER_CURRENT_STATE:
00802          reply.SetType(MINOS_ROOTER_STATE_REPORT);
00803          reply.SetInstr(fCurrentState);
00804          reply.SetDataToFollow(false);
00805          socket->SendRaw(&reply.fEncoded,sizeof(Int_t));
00806          break;
00807       case ROOTER_STATUS_REPORT: {
00808          reply.SetType(MINOS_ROOTER_REPORT);
00809          reply.SetInstr(ROOTER_STATUS_REPORT);
00810          reply.SetDataToFollow(true);
00811          socket->SendRaw(&reply.fEncoded,sizeof(Int_t));
00812          // byte count includes its own 4 bytes
00813          Int_t report_length = fStateReport.length() + 4;
00814          socket->SendRaw(&report_length,sizeof(Int_t));
00815          socket->SendRaw(fStateReport.c_str(),report_length-4);
00816 
00817          //if (fgDebugFlags & dbg_ReportReply) 
00818          //   MSG("Roto", Msg::kInfo) 
00819          //      << "RotoServer::ReplyToCommand reply#" 
00820          //      << nreply 
00821          //      << " " << reply.AsStlString(reportOpt)
00822          //      << endl
00823          //      << "  \"" << fStateReport << "\" (" << report_length << ")"
00824          //      << endl;
00825          break;
00826       }
00827 
00828       default:
00829          break;
00830       }
00831       break;
00832    default:
00833       break;
00834    }      
00835           
00836 }
00837 
00838 //_____________________________________________________________________________
00839 Int_t RotoServer::OpenFile()
00840 {
00841 //
00842 //  Purpose:  Open a file, use buffer to determine parameters
00843 //
00844 //  Argument: (none)
00845 //
00846 //  Return:   if 0 then file creation was okay
00847 //
00848 //  Contact:  R. Hatcher
00849 //   
00850    int    nwords = fRecvBufferUsed/sizeof(Int_t);
00851    if (nwords<1) {
00852       syslog(LOG_ERR,"OpenFile() too few words (%d)",nwords);
00853       return kFailOpenFile;
00854    }
00855    Int_t* ibuffer = (Int_t*) fRecvBuffer.GetArray();
00856    Int_t from = ibuffer[0];
00857 
00858    if (fFakeDAQFileName != "") {
00859       fNackMessage += "OpenFile not possible, already in FakeDaqFile mode ";
00860       fNackMessage += ";";
00861       syslog(LOG_NOTICE,"OpenFile() already in fake mode");
00862       return kFailCmdBadFrom;
00863    }
00864 
00865    switch (from) {
00866    case MINOS_DCP:
00867       if (nwords<4) {
00868          syslog(LOG_ERR,"OpenFile() from DCP too few words (%d)",nwords);
00869          return kFailOpenFile;
00870       }
00871       return OpenDaqFile(ibuffer[1],ibuffer[2],ibuffer[3]);
00872       break;
00873    case MINOS_DCS:
00874       if (nwords<4) {
00875          syslog(LOG_ERR,"OpenFile() from DCS too few words (%d)",nwords);
00876          return kFailOpenFile;
00877       }
00878       return OpenDcsFile(ibuffer[1],ibuffer[2],ibuffer[3]);
00879       break;
00880    default: {
00881       // a fake "from" in the buffer is a hack to support
00882       // binary files without proper header blocks
00883       // buffer should be 4bytes of "fake from" + the name
00884       EFileType ftype = kDaqFile;
00885       const char* fname = fRecvBuffer.GetArray()+sizeof(Int_t);
00886       Int_t err = OpenFile(fname,ftype);
00887       if (!err) fFakeDAQFileName = fname;
00888       return err;
00889       break;
00890       }
00891    }
00892    fNackMessage += "OpenFile not possible from ";
00893    fNackMessage += RotoRcCmd::ElementAsStlString(from);
00894    fNackMessage += ";";
00895    return kFailCmdBadFrom;
00896 }
00897 
00898 //_____________________________________________________________________________
00899 
00900 Int_t RotoServer::CloseFile()
00901 {
00902 //
00903 //  Purpose:  Close a file, use buffer to determine parameters
00904 //
00905 //  Argument: (none)
00906 //
00907 //  Return:   if 0 then file closure was okay
00908 //
00909 //  Contact:  R. Hatcher
00910 //   
00911    int    nwords = fRecvBufferUsed/sizeof(Int_t);
00912    if (nwords<1) {
00913       syslog(LOG_ERR,"CloseFile() too few words (%d)",nwords);
00914       return kFailOpenFile;
00915    }
00916    Int_t* ibuffer = (Int_t*) fRecvBuffer.GetArray();
00917    Int_t from = ibuffer[0];
00918 
00919    switch (from) {
00920    case MINOS_DCP:
00921       if (fFakeDAQFileName != "") {
00922          fNackMessage += "CloseFile not possible, in FakeDaqFile mode ";
00923          fNackMessage += ";";
00924          return kFailCmdBadFrom;
00925       }
00926       if (nwords<4) {
00927          syslog(LOG_ERR,"CloseFile() from DCP too few words (%d)",nwords);
00928          return kFailOpenFile;
00929       }
00930       return CloseDaqFile(ibuffer[1],ibuffer[2],ibuffer[3]);
00931       break;
00932    case MINOS_DCS:
00933       if (fFakeDAQFileName != "") {
00934          fNackMessage += "CloseFile not possible, in FakeDaqFile mode ";
00935          fNackMessage += ";";
00936          return kFailCmdBadFrom;
00937       }
00938       if (nwords<4) {
00939          syslog(LOG_ERR,"CloseFile() from DCS too few words (%d)",nwords);
00940          return kFailOpenFile;
00941       }
00942       return CloseDcsFile(ibuffer[1],ibuffer[2],ibuffer[3]);
00943       break;
00944    default: {
00945       // a fake "from" in the buffer is a hack to support
00946       // binary files without proper header blocks
00947       // buffer should be 4bytes of "fake from" + the name
00948       EFileType ftype = kDaqFile;
00949       const char* fname = fRecvBuffer.GetArray()+sizeof(Int_t);
00950       Int_t err = CloseFile(fname,ftype);
00951       if (!err) fFakeDAQFileName = "";
00952       return err;
00953       break;
00954       }
00955    }
00956    fNackMessage += "CloseFile not possible from ";
00957    fNackMessage += RotoRcCmd::ElementAsStlString(from);
00958    fNackMessage += ";";
00959    return kFailCmdBadFrom;
00960 }
00961 
00962 //_____________________________________________________________________________
00963 string RotoServer::FileExtName(EFileType ftype)
00964 {
00965 //
00966 //  Purpose:  return standardized file extension names
00967 //
00968 //  Argument: ftype      enum of file types
00969 //
00970 //  Return:   string version of standard name
00971 //
00972 //  Contact:  R. Hatcher
00973 //   
00974    switch (ftype) {
00975    case kDaqFile:        return string("mdaq.root");    break;
00976    case kDcsFile:        return string("mdcs.root");    break;
00977    case kBogusFile:      return string("mall.root");  break;
00978    default:
00979       syslog(LOG_ERR,"bad FileExtName request %d",(int)ftype);
00980       //MSG("Roto", Msg::kInfo) 
00981       //   << "RotoServer::StreamName illegal enum value: " 
00982       //   << (int)stype << endl;      
00983       return string("generic.root");
00984       break;
00985    }
00986 
00987 }
00988 
00989 //_____________________________________________________________________________
00990 string RotoServer::BogusFileName()
00991 {
00992 //
00993 //  Purpose:  return bogus file name
00994 //
00995 //  Argument: (none)
00996 //
00997 //  Return:   string of a file name in which to put records
00998 //            who's real destination couldn't be determined
00999 //            tag it with the machine, port, and current date/time
01000 //
01001 //  Contact:  R. Hatcher
01002 //   
01003    Int_t date = fStartTime.GetDate();
01004    if (date>19700000) date %= 1000000;
01005    Int_t time = fStartTime.GetTime();
01006    string fname = Form("bogus_%s_p%d_%6.6d_%6.6d",
01007                        gSystem->HostName(),fPort,date,time);
01008    // squeeze out any blanks with "_"
01009    string::size_type where;
01010    while (string::npos != (where = fname.find(" "))) {
01011       fname.replace(where,1,"_");
01012    }
01013    return fname;
01014 }
01015 
01016 //_____________________________________________________________________________
01017 Int_t RotoServer::OpenDaqFile(Int_t detector, Int_t run, Int_t subrun)
01018 {
01019 //
01020 //  Purpose:  Open a DAQ file
01021 //
01022 //  Argument: detector   detector type (near,far,caldet)
01023 //            run        run # 
01024 //            subrun     subrun #
01025 //
01026 //  Return:   if 0 then file creation was okay
01027 //
01028 //  Contact:  R. Hatcher
01029 //   
01030 
01031    string filebase = BuildDaqBaseName(detector,run,subrun);
01032    //MSG("Roto", Msg::kDebug) << "RotoServer::OpenDaqFile " << filename << endl;
01033    return OpenFile(filebase.c_str(),kDaqFile);
01034 }
01035 
01036 //_____________________________________________________________________________
01037 Int_t RotoServer::OpenFile(const Char_t* fnamebase, EFileType ftype)
01038 {
01039 //
01040 //  Purpose:  Open a DAQ/DCS file
01041 //
01042 //  Argument: fnamebase  file name without extension
01043 //            ftype     kDaqFile, kDcsFile, or kBogusFile
01044 //
01045 //  Return:   if 0 then file creation was okay
01046 //
01047 //  Contact:  R. Hatcher
01048 // 
01049 
01050    const char* pbasedir = gSystem->Getenv("DAQ_DATA_DIR");
01051    if (!pbasedir) pbasedir = ".";
01052    string filename = string(pbasedir)  + "/" + 
01053                      string(fnamebase) + "." + 
01054                      FileExtName(ftype);
01055    syslog(LOG_INFO,"OpenFile '%s'",filename.c_str());
01056 
01057    bool openok = true;
01058 
01059    // create possible streams, attach each to the file
01060    // !!!!!! needs more protection against failures
01061 
01062    int ibeg = Per::kDaqSnarl;
01063    int iend = Per::kDcsMonitor;
01064    if (kDaqFile == ftype) {
01065       ibeg = Per::kDaqSnarl;
01066       iend = Per::kLightInjection;
01067    }
01068    else
01069    if (kDcsFile == ftype) {
01070       ibeg = Per::kDcsAlarm;
01071       iend = Per::kDcsMonitor;
01072    }
01073 
01074    string fbase = fnamebase;
01075    // if the file is a bogus one, then the stream names
01076    // only have "bogus" not the full file name
01077    if (string::npos != fbase.find("bogus")) fbase = "bogus";
01078    int fversion = 0;
01079 
01080    // if running from RotoSocket (ie. connected directly to input
01081    // binary file) then don't bother with AutoSave   
01082    bool fromFile = fSocket->InheritsFrom("RotoSocket");
01083    if (fromFile) MSG("Roto",Msg::kInfo)
01084          << "Reading from RotoSocket - turn off autosave"
01085          << endl;
01086 
01087    for (int i = ibeg; i<=iend; i++ ) {
01088       Per::EStreamType stype = (Per::EStreamType) i;
01089       string treeName = Per::AsString(stype);
01090       string rotoStrmName = fbase + "." + treeName;
01091 
01092       Int_t  compressLevel = fCompressionMap[stype];
01093       UInt_t basketSize    = fBasketSizeMap[stype];
01094 
01095       PerOutputStream* astream =
01096          fOutputStreamManager->OpenStream(rotoStrmName,treeName,
01097                                           "RawRecord","","",Per::kRecSplit,
01098                                           basketSize,compressLevel);
01099       if (astream) {
01100          // possible tweaks
01101          //   autosave (interval: 0, setbasket = true)
01102          //   basketsaveint (interval: 0)
01103          if (fromFile) astream->SetAutoSave(0,0);
01104          else {
01105             UInt_t autoSaveInt  = fAutoSaveIntMap[stype];
01106             UInt_t autoSaveTime = fAutoSaveTimeMap[stype];
01107             astream->SetAutoSave(autoSaveInt,autoSaveTime);
01108          }
01109          bool openok_astream = 
01110             fOutputStreamManager->SetFile(rotoStrmName,filename,fAccessMode);
01111 
01112          // handle potential case where file already exists
01113          // and the rotorooter is in kNew not kRecreate mode
01114          Per::EErrorCode ecode = astream->GetErrorCode();
01115          if (Per::kErrFileExists == ecode) {
01116             string basefilename = filename;
01117             string::size_type where = filename.rfind(".m");
01118             if (string::npos == where) where = filename.rfind(".root");
01119             if (string::npos == where) where = filename.length();
01120             bool keep_trying = true;
01121             // start adding version numbers
01122             do {
01123                ++fversion;
01124                filename = basefilename;  // without any attempt at versioning
01125                filename.insert(where,Form(".%d",fversion));
01126                openok_astream = 
01127                   fOutputStreamManager->SetFile(rotoStrmName,filename,fAccessMode);
01128                if (Per::kErrSuccess != ecode) {
01129                   syslog(LOG_INFO,
01130                          "  open versioned file saw Per error: (%d) %s",
01131                          (int)ecode,Per::AsString(ecode));
01132                }
01133 
01134                ecode = astream->GetErrorCode();
01135                keep_trying = (fversion<100) &&
01136                   (Per::kErrFileExists == ecode);
01137             } while (keep_trying);
01138             // report if there was success
01139             if (Per::kErrSuccess == ecode) 
01140                syslog(LOG_INFO,"  open stream using file %s",filename.c_str());
01141 
01142          } // file exists error
01143 
01144          if (Per::kErrSuccess != ecode) {
01145             syslog(LOG_INFO,"  open stream saw Per error: (%d) %s",
01146                    (int)ecode,Per::AsString(ecode));
01147          }
01148 
01149          openok = openok && openok_astream;
01150          
01151          if (openok_astream) 
01152             syslog(LOG_INFO,"  open stream %s as %s",
01153                    treeName.c_str(),rotoStrmName.c_str());
01154 
01155          //MSG("Roto", Msg::kDebug) 
01156          //   << "RotoServer::OpenDaqFile stream " << strmName
01157          //   << " opened " << ( (openok_astream) ? "ok" : "failed" )
01158          //   << endl;
01159       }
01160       else {
01161          openok = false;
01162       }
01163    }
01164 
01165    return (openok) ? kSuccess : kFailWhoops;
01166 }
01167 
01168 //_____________________________________________________________________________
01169 Int_t RotoServer::CloseDaqFile(Int_t detector, Int_t run, Int_t subrun)
01170 {
01171 //
01172 //  Purpose:  Close a DAQ file
01173 //
01174 //  Argument: detector   detector type (near,far,caldet)
01175 //            run        run # 
01176 //            subrun     subrun #
01177 //
01178 //  Return:   if >0 then file closure was okay
01179 //
01180 //  Contact:  R. Hatcher
01181 //   
01182 
01183    string filebase = BuildDaqBaseName(detector,run,subrun);
01184    return CloseFile(filebase.c_str(),kDaqFile);
01185 }
01186 
01187 //_____________________________________________________________________________
01188 Int_t RotoServer::CloseFile(const Char_t* fnamebase, EFileType ftype)
01189 {
01190 //
01191 //  Purpose:  Close a DAQ/DCS file
01192 //
01193 //  Argument: fnamebase  file name without extension
01194 //            ftype     kDaqFile, kDcsFile, or kBogusFile
01195 //
01196 //  Return:   if >0 then file closure was okay
01197 //
01198 //  Contact:  R. Hatcher
01199 //   
01200    const char* pbasedir = gSystem->Getenv("DAQ_DATA_DIR");
01201    if (!pbasedir) pbasedir = ".";
01202    string filename = string(pbasedir)  + "/" + 
01203                      string(fnamebase) + "." + 
01204                      FileExtName(ftype);
01205    syslog(LOG_INFO,"CloseFile '%s'",filename.c_str());
01206 
01207    bool closeok = true;
01208 
01209    // close possible streams
01210    // !!!!!! needs more protection against failures
01211 
01212    int ibeg = Per::kDaqSnarl;
01213    int iend = Per::kDcsMonitor;
01214    if (kDaqFile == ftype) {
01215       ibeg = Per::kDaqSnarl;
01216       iend = Per::kLightInjection;
01217    }
01218    else
01219    if (kDcsFile == ftype) {
01220       ibeg = Per::kDcsAlarm;
01221       iend = Per::kDcsMonitor;
01222    }
01223 
01224    string fbase = fnamebase;
01225    // if the file is a bogus one, then the stream names
01226    // only have "bogus" not the full file name
01227    if (string::npos != fbase.find("bogus")) fbase = "bogus";
01228 
01229    for (int i = ibeg; i<=iend; i++ ) {
01230       Per::EStreamType stype = (Per::EStreamType) i;
01231       string treeName = Per::AsString(stype);
01232       string rotoStrmName = fbase + "." + treeName;
01233 
01234       fOutputStreamManager->Write(rotoStrmName);
01235       fOutputStreamManager->CloseStream(rotoStrmName);
01236    }
01237    // CloseStream does this for us (besides this method takes a stream name)
01238    // fOutputStreamManager->CloseFile(filename);
01239                                                   
01240    return (closeok) ? kSuccess : kFailWhoops;
01241 }
01242 
01243 //_____________________________________________________________________________
01244 string RotoServer::BuildDaqBaseName(Int_t detector, Int_t run, Int_t subrun)
01245 {
01246 //
01247 //  Purpose:  Construct a standardized DAQ file/stream name base
01248 //
01249 //  Argument: detector   detector type (near,far,caldet)
01250 //            run        run # 
01251 //            subrun     subrun #
01252 //
01253 //  Return:   the base file/stream name
01254 //
01255 //  Contact:  R. Hatcher
01256 //   
01257 
01258    DetectorType::Detector_t det = (DetectorType::Detector_t)(detector);
01259    Char_t detchar = DetectorType::AsString(det)[0];
01260 
01261    string fname = Form("%c%8.8d_%4.4d",detchar,run,subrun);
01262    return fname;
01263 }
01264 
01265 //_____________________________________________________________________________
01266 Int_t RotoServer::OpenDcsFile(Int_t detector, Int_t sec, Int_t nanosec)
01267 {
01268 //
01269 //  Purpose:  Open the DCS file
01270 //
01271 //  Argument: detector   detector type (near,far,caldet)
01272 //            sec        seconds when file was started
01273 //            nanosec    nano when file was started
01274 //
01275 //  Return:   if 0 then file creation was okay
01276 //
01277 //  Contact:  R. Hatcher
01278 //   
01279 
01280    syslog(LOG_ERR,"OpenDcsFile %d %d %d not yet implemented",
01281           detector,sec,nanosec);
01282 
01283    fNackMessage += "OpenDcsFile not yet implemented;";
01284    return kFailNoCodeYet;
01285 }
01286 
01287 //_____________________________________________________________________________
01288 Int_t RotoServer::CloseDcsFile(Int_t detector, Int_t sec, Int_t nanosec)
01289 {
01290 //
01291 //  Purpose:  Close the DCS file
01292 //
01293 //  Argument: detector   detector type (near,far,caldet)
01294 //            sec        seconds when file was started
01295 //            nanosec    nano when file was started
01296 //
01297 //  Return:   if 0 then file closure as okay
01298 //
01299 //  Contact:  R. Hatcher
01300 //   
01301 
01302    syslog(LOG_ERR,"CloseDcsFile %d %d %d not yet implemented",
01303           detector,sec,nanosec);
01304 
01305    fNackMessage += "CloseDcsFile not yet implemented;";
01306    return kFailNoCodeYet;
01307 }
01308 
01309 //_____________________________________________________________________________
01310 string RotoServer::BuildDcsBaseName(Int_t detector, Int_t sec, Int_t nanosec)
01311 {
01312 //
01313 //  Purpose:  Construct a standardized DCS file/stream name
01314 //
01315 //  Argument: detector   detector type (near,far,caldet)
01316 //            sec        seconds of timestamp
01317 //            nanosec    nanoseconds
01318 //
01319 //  Return:   the base file/stream name
01320 //
01321 //  Contact:  R. Hatcher
01322 //   
01323 
01324    DetectorType::Detector_t det = (DetectorType::Detector_t)(detector);
01325    Char_t detchar = DetectorType::AsString(det)[0];
01326    VldTimeStamp time(sec,nanosec);
01327 
01328    string fname = Form("%c%s",detchar,time.AsString("s"));
01329    fname.replace(fname.find(" "),1,"_");
01330    return fname;
01331 }
01332 
01333 //_____________________________________________________________________________
01334 Int_t RotoServer::ProcessBuffer() 
01335 {
01336 //
01337 //  Purpose:  Process the RawRecord that is in the buffer
01338 //            Build it into objects, write it out
01339 //
01340 //  Argument: (none)
01341 //
01342 //  Return:   if 0 then conversion and write were successful
01343 //
01344 //  Contact:  R. Hatcher
01345 //   
01346    static Int_t nrec = 0;
01347    nrec++;
01348 
01349    // syslog(LOG_DEBUG,"process buffer %d",nrec);
01350    // MSG("Roto", Msg::kDebug) << "RotoServer::ProcessBuffer " << nrec << endl;
01351 
01352    Int_t status = kSuccess;
01353    Int_t obj_status = 0;
01354 
01355    // turn buffer into record(s)
01356    MomNavigator* mom = 
01357       RotoObjectifier::BufferInflate(fRecvBuffer.GetArray(),
01358                                      fRecvBufferUsed,obj_status);
01359 
01360    if (!mom) {
01361       syslog(LOG_ERR,"buffer inflate returned no MOM, rec %d err %d",
01362              nrec,obj_status);
01363       return kFailInflateBuff;
01364    }
01365 
01366    if (4 == obj_status || 8 == obj_status) {
01367 
01368       syslog(LOG_ERR,"partial buffer failure rec %d err %d",nrec,obj_status);
01369 
01370       status |= kFailPartialRec;
01371       fNackMessage += "Portion of record could not be converted to RawDataBlocks;";
01372    }      
01373                      
01374    // write records out
01375 
01376    const RawRecord *rawrec = 
01377       dynamic_cast<const RawRecord *>(mom->GetFragment("RawRecord"));
01378 
01379    string rotoStrmName = ChooseStreamName(rawrec);
01380 
01381    if (!fOutputStreamManager) {
01382       syslog(LOG_ERR,"process buffer had no output stream manager");
01383       status |= kFailWriteRec;
01384       return status;
01385    }
01386 
01387    PerOutputStream* stream = 0;
01388    stream = dynamic_cast<PerOutputStream*>
01389       (fOutputStreamManager->GetOpenedStream(rotoStrmName));
01390 
01391    if (!stream) {
01392       // no open stream with the right name ...
01393       // openfile wasn't called ?? or perhaps the data is
01394       // corrupted.  we want to protect this data so
01395       // put it in a file of bogus data
01396       syslog(LOG_ERR,"process buffer had no stream '%s'",
01397              rotoStrmName.c_str());
01398       string streamType = // last part of rotoStrmName after final .
01399          rotoStrmName.substr(rotoStrmName.find_last_of(".")+1,string::npos); 
01400       rotoStrmName =  "bogus." + streamType;
01401       stream = dynamic_cast<PerOutputStream*>
01402          (fOutputStreamManager->GetOpenedStream(rotoStrmName));
01403       if (!stream) {
01404          // perhaps the bogus file isn't open ...
01405          EFileType ftype = kBogusFile;
01406          string fname = BogusFileName();
01407          Int_t err = OpenFile(fname.c_str(),ftype);
01408          if (err) {
01409             syslog(LOG_ERR,"process buffer could not open 'bogus' file '%s'",
01410                    fname.c_str());
01411          }
01412          // last try at getting the stream
01413          stream = dynamic_cast<PerOutputStream*>
01414             (fOutputStreamManager->GetOpenedStream(rotoStrmName));
01415          
01416       }
01417    }
01418    
01419    if (stream) {
01420 //         MSG("Roto", Msg::kVerbose) 
01421 //            << "RotoServer::ProcessBuffer put on stream " 
01422 //            << strmName << endl;
01423          // remove const for PerOutputStream::SetObject
01424       RawRecord *rawrec_nonconst =
01425          const_cast<RawRecord *>(rawrec);
01426       stream->SetObject(rawrec_nonconst);
01427       stream->Store();
01428    } 
01429    else {
01430       syslog(LOG_ERR,"process buffer still had no stream '%s'",
01431              rotoStrmName.c_str());
01432       //MSG("Roto", Msg::kWarning) 
01433       //   << "RotoServer::ProcessBuffer no stream " << rotoStrmName;
01434       status |= kFailWriteRec;
01435    }
01436 
01437    delete mom; // clean up
01438 
01439    return status;
01440 }
01441 
01442 //_____________________________________________________________________________
01443 string RotoServer::ChooseStreamName(const RawRecord* rawrec) 
01444 {
01445 //
01446 //  Purpose:  Pick which stream to put this record into
01447 //
01448 //  Argument: (none)
01449 //
01450 //  Return:   name of stream
01451 //
01452 //  Contact:  R. Hatcher
01453 //   
01454    string rotoStrmName;
01455 
01456    VldContext vldc = rawrec->GetRawHeader()->GetVldContext(); // always okay
01457    
01458    const RawDaqHeader *rawdaqhead = 
01459       dynamic_cast<const RawDaqHeader *>(rawrec->GetRawHeader());
01460    
01461    if (rawdaqhead) {
01462       // this came from the DAQ
01463       Int_t run    = rawdaqhead->GetRun();
01464       Int_t subrun = rawdaqhead->GetSubRun();
01465       
01466       Per::EStreamType stype = Per::kDaqMonitor;
01467       
01468       TIter blkiter =
01469          (const_cast<RawRecord*>(rawrec))->GetRawBlockIter();
01470       TObject*            tobj = 0;
01471       const RawDataBlock* rb   = 0;
01472 
01473       Bool_t sawLISummary = false;
01474       Bool_t sawSnarlHeader = false;
01475       Bool_t sawDigitBlock = false;
01476 
01477       while ( (tobj = blkiter()) ) {
01478          rb = dynamic_cast<RawDataBlock*>(tobj);
01479 
01480          if ((dynamic_cast<const RawLIAdcSummaryBlock*>(rb))) 
01481             sawLISummary = true;
01482 
01483          if ((dynamic_cast<const RawLITimingSummaryBlock*>(rb)))
01484             sawLISummary = true;
01485 
01486          if ((dynamic_cast<const RawSnarlHeaderBlock*>(rb)))
01487             sawSnarlHeader = true;
01488 
01489          if ((dynamic_cast<const RawDigitDataBlock*>(rb)))
01490             sawDigitBlock = true;
01491       }
01492       // decide on which stream
01493       // if has LI blocks then --> LightInjection
01494       // else if had Snarl then --> DaqSnarl
01495       // otherwise --> DaqMonitor
01496       
01497       if (sawLISummary) {
01498          stype = Per::kLightInjection;
01499       }
01500       else if (sawSnarlHeader || sawDigitBlock ) {
01501          stype = Per::kDaqSnarl;
01502          if (!sawSnarlHeader || !sawDigitBlock ) {
01503             syslog(LOG_ERR,"XOR RawSnarlHeaderBlock %d RawDgitiDataBlock %d",
01504                    sawSnarlHeader,sawDigitBlock);
01505             
01506             //MSG("Roto", Msg::kWarning) 
01507             //   << "RotoServer::ProcessBuffer " 
01508             //   << "  RawSnarlHeaderBlock " << snarl_hdr 
01509             //   << "  RawDigitDataBlock " << digits
01510             //   << endl;
01511          }
01512       }
01513       
01514       // if we're in "one file" mode then the stream naming
01515       // doesn't depend on run/subrun
01516       if (fFakeDAQFileName != "") {
01517          rotoStrmName = fFakeDAQFileName;
01518       } else {
01519          rotoStrmName = BuildDaqBaseName(vldc.GetDetector(),run,subrun);
01520       }
01521       string streamType = Per::AsString(stype);
01522       rotoStrmName +=  "." + streamType;
01523 
01524    }
01525    else {
01526       // assume it came from the DCS
01527       rotoStrmName = ".DcsMonitor";
01528       syslog(LOG_ALERT," can't handle DCS yet");
01529       assert(0);
01530 
01531    }
01532 
01533    return rotoStrmName;
01534 }
01535 
01536 //_____________________________________________________________________________
01537 Int_t RotoServer::ProcessConfig() 
01538 {
01539 //
01540 //  Purpose:  Process the config that is in the buffer
01541 //            
01542 //  Argument: (none)
01543 //
01544 //  Return:   if 0 then config processing was uneventful
01545 //
01546 //  Contact:  R. Hatcher
01547 //   
01548 
01549    char config[1024];
01550    char stream[1024];
01551    int  nItems = 0;
01552    bool ok = true;
01553 
01554    //   nItems = sscanf(fRecvBuffer.GetArray(),
01555    //                   "config autosave: %s = %d rec %d sec",
01556    //                   stream,&nrec,&nsec);
01557 
01558    nItems = sscanf(fRecvBuffer.GetArray(),
01559                    "config %s:",config);
01560 
01561    if (!strncmp(config,"autosave",8)) {
01562      unsigned int nrec, nsec;
01563      nItems = sscanf(fRecvBuffer.GetArray(),
01564                      "config autosave: %s = %u rec %u sec",
01565                      stream,&nrec,&nsec);
01566 
01567      if (nItems != 3) ok = false;
01568      else {
01569        int nmod = SetAutoSaveConfig(stream,nrec,nsec);
01570        if (nmod < 1) {
01571          syslog(LOG_WARNING,"ProcessConfig stream \"%s\" unknown\n",stream);
01572        }
01573      }
01574 
01575    }
01576    else if (!strncmp(config,"compress",8)) {
01577      int level;
01578      nItems = sscanf(fRecvBuffer.GetArray(),
01579                      "config compress: %s = %d",
01580                      stream,&level);
01581 
01582      if (nItems != 2) ok = false;
01583      else {
01584        int nmod = SetCompressConfig(stream,level);
01585        if (nmod < 1) {
01586          syslog(LOG_WARNING,"ProcessConfig stream \"%s\" unknown\n",stream);
01587        }
01588      }
01589    }
01590    else if (!strncmp(config,"basketsize",10)) {
01591      int basketsize;
01592      nItems = sscanf(fRecvBuffer.GetArray(),
01593                      "config basketsize: %s = %d",
01594                      stream,&basketsize);
01595 
01596      if (nItems != 2) ok = false;
01597      else {
01598        int nmod = SetBasketSizeConfig(stream,basketsize);
01599        if (nmod < 1) {
01600          syslog(LOG_WARNING,"ProcessConfig stream \"%s\" unknown\n",stream);
01601        }
01602      }
01603 
01604    }
01605    else ok = false;
01606 
01607    if (!ok) syslog(LOG_WARNING,
01608                    "ProcessConfig failed to interpret string: \"%s\"\n",
01609                    fRecvBuffer.GetArray());
01610      
01611 
01612    return 0;
01613 }
01614 
01615 //_____________________________________________________________________________
01616 void RotoServer::BuildStateReport() 
01617 {
01618 //
01619 //  Purpose:  Update state report string
01620 //
01621 //  Argument: (none)
01622 //
01623 //  Return:   (none) [fStateReport is modified]
01624 //
01625 //  Contact:  R. Hatcher
01626 //   
01627    fStateReport  = RotoRcCmd::ElementAsStlString(MINOS_ROOTER);
01628    fStateReport += "; ";
01629    fStateReport += 
01630       RotoRcCmd::InstrAsStlString(MINOS_ROOTER_STATE_REPORT,fCurrentState);
01631    fStateReport += "; ";
01632 }
01633 
01634 //_____________________________________________________________________________

Generated on Wed Sep 4 19:01:20 2002 for loon by doxygen1.2.16