00001
00002
00003
00004
00005
00006
00007
00009
00010 #include "Rotorooter/RotoServer.h"
00011 #include "Rotorooter/RotoSocket.h"
00012 #include "Rotorooter/RotoObjectifier.h"
00013
00014 #include <syslog.h>
00015
00016 #include "TSystem.h"
00017 #include "TServerSocket.h"
00018
00019
00020 #include "TString.h"
00021
00022 #include "RawData/RawRecord.h"
00023 #include "RawData/RawDaqHeader.h"
00024
00025 #include "RawData/RawLIAdcSummaryBlock.h"
00026 #include "RawData/RawLITimingSummaryBlock.h"
00027 #include "RawData/RawSnarlHeaderBlock.h"
00028 #include "RawData/RawDigitDataBlock.h"
00029
00030 #include "Persistency/PerOutputStream.h"
00031 #include "Persistency/PerOutputStreamManager.h"
00032
00033 const UInt_t dbg_ReportCmd = 0x0001;
00034 const UInt_t dbg_ReportReply = 0x0002;
00035 const UInt_t dbg_ReportHex = 0x0004;
00036 const UInt_t dbg_ReportNotTo = 0x0008;
00037
00038 UInt_t RotoServer::fgDebugFlags = 0;
00039
00040
00041 static string reportOpt =
00042 (RotoServer::GetDebugFlags() & dbg_ReportHex) ? "b" : " ";
00043
00044 #include "MinosObjectMap/MomNavigator.h"
00045 #include "MessageService/MsgService.h"
00046
00047 ClassImp(RotoServer)
00048 CVSID("$Id: RotoServer.cxx,v 1.23 2002/08/29 18:51:10 rhatcher Exp $");
00049
00050 enum RotoFailureMode {
00051 kSuccess = 0x00000000,
00052 kFailRecvCmd = 0x00000001,
00053 kFailRecvByteCnt = 0x00000002,
00054 kFailRecvData = 0x00000004,
00055 kFailBuffResize = 0x00000008,
00056 kFailNotToMe = 0x00000010,
00057 kFailCmdBadFrom = 0x00000020,
00058 kFailCmdUnknown = 0x00000040,
00059 kFailReqUnknown = 0x00000080,
00060 kFailBadRcType = 0x00000100,
00061 kFailOpenFile = 0x00001000,
00062 kFailCloseFile = 0x00010000,
00063 kFailRecBuffer = 0x00100000,
00064 kFailInflateBuff = 0x00200000,
00065 kFailPartialRec = 0x00400000,
00066 kFailWriteRec = 0x01000000,
00067
00068 kFailNoCodeYet = 0x40000000,
00069 kFailWhoops = 0x80000000
00070 };
00071
00072
00073 RotoServer::RotoServer(Int_t port, Int_t nwords, Bool_t allow_overwrite, Int_t tcp_nodelay_flag)
00074 : fPort(port), fTCP_NODELAY_flag(tcp_nodelay_flag)
00075 {
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091 Init(nwords, allow_overwrite);
00092
00093 fServerSocket = new TServerSocket(port, kTRUE);
00094 fServerSocket->SetOption(kNoDelay,fTCP_NODELAY_flag);
00095 }
00096
00097
00098 RotoServer::RotoServer(const char* fileName, Int_t nwords, Bool_t allow_overwrite)
00099 {
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114 Init(nwords, allow_overwrite);
00115
00116 fSocket = new RotoSocket(fileName);
00117
00118 }
00119
00120 RotoServer::~RotoServer()
00121 {
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135 fCurrentState = ROOTER_STATE_STOPPING;
00136
00137 if (fServerSocket) {
00138 fServerSocket->Close();
00139 delete fServerSocket;
00140 fServerSocket = 0;
00141 }
00142
00143 CloseAndDeleteSocket(fSocket);
00144
00145 if (fOutputStreamManager) {
00146
00147 fOutputStreamManager->Write();
00148 fOutputStreamManager->CloseStream();
00149
00150
00151 delete fOutputStreamManager;
00152 fOutputStreamManager = 0;
00153 }
00154
00155 fCurrentState = ROOTER_STATE_UNKNOWN;
00156
00157 syslog(LOG_INFO,"~RotoServer");
00158 }
00159
00160
00161 void RotoServer::CloseAndDeleteSocket(TSocket*& socket)
00162 {
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172 if (socket) {
00173 if (socket->IsValid()) socket->Close();
00174 delete socket;
00175 socket = 0;
00176 }
00177 }
00178
00179
00180 void RotoServer::Init(Int_t nwords, Bool_t allow_overwrite)
00181 {
00182
00183
00184
00185
00186
00187
00188
00189
00190
00191
00192
00193
00194 fServerSocket = 0;
00195 fSocket = 0;
00196 fCurrentState = ROOTER_STATE_UNKNOWN;
00197 fNackMessage = "";
00198 fStateReport = "";
00199 fAccessMode = allow_overwrite ? Per::kRecreate : Per::kNew;
00200 fOutputStreamManager = 0;
00201 fFakeDAQFileName = "";
00202
00203 ResizeBuffer(fRecvBuffer,nwords*sizeof(Int_t)/sizeof(Char_t),"Recv");
00204 ResizeBuffer(fReplyBuffer,4096*sizeof(Char_t),"Reply");
00205
00206 fOutputStreamManager = new PerOutputStreamManager();
00207 fCurrentState = ROOTER_STATE_UNCONNECTED;
00208 RotoObjectifier::SysLogRawBlockRegistry();
00209
00210 syslog(LOG_INFO,"Default RotoServer configuration\n");
00211
00212
00213 SetAutoSaveConfig("*",1000,10);
00214
00215 SetCompressConfig("*",-1);
00216
00217 SetBasketSizeConfig("*",64000);
00218
00219 }
00220
00221
00222 Int_t RotoServer::SetAutoSaveConfig(const string& stream, UInt_t nrec, UInt_t nsec)
00223 {
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235
00236 int nmod = 0;
00237
00238 int ibeg = Per::kDaqSnarl;
00239 int iend = Per::kDcsMonitor;
00240 for (int i = ibeg; i<=iend; i++ ) {
00241 Per::EStreamType stype = (Per::EStreamType) i;
00242 string treeName = Per::AsString(stype);
00243 if (treeName == stream || stream == "*") {
00244 syslog(LOG_INFO,
00245 "AutoSave is %d records or %d seconds for %s stream\n",
00246 nrec,nsec,treeName.c_str());
00247 fAutoSaveIntMap[stype] = nrec;
00248 fAutoSaveTimeMap[stype] = nsec;
00249 nmod++;
00250 }
00251 }
00252
00253 return nmod;
00254 }
00255
00256
00257 Int_t RotoServer::SetCompressConfig(const string& stream, Int_t level)
00258 {
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274 int nmod = 0;
00275
00276 int ibeg = Per::kDaqSnarl;
00277 int iend = Per::kDcsMonitor;
00278 for (int i = ibeg; i<=iend; i++ ) {
00279 Per::EStreamType stype = (Per::EStreamType) i;
00280 string treeName = Per::AsString(stype);
00281 if (treeName == stream || stream == "*") {
00282 syslog(LOG_INFO,
00283 "Compression level is %d for %s stream\n",
00284 level,treeName.c_str());
00285 fCompressionMap[stype] = level;
00286 nmod++;
00287 }
00288 }
00289
00290 return nmod;
00291 }
00292
00293
00294 Int_t RotoServer::SetBasketSizeConfig(const string& stream, UInt_t basketsize)
00295 {
00296
00297
00298
00299
00300
00301
00302
00303
00304
00305
00306
00307 int nmod = 0;
00308
00309 int ibeg = Per::kDaqSnarl;
00310 int iend = Per::kDcsMonitor;
00311 for (int i = ibeg; i<=iend; i++ ) {
00312 Per::EStreamType stype = (Per::EStreamType) i;
00313 string treeName = Per::AsString(stype);
00314 if (treeName == stream || stream == "*") {
00315 syslog(LOG_INFO,
00316 "BasketSize is %d for %s stream\n",
00317 basketsize,treeName.c_str());
00318 fBasketSizeMap[stype] = basketsize;
00319 nmod++;
00320 }
00321 }
00322
00323 return nmod;
00324 }
00325
00326
00327 void RotoServer::Run()
00328 {
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344 if (!fServerSocket ) {
00345 if ( ! fSocket->IsValid() ) {
00346 syslog(LOG_ALERT," Unable to open file as RotoSocket");
00347 return;
00348 }
00349 }
00350 else if (!fServerSocket->IsValid()) {
00351 string why;
00352 switch (fServerSocket->GetErrorCode()) {
00353 case 0: why = "no error, socket should be valid"; break;
00354 case -1: why = "low level socket() call failed"; break;
00355 case -2: why = "low level bind() call failed - ";
00356 why += "port may already be in use";
00357 break;
00358 case -3: why = "low level listen() call failed"; break;
00359 default: why = "unknown error"; break;
00360 }
00361
00362
00363
00364
00365
00366
00367
00368 syslog(LOG_ALERT," server socket on port %d failed, why=%s",
00369 fServerSocket->GetLocalPort(),why.c_str());
00370 return;
00371 }
00372
00373 fCurrentState = ROOTER_STATE_UNCONNECTED;
00374 Int_t ncmd = 0;
00375
00376 RotoRcCmd cmd;
00377
00378
00379
00380
00381 while (fCurrentState != ROOTER_STATE_SHUTDOWN_REQ) {
00382
00383
00384 if (!fSocket) {
00385 fSocket = fServerSocket->Accept();
00386 fSocket->SetOption(kNoDelay,fTCP_NODELAY_flag);
00387
00388 if (fCurrentState == ROOTER_STATE_UNCONNECTED)
00389 fCurrentState = ROOTER_STATE_CONNECTED;
00390 }
00391
00392 Int_t status = kSuccess;
00393 fNackMessage = "";
00394
00395
00396
00397 status |= RecvCommand(fSocket,cmd);
00398
00399
00400 if (status & (kFailRecvCmd|kFailRecvByteCnt|kFailRecvData) ) {
00401 syslog(LOG_CRIT,"socket from '%s' abruptly broken",
00402 fSocket->GetInetAddress().GetHostName());
00403
00404
00405
00406
00407
00408
00409
00410
00411 fOutputStreamManager->Write();
00412 fOutputStreamManager->CloseStream();
00413
00414
00415 CloseAndDeleteSocket(fSocket);
00416 fCurrentState = ROOTER_STATE_UNCONNECTED;
00417 continue;
00418 }
00419 ncmd++;
00420
00421 if (fgDebugFlags & dbg_ReportCmd)
00422 syslog(LOG_DEBUG,"msg %d status %d",ncmd,status);
00423
00424
00425
00426
00427
00428
00429
00430
00431 status |= CheckTo(cmd);
00432 if (status & kFailNotToMe) continue;
00433
00434 status |= ProcessCommand(fSocket,cmd);
00435
00436 ReplyToCommand(fSocket,cmd,status);
00437
00438 if (fCurrentState == ROOTER_STATE_DISCONNECTING) {
00439 CloseAndDeleteSocket(fSocket);
00440 fCurrentState = ROOTER_STATE_UNCONNECTED;
00441 }
00442 }
00443 }
00444
00445
00446 void RotoServer::ResizeBuffer(TArray& buffer, Int_t size, const Char_t* name)
00447 {
00448
00449
00450
00451
00452
00453
00454
00455
00456
00457
00458
00459
00460 int oldsize = buffer.GetSize();
00461 buffer.Set(0);
00462
00463 Int_t request = size;
00464 Int_t failures = 0;
00465 while ( buffer.GetSize() != request ) {
00466 buffer.Set(request);
00467 if (buffer.GetSize() == request) break;
00468 failures++;
00469 request -= 1024*sizeof(Int_t);
00470 }
00471
00472 if (failures) {
00473 syslog(LOG_ERR,"resize %s buffer from %d to %d only got %d",
00474 name,oldsize,size,request);
00475 } else {
00476 syslog(LOG_NOTICE,"resize %s buffer from %d to %d",
00477 name,oldsize,size);
00478 }
00479 }
00480
00481
00482 Int_t RotoServer::RecvCommand(TSocket* socket, RotoRcCmd& command)
00483 {
00484
00485
00486
00487
00488
00489
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499 Int_t lCommand, ltouse, lBuffer;
00500
00501 lCommand = fRecvBufferUsed = ltouse = lBuffer = 0;
00502
00503
00504
00505
00506 lCommand = socket->RecvRaw(&command.fEncoded,sizeof(Int_t));
00507 if (lCommand != sizeof(Int_t)) {
00508 syslog(LOG_ERR,"RecvRaw command got %d expect %d",
00509 lCommand,sizeof(Int_t));
00510 return kFailRecvCmd;
00511 }
00512
00513
00514
00515 if (command.HasDataToFollow()) {
00516
00517 Int_t extra_bytes;
00518 ltouse = socket->RecvRaw(&extra_bytes,sizeof(Int_t));
00519 fRecvBufferUsed = extra_bytes - 4;
00520 if (ltouse != sizeof(Int_t)) {
00521 syslog(LOG_ERR,"RecvRaw nbytes got %d expect %d",
00522 ltouse,sizeof(Int_t));
00523 return kFailRecvByteCnt;
00524 }
00525 if (fRecvBufferUsed>0) {
00526
00527
00528
00529 if (fRecvBufferUsed>fRecvBuffer.GetSize())
00530 ResizeBuffer(fRecvBuffer,fRecvBufferUsed,"Recv");
00531
00532 Bool_t partial = false;
00533 if (fRecvBufferUsed>fRecvBuffer.GetSize()) {
00534
00535 fNackMessage +=
00536 Form("Buffer could only hold %d of %d;",
00537 fRecvBuffer.GetSize(),fRecvBufferUsed);
00538 partial = true;
00539 fRecvBufferUsed = fRecvBuffer.GetSize();
00540 }
00541
00542 lBuffer = socket->RecvRaw(fRecvBuffer.GetArray(),fRecvBufferUsed);
00543 if (lBuffer != fRecvBufferUsed) {
00544 syslog(LOG_ERR,"RecvRaw buffer got %d expect %d",
00545 lBuffer,fRecvBufferUsed);
00546 fNackMessage +=
00547 Form("Buffer expected %d, saw %d;",
00548 fRecvBufferUsed,lBuffer);
00549 return kFailRecvData;
00550 }
00551 if (partial) return kFailBuffResize;
00552
00553 }
00554 }
00555
00556 return kSuccess;
00557 }
00558
00559
00560 Int_t RotoServer::CheckTo(RotoRcCmd& command)
00561 {
00562
00563
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573 int to = command.GetTo();
00574 if (to == MINOS_ROOTER) return kSuccess;
00575
00576 syslog(LOG_ERR,"I am 0x%2.2x, got message for 0x%2.2x (cmd 0x%8.8x)",
00577 MINOS_ROOTER,to,command.fEncoded);
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587 return kFailNotToMe;
00588 }
00589
00590 Int_t RotoServer::ProcessCommand(TSocket* socket, RotoRcCmd command)
00591 {
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603 int from = command.GetFrom();
00604 int type = command.GetType();
00605 int instr = command.GetInstr();
00606
00607 switch (type) {
00608 case MINOS_ROOTER_COMMAND:
00609 if (MINOS_DCP != from && MINOS_DCS != from) {
00610 syslog(LOG_ERR,"MinosEntity 0x%2.2x on %s attempted COMMAND",
00611 from,socket->GetInetAddress().GetHostName());
00612
00613 fNackMessage +=
00614 Form("Not from DCP or DCS but %s (0x%x);",
00615 RotoRcCmd::ElementAsStlString(from).c_str(),from);
00616 return kFailCmdBadFrom;
00617 break;
00618 }
00619 switch (instr) {
00620 case ROOTER_OPENSOCKET:
00621 return kSuccess;
00622 break;
00623 case ROOTER_CLOSESOCKET:
00624
00625 fCurrentState = ROOTER_STATE_DISCONNECTING;
00626 return kSuccess;
00627 break;
00628 case ROOTER_OPENFILE:
00629 return OpenFile();
00630 break;
00631 case ROOTER_CLOSEFILE:
00632 return CloseFile();
00633 break;
00634 case ROOTER_RECBUFFER:
00635 return ProcessBuffer();
00636 break;
00637 case ROOTER_SHUTDOWN:
00638 syslog(LOG_INFO,"MinosEntity 0x%2.2x on %s requested shutdown",
00639 from,socket->GetInetAddress().GetHostName());
00640
00641
00642 fCurrentState = ROOTER_STATE_SHUTDOWN_REQ;
00643 return kSuccess;
00644 break;
00645 case ROOTER_CONFIG:
00646 return ProcessConfig();
00647 break;
00648 default:
00649 syslog(LOG_ERR,"MinosEntity 0x%2.2x on %s bad cmd 0x%8.8x",
00650 from,socket->GetInetAddress().GetHostName(),
00651 command.fEncoded);
00652
00653 fNackMessage +=
00654 Form("Unmodelled %s %s;",
00655 RotoRcCmd::TypeAsStlString(type).c_str(),
00656 RotoRcCmd::InstrAsStlString(type,instr).c_str());
00657 return kFailCmdUnknown;
00658 break;
00659 }
00660 break;
00661 case MINOS_ROOTER_REQUEST:
00662 switch (instr) {
00663 case ROOTER_CURRENT_STATE:
00664 return kSuccess;
00665 break;
00666 case ROOTER_STATUS_REPORT:
00667 BuildStateReport();
00668 return kSuccess;
00669 break;
00670 default:
00671 syslog(LOG_ERR,"MinosEntity 0x%2.2x on %s bad cmd 0x%8.8x",
00672 from,socket->GetInetAddress().GetHostName(),
00673 command.fEncoded);
00674
00675 fNackMessage +=
00676 Form("Unmodelled %s %s;",
00677 RotoRcCmd::TypeAsStlString(type).c_str(),
00678 RotoRcCmd::InstrAsStlString(type,instr).c_str());
00679 return kFailReqUnknown;
00680 break;
00681 }
00682 break;
00683 default:
00684 syslog(LOG_ERR,"MinosEntity 0x%2.2x on %s bad cmd 0x%8.8x",
00685 from,socket->GetInetAddress().GetHostName(),
00686 command.fEncoded);
00687
00688 fNackMessage +=
00689 Form("Unmodelled %s %s;",
00690 RotoRcCmd::TypeAsStlString(type).c_str(),
00691 RotoRcCmd::InstrAsStlString(type,instr).c_str());
00692 return kFailBadRcType;
00693 break;
00694 }
00695 return kFailWhoops;
00696 }
00697
00698
00699 void RotoServer::ReplyToCommand(TSocket* socket, RotoRcCmd command,
00700 Int_t status)
00701 {
00702
00703
00704
00705
00706
00707
00708
00709
00710
00711
00712
00713
00714 static Int_t nreply = 0;
00715 nreply++;
00716
00717 Int_t type = command.GetType();
00718 Int_t instr = command.GetInstr();
00719 Bool_t ack = ( kSuccess == status );
00720
00721 RotoRcCmd reply = command;
00722 reply.SwapFromTo();
00723
00724 switch (type) {
00725 case MINOS_ROOTER_COMMAND: {
00726
00727
00728
00729
00730
00731
00732
00733
00734 Int_t ackfield =
00735 (ack) ? MINOS_ROOTER_ACKNOWLEDGE : MINOS_ROOTER_UNABLE_TO;
00736 Bool_t nackmsg = ( ! ack && fNackMessage.length()>0);
00737
00738 int totmsgsize = sizeof(Int_t);
00739 if (nackmsg) totmsgsize += sizeof(Int_t) + fNackMessage.length();
00740 totmsgsize += sizeof(Int_t);
00741 if (totmsgsize>fReplyBuffer.GetSize())
00742 ResizeBuffer(fReplyBuffer,totmsgsize,"Reply");
00743 Char_t* p = fReplyBuffer.GetArray();
00744
00745 reply.SetType(ackfield);
00746 reply.SetDataToFollow(nackmsg);
00747
00748 memcpy(p,&reply.fEncoded,sizeof(Int_t));
00749 p += sizeof(Int_t);
00750
00751 if (!ack) {
00752 syslog(LOG_ERR,"sent NACK 0x%8.8x to MinosEntity 0x%2.2x on %s",
00753 reply.fEncoded,reply.GetTo(),
00754 socket->GetInetAddress().GetHostName());
00755 syslog(LOG_ERR,"%s",reply.AsStlString().c_str());
00756 if (fNackMessage.length()>0)
00757 syslog(LOG_ERR," %s",fNackMessage.c_str());
00758 }
00759 #ifdef VERBOSE_SYSLOG
00760 else {
00761
00762 syslog(LOG_ERR,"ACK to %s",command.AsStlString().c_str());
00763 }
00764 #endif
00765
00766 if (nackmsg) {
00767
00768 Int_t nackmsg_length = fNackMessage.length() + 4;
00769
00770
00771 memcpy(p,&nackmsg_length,sizeof(Int_t));
00772 p += sizeof(Int_t);
00773 memcpy(p,fNackMessage.c_str(),nackmsg_length-4);
00774 p += nackmsg_length-4;
00775 }
00776
00777
00778
00779 reply.SetType(MINOS_ROOTER_STATE_REPORT);
00780 reply.SetInstr(fCurrentState);
00781 reply.SetDataToFollow(false);
00782
00783 memcpy(p,&reply.fEncoded,sizeof(Int_t));
00784 p += sizeof(Int_t);
00785 socket->SendRaw(fReplyBuffer.GetArray(),totmsgsize);
00786
00787
00788
00789
00790
00791
00792
00793
00794
00795
00796
00797 break;
00798 }
00799 case MINOS_ROOTER_REQUEST:
00800 switch (instr) {
00801 case ROOTER_CURRENT_STATE:
00802 reply.SetType(MINOS_ROOTER_STATE_REPORT);
00803 reply.SetInstr(fCurrentState);
00804 reply.SetDataToFollow(false);
00805 socket->SendRaw(&reply.fEncoded,sizeof(Int_t));
00806 break;
00807 case ROOTER_STATUS_REPORT: {
00808 reply.SetType(MINOS_ROOTER_REPORT);
00809 reply.SetInstr(ROOTER_STATUS_REPORT);
00810 reply.SetDataToFollow(true);
00811 socket->SendRaw(&reply.fEncoded,sizeof(Int_t));
00812
00813 Int_t report_length = fStateReport.length() + 4;
00814 socket->SendRaw(&report_length,sizeof(Int_t));
00815 socket->SendRaw(fStateReport.c_str(),report_length-4);
00816
00817
00818
00819
00820
00821
00822
00823
00824
00825 break;
00826 }
00827
00828 default:
00829 break;
00830 }
00831 break;
00832 default:
00833 break;
00834 }
00835
00836 }
00837
00838
00839 Int_t RotoServer::OpenFile()
00840 {
00841
00842
00843
00844
00845
00846
00847
00848
00849
00850 int nwords = fRecvBufferUsed/sizeof(Int_t);
00851 if (nwords<1) {
00852 syslog(LOG_ERR,"OpenFile() too few words (%d)",nwords);
00853 return kFailOpenFile;
00854 }
00855 Int_t* ibuffer = (Int_t*) fRecvBuffer.GetArray();
00856 Int_t from = ibuffer[0];
00857
00858 if (fFakeDAQFileName != "") {
00859 fNackMessage += "OpenFile not possible, already in FakeDaqFile mode ";
00860 fNackMessage += ";";
00861 syslog(LOG_NOTICE,"OpenFile() already in fake mode");
00862 return kFailCmdBadFrom;
00863 }
00864
00865 switch (from) {
00866 case MINOS_DCP:
00867 if (nwords<4) {
00868 syslog(LOG_ERR,"OpenFile() from DCP too few words (%d)",nwords);
00869 return kFailOpenFile;
00870 }
00871 return OpenDaqFile(ibuffer[1],ibuffer[2],ibuffer[3]);
00872 break;
00873 case MINOS_DCS:
00874 if (nwords<4) {
00875 syslog(LOG_ERR,"OpenFile() from DCS too few words (%d)",nwords);
00876 return kFailOpenFile;
00877 }
00878 return OpenDcsFile(ibuffer[1],ibuffer[2],ibuffer[3]);
00879 break;
00880 default: {
00881
00882
00883
00884 EFileType ftype = kDaqFile;
00885 const char* fname = fRecvBuffer.GetArray()+sizeof(Int_t);
00886 Int_t err = OpenFile(fname,ftype);
00887 if (!err) fFakeDAQFileName = fname;
00888 return err;
00889 break;
00890 }
00891 }
00892 fNackMessage += "OpenFile not possible from ";
00893 fNackMessage += RotoRcCmd::ElementAsStlString(from);
00894 fNackMessage += ";";
00895 return kFailCmdBadFrom;
00896 }
00897
00898
00899
00900 Int_t RotoServer::CloseFile()
00901 {
00902
00903
00904
00905
00906
00907
00908
00909
00910
00911 int nwords = fRecvBufferUsed/sizeof(Int_t);
00912 if (nwords<1) {
00913 syslog(LOG_ERR,"CloseFile() too few words (%d)",nwords);
00914 return kFailOpenFile;
00915 }
00916 Int_t* ibuffer = (Int_t*) fRecvBuffer.GetArray();
00917 Int_t from = ibuffer[0];
00918
00919 switch (from) {
00920 case MINOS_DCP:
00921 if (fFakeDAQFileName != "") {
00922 fNackMessage += "CloseFile not possible, in FakeDaqFile mode ";
00923 fNackMessage += ";";
00924 return kFailCmdBadFrom;
00925 }
00926 if (nwords<4) {
00927 syslog(LOG_ERR,"CloseFile() from DCP too few words (%d)",nwords);
00928 return kFailOpenFile;
00929 }
00930 return CloseDaqFile(ibuffer[1],ibuffer[2],ibuffer[3]);
00931 break;
00932 case MINOS_DCS:
00933 if (fFakeDAQFileName != "") {
00934 fNackMessage += "CloseFile not possible, in FakeDaqFile mode ";
00935 fNackMessage += ";";
00936 return kFailCmdBadFrom;
00937 }
00938 if (nwords<4) {
00939 syslog(LOG_ERR,"CloseFile() from DCS too few words (%d)",nwords);
00940 return kFailOpenFile;
00941 }
00942 return CloseDcsFile(ibuffer[1],ibuffer[2],ibuffer[3]);
00943 break;
00944 default: {
00945
00946
00947
00948 EFileType ftype = kDaqFile;
00949 const char* fname = fRecvBuffer.GetArray()+sizeof(Int_t);
00950 Int_t err = CloseFile(fname,ftype);
00951 if (!err) fFakeDAQFileName = "";
00952 return err;
00953 break;
00954 }
00955 }
00956 fNackMessage += "CloseFile not possible from ";
00957 fNackMessage += RotoRcCmd::ElementAsStlString(from);
00958 fNackMessage += ";";
00959 return kFailCmdBadFrom;
00960 }
00961
00962
00963 string RotoServer::FileExtName(EFileType ftype)
00964 {
00965
00966
00967
00968
00969
00970
00971
00972
00973
00974 switch (ftype) {
00975 case kDaqFile: return string("mdaq.root"); break;
00976 case kDcsFile: return string("mdcs.root"); break;
00977 case kBogusFile: return string("mall.root"); break;
00978 default:
00979 syslog(LOG_ERR,"bad FileExtName request %d",(int)ftype);
00980
00981
00982
00983 return string("generic.root");
00984 break;
00985 }
00986
00987 }
00988
00989
00990 string RotoServer::BogusFileName()
00991 {
00992
00993
00994
00995
00996
00997
00998
00999
01000
01001
01002
01003 Int_t date = fStartTime.GetDate();
01004 if (date>19700000) date %= 1000000;
01005 Int_t time = fStartTime.GetTime();
01006 string fname = Form("bogus_%s_p%d_%6.6d_%6.6d",
01007 gSystem->HostName(),fPort,date,time);
01008
01009 string::size_type where;
01010 while (string::npos != (where = fname.find(" "))) {
01011 fname.replace(where,1,"_");
01012 }
01013 return fname;
01014 }
01015
01016
01017 Int_t RotoServer::OpenDaqFile(Int_t detector, Int_t run, Int_t subrun)
01018 {
01019
01020
01021
01022
01023
01024
01025
01026
01027
01028
01029
01030
01031 string filebase = BuildDaqBaseName(detector,run,subrun);
01032
01033 return OpenFile(filebase.c_str(),kDaqFile);
01034 }
01035
01036
01037 Int_t RotoServer::OpenFile(const Char_t* fnamebase, EFileType ftype)
01038 {
01039
01040
01041
01042
01043
01044
01045
01046
01047
01048
01049
01050 const char* pbasedir = gSystem->Getenv("DAQ_DATA_DIR");
01051 if (!pbasedir) pbasedir = ".";
01052 string filename = string(pbasedir) + "/" +
01053 string(fnamebase) + "." +
01054 FileExtName(ftype);
01055 syslog(LOG_INFO,"OpenFile '%s'",filename.c_str());
01056
01057 bool openok = true;
01058
01059
01060
01061
01062 int ibeg = Per::kDaqSnarl;
01063 int iend = Per::kDcsMonitor;
01064 if (kDaqFile == ftype) {
01065 ibeg = Per::kDaqSnarl;
01066 iend = Per::kLightInjection;
01067 }
01068 else
01069 if (kDcsFile == ftype) {
01070 ibeg = Per::kDcsAlarm;
01071 iend = Per::kDcsMonitor;
01072 }
01073
01074 string fbase = fnamebase;
01075
01076
01077 if (string::npos != fbase.find("bogus")) fbase = "bogus";
01078 int fversion = 0;
01079
01080
01081
01082 bool fromFile = fSocket->InheritsFrom("RotoSocket");
01083 if (fromFile) MSG("Roto",Msg::kInfo)
01084 << "Reading from RotoSocket - turn off autosave"
01085 << endl;
01086
01087 for (int i = ibeg; i<=iend; i++ ) {
01088 Per::EStreamType stype = (Per::EStreamType) i;
01089 string treeName = Per::AsString(stype);
01090 string rotoStrmName = fbase + "." + treeName;
01091
01092 Int_t compressLevel = fCompressionMap[stype];
01093 UInt_t basketSize = fBasketSizeMap[stype];
01094
01095 PerOutputStream* astream =
01096 fOutputStreamManager->OpenStream(rotoStrmName,treeName,
01097 "RawRecord","","",Per::kRecSplit,
01098 basketSize,compressLevel);
01099 if (astream) {
01100
01101
01102
01103 if (fromFile) astream->SetAutoSave(0,0);
01104 else {
01105 UInt_t autoSaveInt = fAutoSaveIntMap[stype];
01106 UInt_t autoSaveTime = fAutoSaveTimeMap[stype];
01107 astream->SetAutoSave(autoSaveInt,autoSaveTime);
01108 }
01109 bool openok_astream =
01110 fOutputStreamManager->SetFile(rotoStrmName,filename,fAccessMode);
01111
01112
01113
01114 Per::EErrorCode ecode = astream->GetErrorCode();
01115 if (Per::kErrFileExists == ecode) {
01116 string basefilename = filename;
01117 string::size_type where = filename.rfind(".m");
01118 if (string::npos == where) where = filename.rfind(".root");
01119 if (string::npos == where) where = filename.length();
01120 bool keep_trying = true;
01121
01122 do {
01123 ++fversion;
01124 filename = basefilename;
01125 filename.insert(where,Form(".%d",fversion));
01126 openok_astream =
01127 fOutputStreamManager->SetFile(rotoStrmName,filename,fAccessMode);
01128 if (Per::kErrSuccess != ecode) {
01129 syslog(LOG_INFO,
01130 " open versioned file saw Per error: (%d) %s",
01131 (int)ecode,Per::AsString(ecode));
01132 }
01133
01134 ecode = astream->GetErrorCode();
01135 keep_trying = (fversion<100) &&
01136 (Per::kErrFileExists == ecode);
01137 } while (keep_trying);
01138
01139 if (Per::kErrSuccess == ecode)
01140 syslog(LOG_INFO," open stream using file %s",filename.c_str());
01141
01142 }
01143
01144 if (Per::kErrSuccess != ecode) {
01145 syslog(LOG_INFO," open stream saw Per error: (%d) %s",
01146 (int)ecode,Per::AsString(ecode));
01147 }
01148
01149 openok = openok && openok_astream;
01150
01151 if (openok_astream)
01152 syslog(LOG_INFO," open stream %s as %s",
01153 treeName.c_str(),rotoStrmName.c_str());
01154
01155
01156
01157
01158
01159 }
01160 else {
01161 openok = false;
01162 }
01163 }
01164
01165 return (openok) ? kSuccess : kFailWhoops;
01166 }
01167
01168
01169 Int_t RotoServer::CloseDaqFile(Int_t detector, Int_t run, Int_t subrun)
01170 {
01171
01172
01173
01174
01175
01176
01177
01178
01179
01180
01181
01182
01183 string filebase = BuildDaqBaseName(detector,run,subrun);
01184 return CloseFile(filebase.c_str(),kDaqFile);
01185 }
01186
01187
01188 Int_t RotoServer::CloseFile(const Char_t* fnamebase, EFileType ftype)
01189 {
01190
01191
01192
01193
01194
01195
01196
01197
01198
01199
01200 const char* pbasedir = gSystem->Getenv("DAQ_DATA_DIR");
01201 if (!pbasedir) pbasedir = ".";
01202 string filename = string(pbasedir) + "/" +
01203 string(fnamebase) + "." +
01204 FileExtName(ftype);
01205 syslog(LOG_INFO,"CloseFile '%s'",filename.c_str());
01206
01207 bool closeok = true;
01208
01209
01210
01211
01212 int ibeg = Per::kDaqSnarl;
01213 int iend = Per::kDcsMonitor;
01214 if (kDaqFile == ftype) {
01215 ibeg = Per::kDaqSnarl;
01216 iend = Per::kLightInjection;
01217 }
01218 else
01219 if (kDcsFile == ftype) {
01220 ibeg = Per::kDcsAlarm;
01221 iend = Per::kDcsMonitor;
01222 }
01223
01224 string fbase = fnamebase;
01225
01226
01227 if (string::npos != fbase.find("bogus")) fbase = "bogus";
01228
01229 for (int i = ibeg; i<=iend; i++ ) {
01230 Per::EStreamType stype = (Per::EStreamType) i;
01231 string treeName = Per::AsString(stype);
01232 string rotoStrmName = fbase + "." + treeName;
01233
01234 fOutputStreamManager->Write(rotoStrmName);
01235 fOutputStreamManager->CloseStream(rotoStrmName);
01236 }
01237
01238
01239
01240 return (closeok) ? kSuccess : kFailWhoops;
01241 }
01242
01243
01244 string RotoServer::BuildDaqBaseName(Int_t detector, Int_t run, Int_t subrun)
01245 {
01246
01247
01248
01249
01250
01251
01252
01253
01254
01255
01256
01257
01258 DetectorType::Detector_t det = (DetectorType::Detector_t)(detector);
01259 Char_t detchar = DetectorType::AsString(det)[0];
01260
01261 string fname = Form("%c%8.8d_%4.4d",detchar,run,subrun);
01262 return fname;
01263 }
01264
01265
01266 Int_t RotoServer::OpenDcsFile(Int_t detector, Int_t sec, Int_t nanosec)
01267 {
01268
01269
01270
01271
01272
01273
01274
01275
01276
01277
01278
01279
01280 syslog(LOG_ERR,"OpenDcsFile %d %d %d not yet implemented",
01281 detector,sec,nanosec);
01282
01283 fNackMessage += "OpenDcsFile not yet implemented;";
01284 return kFailNoCodeYet;
01285 }
01286
01287
01288 Int_t RotoServer::CloseDcsFile(Int_t detector, Int_t sec, Int_t nanosec)
01289 {
01290
01291
01292
01293
01294
01295
01296
01297
01298
01299
01300
01301
01302 syslog(LOG_ERR,"CloseDcsFile %d %d %d not yet implemented",
01303 detector,sec,nanosec);
01304
01305 fNackMessage += "CloseDcsFile not yet implemented;";
01306 return kFailNoCodeYet;
01307 }
01308
01309
01310 string RotoServer::BuildDcsBaseName(Int_t detector, Int_t sec, Int_t nanosec)
01311 {
01312
01313
01314
01315
01316
01317
01318
01319
01320
01321
01322
01323
01324 DetectorType::Detector_t det = (DetectorType::Detector_t)(detector);
01325 Char_t detchar = DetectorType::AsString(det)[0];
01326 VldTimeStamp time(sec,nanosec);
01327
01328 string fname = Form("%c%s",detchar,time.AsString("s"));
01329 fname.replace(fname.find(" "),1,"_");
01330 return fname;
01331 }
01332
01333
01334 Int_t RotoServer::ProcessBuffer()
01335 {
01336
01337
01338
01339
01340
01341
01342
01343
01344
01345
01346 static Int_t nrec = 0;
01347 nrec++;
01348
01349
01350
01351
01352 Int_t status = kSuccess;
01353 Int_t obj_status = 0;
01354
01355
01356 MomNavigator* mom =
01357 RotoObjectifier::BufferInflate(fRecvBuffer.GetArray(),
01358 fRecvBufferUsed,obj_status);
01359
01360 if (!mom) {
01361 syslog(LOG_ERR,"buffer inflate returned no MOM, rec %d err %d",
01362 nrec,obj_status);
01363 return kFailInflateBuff;
01364 }
01365
01366 if (4 == obj_status || 8 == obj_status) {
01367
01368 syslog(LOG_ERR,"partial buffer failure rec %d err %d",nrec,obj_status);
01369
01370 status |= kFailPartialRec;
01371 fNackMessage += "Portion of record could not be converted to RawDataBlocks;";
01372 }
01373
01374
01375
01376 const RawRecord *rawrec =
01377 dynamic_cast<const RawRecord *>(mom->GetFragment("RawRecord"));
01378
01379 string rotoStrmName = ChooseStreamName(rawrec);
01380
01381 if (!fOutputStreamManager) {
01382 syslog(LOG_ERR,"process buffer had no output stream manager");
01383 status |= kFailWriteRec;
01384 return status;
01385 }
01386
01387 PerOutputStream* stream = 0;
01388 stream = dynamic_cast<PerOutputStream*>
01389 (fOutputStreamManager->GetOpenedStream(rotoStrmName));
01390
01391 if (!stream) {
01392
01393
01394
01395
01396 syslog(LOG_ERR,"process buffer had no stream '%s'",
01397 rotoStrmName.c_str());
01398 string streamType =
01399 rotoStrmName.substr(rotoStrmName.find_last_of(".")+1,string::npos);
01400 rotoStrmName = "bogus." + streamType;
01401 stream = dynamic_cast<PerOutputStream*>
01402 (fOutputStreamManager->GetOpenedStream(rotoStrmName));
01403 if (!stream) {
01404
01405 EFileType ftype = kBogusFile;
01406 string fname = BogusFileName();
01407 Int_t err = OpenFile(fname.c_str(),ftype);
01408 if (err) {
01409 syslog(LOG_ERR,"process buffer could not open 'bogus' file '%s'",
01410 fname.c_str());
01411 }
01412
01413 stream = dynamic_cast<PerOutputStream*>
01414 (fOutputStreamManager->GetOpenedStream(rotoStrmName));
01415
01416 }
01417 }
01418
01419 if (stream) {
01420
01421
01422
01423
01424 RawRecord *rawrec_nonconst =
01425 const_cast<RawRecord *>(rawrec);
01426 stream->SetObject(rawrec_nonconst);
01427 stream->Store();
01428 }
01429 else {
01430 syslog(LOG_ERR,"process buffer still had no stream '%s'",
01431 rotoStrmName.c_str());
01432
01433
01434 status |= kFailWriteRec;
01435 }
01436
01437 delete mom;
01438
01439 return status;
01440 }
01441
01442
01443 string RotoServer::ChooseStreamName(const RawRecord* rawrec)
01444 {
01445
01446
01447
01448
01449
01450
01451
01452
01453
01454 string rotoStrmName;
01455
01456 VldContext vldc = rawrec->GetRawHeader()->GetVldContext();
01457
01458 const RawDaqHeader *rawdaqhead =
01459 dynamic_cast<const RawDaqHeader *>(rawrec->GetRawHeader());
01460
01461 if (rawdaqhead) {
01462
01463 Int_t run = rawdaqhead->GetRun();
01464 Int_t subrun = rawdaqhead->GetSubRun();
01465
01466 Per::EStreamType stype = Per::kDaqMonitor;
01467
01468 TIter blkiter =
01469 (const_cast<RawRecord*>(rawrec))->GetRawBlockIter();
01470 TObject* tobj = 0;
01471 const RawDataBlock* rb = 0;
01472
01473 Bool_t sawLISummary = false;
01474 Bool_t sawSnarlHeader = false;
01475 Bool_t sawDigitBlock = false;
01476
01477 while ( (tobj = blkiter()) ) {
01478 rb = dynamic_cast<RawDataBlock*>(tobj);
01479
01480 if ((dynamic_cast<const RawLIAdcSummaryBlock*>(rb)))
01481 sawLISummary = true;
01482
01483 if ((dynamic_cast<const RawLITimingSummaryBlock*>(rb)))
01484 sawLISummary = true;
01485
01486 if ((dynamic_cast<const RawSnarlHeaderBlock*>(rb)))
01487 sawSnarlHeader = true;
01488
01489 if ((dynamic_cast<const RawDigitDataBlock*>(rb)))
01490 sawDigitBlock = true;
01491 }
01492
01493
01494
01495
01496
01497 if (sawLISummary) {
01498 stype = Per::kLightInjection;
01499 }
01500 else if (sawSnarlHeader || sawDigitBlock ) {
01501 stype = Per::kDaqSnarl;
01502 if (!sawSnarlHeader || !sawDigitBlock ) {
01503 syslog(LOG_ERR,"XOR RawSnarlHeaderBlock %d RawDgitiDataBlock %d",
01504 sawSnarlHeader,sawDigitBlock);
01505
01506
01507
01508
01509
01510
01511 }
01512 }
01513
01514
01515
01516 if (fFakeDAQFileName != "") {
01517 rotoStrmName = fFakeDAQFileName;
01518 } else {
01519 rotoStrmName = BuildDaqBaseName(vldc.GetDetector(),run,subrun);
01520 }
01521 string streamType = Per::AsString(stype);
01522 rotoStrmName += "." + streamType;
01523
01524 }
01525 else {
01526
01527 rotoStrmName = ".DcsMonitor";
01528 syslog(LOG_ALERT," can't handle DCS yet");
01529 assert(0);
01530
01531 }
01532
01533 return rotoStrmName;
01534 }
01535
01536
01537 Int_t RotoServer::ProcessConfig()
01538 {
01539
01540
01541
01542
01543
01544
01545
01546
01547
01548
01549 char config[1024];
01550 char stream[1024];
01551 int nItems = 0;
01552 bool ok = true;
01553
01554
01555
01556
01557
01558 nItems = sscanf(fRecvBuffer.GetArray(),
01559 "config %s:",config);
01560
01561 if (!strncmp(config,"autosave",8)) {
01562 unsigned int nrec, nsec;
01563 nItems = sscanf(fRecvBuffer.GetArray(),
01564 "config autosave: %s = %u rec %u sec",
01565 stream,&nrec,&nsec);
01566
01567 if (nItems != 3) ok = false;
01568 else {
01569 int nmod = SetAutoSaveConfig(stream,nrec,nsec);
01570 if (nmod < 1) {
01571 syslog(LOG_WARNING,"ProcessConfig stream \"%s\" unknown\n",stream);
01572 }
01573 }
01574
01575 }
01576 else if (!strncmp(config,"compress",8)) {
01577 int level;
01578 nItems = sscanf(fRecvBuffer.GetArray(),
01579 "config compress: %s = %d",
01580 stream,&level);
01581
01582 if (nItems != 2) ok = false;
01583 else {
01584 int nmod = SetCompressConfig(stream,level);
01585 if (nmod < 1) {
01586 syslog(LOG_WARNING,"ProcessConfig stream \"%s\" unknown\n",stream);
01587 }
01588 }
01589 }
01590 else if (!strncmp(config,"basketsize",10)) {
01591 int basketsize;
01592 nItems = sscanf(fRecvBuffer.GetArray(),
01593 "config basketsize: %s = %d",
01594 stream,&basketsize);
01595
01596 if (nItems != 2) ok = false;
01597 else {
01598 int nmod = SetBasketSizeConfig(stream,basketsize);
01599 if (nmod < 1) {
01600 syslog(LOG_WARNING,"ProcessConfig stream \"%s\" unknown\n",stream);
01601 }
01602 }
01603
01604 }
01605 else ok = false;
01606
01607 if (!ok) syslog(LOG_WARNING,
01608 "ProcessConfig failed to interpret string: \"%s\"\n",
01609 fRecvBuffer.GetArray());
01610
01611
01612 return 0;
01613 }
01614
01615
01616 void RotoServer::BuildStateReport()
01617 {
01618
01619
01620
01621
01622
01623
01624
01625
01626
01627 fStateReport = RotoRcCmd::ElementAsStlString(MINOS_ROOTER);
01628 fStateReport += "; ";
01629 fStateReport +=
01630 RotoRcCmd::InstrAsStlString(MINOS_ROOTER_STATE_REPORT,fCurrentState);
01631 fStateReport += "; ";
01632 }
01633
01634