src/connect/services/grid_control_thread.cpp

Go to the documentation of this file.
00001 /*  $Id: grid_control_thread.cpp 150995 2009-01-30 18:16:28Z kazimird $
00002  * ===========================================================================
00003  *
00004  *                            PUBLIC DOMAIN NOTICE
00005  *               National Center for Biotechnology Information
00006  *
00007  *  This software/database is a "United States Government Work" under the
00008  *  terms of the United States Copyright Act.  It was written as part of
00009  *  the author's official duties as a United States Government employee and
00010  *  thus cannot be copyrighted.  This software/database is freely available
00011  *  to the public for use. The National Library of Medicine and the U.S.
00012  *   Government have not placed any restriction on its use or reproduction.
00013  *
00014  *  Although all reasonable efforts have been taken to ensure the accuracy
00015  *  and reliability of the software and data, the NLM and the U.S.
00016  *  Government do not and cannot warrant the performance or results that
00017  *  may be obtained by using this software or data. The NLM and the U.S.
00018  *  Government disclaim all warranties, express or implied, including
00019  *  warranties of performance, merchantability or fitness for any particular
00020  *  purpose.
00021  *
00022  *  Please cite the author in any work or product based on this material.
00023  *
00024  * ===========================================================================
00025  *
00026  * Authors:  Maxim Didenko, Dmitry Kazimirov
00027  *
00028  * File Description:
00029  *    NetSchedule Worker Node implementation
00030  */
00031 
00032 #include <ncbi_pch.hpp>
00033 
00034 #include <connect/services/grid_globals.hpp>
00035 #include <connect/services/grid_control_thread.hpp>
00036 #include <connect/services/grid_worker_app_impl.hpp>
00037 #include <connect/services/error_codes.hpp>
00038 
00039 #include <corelib/ncbistre.hpp>
00040 #include <corelib/ncbiapp.hpp>
00041 #include <corelib/ncbi_process.hpp>
00042 
00043 #include <math.h>
00044 
00045 
00046 #define NCBI_USE_ERRCODE_X   ConnServ_WorkerNode
00047 
00048 BEGIN_NCBI_SCOPE
00049 
00050 /////////////////////////////////////////////////////////////////////////////
00051 //
00052 ///@internal
00053 
00054 class CGetVersionProcessor : public CWorkerNodeControlThread::IRequestProcessor
00055 {
00056 public:
00057     virtual ~CGetVersionProcessor() {}
00058 
00059     virtual void Process(const string& request,
00060                          CNcbiOstream& os,
00061                          CGridWorkerNode& node)
00062     {
00063         os << "OK:" << node.GetJobVersion() << WN_BUILD_DATE;
00064     }
00065 };
00066 
00067 class CShutdownProcessor : public CWorkerNodeControlThread::IRequestProcessor
00068 {
00069 public:
00070     virtual ~CShutdownProcessor() {}
00071 
00072     virtual bool Authenticate(const string& host,
00073                               const string& auth,
00074                               const string& queue,
00075                               CNcbiOstream& os,
00076                               const CGridWorkerNode& node)
00077     {
00078         m_Host = host;
00079         size_t pos = m_Host.find_first_of(':');
00080         if (pos != string::npos) {
00081             m_Host = m_Host.substr(0, pos);
00082         }
00083         if (node.IsHostInAdminHostsList(m_Host)) {
00084             return true;
00085         }
00086         os << "ERR:Shutdown access denied.";
00087         LOG_POST_X(10, Warning << "Shutdown access denied for host " << m_Host);
00088         return false;
00089     }
00090 
00091     virtual void Process(const string& request,
00092                          CNcbiOstream& os,
00093                          CGridWorkerNode& )
00094     {
00095         if (request.find("SUICIDE") != NPOS) {
00096             LOG_POST_X(11, Warning <<
00097                 "Shutdown request has been received from host: " << m_Host);
00098             LOG_POST_X(12, Warning << "Server is shutting down");
00099             CGridGlobals::GetInstance().KillNode();
00100         } else {
00101             CNetScheduleAdmin::EShutdownLevel level =
00102                 CNetScheduleAdmin::eNormalShutdown;
00103             if (request.find("IMMEDIATE") != NPOS)
00104                 level = CNetScheduleAdmin::eShutdownImmediate;
00105             os << "OK:";
00106             CGridGlobals::GetInstance().
00107                 RequestShutdown(level);
00108             LOG_POST_X(13, "Shutdown request has been received from host " <<
00109                 m_Host);
00110         }
00111     }
00112 private:
00113     string m_Host;
00114 };
00115 
00116 class CGetStatisticsProcessor : public CWorkerNodeControlThread::IRequestProcessor
00117 {
00118 public:
00119     CGetStatisticsProcessor() {}
00120     virtual ~CGetStatisticsProcessor() {}
00121 
00122     virtual void Process(const string& request,
00123                          CNcbiOstream& os,
00124                          CGridWorkerNode& node)
00125     {
00126         os << "OK:" << node.GetJobVersion() << WN_BUILD_DATE << endl;
00127         os << "Node started at: " << CGridGlobals::GetInstance().GetStartTime().AsString() << endl;
00128         CNcbiApplication* app = CNcbiApplication::Instance();
00129         if (app)
00130             os << "Executable path: " << app->GetProgramExecutablePath()
00131                << "; PID: " << CProcess::GetCurrentPid() << endl;
00132 
00133         os << "Queue name: " << node.GetQueueName() << endl;
00134         if (node.GetMaxThreads() > 1)
00135             os << "Maximum job threads: " << node.GetMaxThreads() << endl;
00136 
00137         if (CGridGlobals::GetInstance().
00138             GetShutdownLevel() != CNetScheduleAdmin::eNoShutdown) {
00139                 os << "THE NODE IS IN A SHUTTING DOWN MODE!!!" << endl;
00140         }
00141         if (node.IsExclusiveMode())
00142             os << "THE NODE IS IN AN EXCLUSIVE MODE!!!" << endl;
00143 
00144         CGridGlobals::GetInstance().GetJobsWatcher().Print(os);
00145     }
00146 };
00147 
00148 class CGetLoadProcessor : public CWorkerNodeControlThread::IRequestProcessor
00149 {
00150 public:
00151     CGetLoadProcessor()  {}
00152     virtual ~CGetLoadProcessor() {}
00153 
00154     virtual bool Authenticate(const string& host,
00155                               const string& auth,
00156                               const string& queue,
00157                               CNcbiOstream& os,
00158                               const CGridWorkerNode& node)
00159     {
00160         string cmp = node.GetClientName() + " prog='" + node.GetJobVersion() + '\'';
00161         if (auth != cmp) {
00162             os <<"ERR:Wrong Program. Required: " << node.GetJobVersion()
00163                << endl << auth << endl << cmp;
00164             return false;
00165         }
00166         string qname, connection_info;
00167         NStr::SplitInTwo(queue, ";", qname, connection_info);
00168         if (qname != node.GetQueueName()) {
00169             os << "ERR:Wrong Queue. Required: " << node.GetQueueName();
00170             return false;
00171         }
00172         if (connection_info != node.GetServiceName()) {
00173             os << "ERR:Wrong Connection Info. Required: "
00174                << node.GetServiceName();
00175             return false;
00176         }
00177         return true;
00178     }
00179 
00180     virtual void Process(const string& request,
00181                          CNcbiOstream& os,
00182                          CGridWorkerNode& node)
00183     {
00184         int load = node.GetMaxThreads() -
00185             CGridGlobals::GetInstance().GetJobsWatcher().GetJobsRunningNumber();
00186         os << "OK:" << load;
00187     }
00188 };
00189 
00190 class CUnknownProcessor : public CWorkerNodeControlThread::IRequestProcessor
00191 {
00192 public:
00193     virtual ~CUnknownProcessor() {}
00194 
00195     virtual void Process(const string& request,
00196                          CNcbiOstream& os,
00197                          CGridWorkerNode& node)
00198     {
00199         os << "ERR:Unknown command -- " << request;
00200     }
00201 };
00202 
// Control-command name prefixes matched by CWorkerNodeControlThread::MakeProcessor().
const string SHUTDOWN_CMD("SHUTDOWN");
const string VERSION_CMD("VERSION");
const string STAT_CMD("STAT");
const string GETLOAD_CMD("GETLOAD");
00207 
00208 /////////////////////////////////////////////////////////////////////////////
00209 //
00210 ///@internal
00211 
00212 /* static */
00213 CWorkerNodeControlThread::IRequestProcessor*
00214 CWorkerNodeControlThread::MakeProcessor(const string& cmd)
00215 {
00216     if (NStr::StartsWith(cmd, SHUTDOWN_CMD))
00217         return new CShutdownProcessor;
00218     else if (NStr::StartsWith(cmd, VERSION_CMD))
00219         return new CGetVersionProcessor;
00220     else if (NStr::StartsWith(cmd, STAT_CMD))
00221         return new CGetStatisticsProcessor;
00222     else if (NStr::StartsWith(cmd, GETLOAD_CMD))
00223         return new CGetLoadProcessor;
00224     return new CUnknownProcessor;
00225 }
00226 
00227 class CWNCTConnectionFactory : public IServer_ConnectionFactory
00228 {
00229 public:
00230     CWNCTConnectionFactory(CWorkerNodeControlThread& server, unsigned int& start_port, unsigned int end_port)
00231         : m_Server(server), m_Port(start_port), m_EndPort(end_port)
00232     {}
00233     virtual IServer_ConnectionHandler* Create(void) {
00234         return new CWNCTConnectionHandler(m_Server);
00235     }
00236     virtual EListenAction OnFailure(unsigned short* port )
00237     {
00238         if (*port >= m_EndPort)
00239             return eLAFail;
00240         m_Port = ++(*port);
00241         return eLARetry;
00242     }
00243 
00244 private:
00245     CWorkerNodeControlThread& m_Server;
00246     unsigned int& m_Port;
00247     unsigned int m_EndPort;
00248 };
00249 
00250 static STimeout kAcceptTimeout = {1,0};
00251 CWorkerNodeControlThread::CWorkerNodeControlThread(unsigned int start_port, unsigned int end_port,
00252                                                    CGridWorkerNode& worker_node)
00253     : m_WorkerNode(worker_node), m_ShutdownRequested(false), m_Port(start_port)
00254 {
00255     SServer_Parameters params;
00256     params.init_threads = 1;
00257     params.max_threads = 3;
00258     params.accept_timeout = &kAcceptTimeout;
00259     SetParameters(params);
00260     AddListener(new CWNCTConnectionFactory(*this, m_Port, end_port),m_Port);
00261 }
00262 
00263 CWorkerNodeControlThread::~CWorkerNodeControlThread()
00264 {
00265     LOG_POST_X(14, "Control server stopped.");
00266 }
00267 bool CWorkerNodeControlThread::ShutdownRequested(void)
00268 {
00269     return m_ShutdownRequested;
00270 }
00271 
00272 void CWorkerNodeControlThread::ProcessTimeout(void)
00273 {
00274     CGridGlobals::GetInstance().GetJobsWatcher().CheckInfinitLoop();
00275 }
00276 
00277 
00278 
00279 ////////////////////////////////////////////////
00280 static string s_ReadStrFromBUF(BUF buf)
00281 {
00282     size_t size = BUF_Size(buf);
00283     string ret(size, '\0');
00284     BUF_Read(buf, &ret[0], size);
00285     return ret;
00286 }
00287 
00288 CWNCTConnectionHandler::CWNCTConnectionHandler(CWorkerNodeControlThread& server)
00289     : m_Server(server)
00290 {}
00291 
00292 CWNCTConnectionHandler::~CWNCTConnectionHandler()
00293 {}
00294 
00295 void CWNCTConnectionHandler::OnOpen(void)
00296 {
00297     CSocket& socket = GetSocket();
00298     socket.DisableOSSendDelay();
00299     m_ProcessMessage = &CWNCTConnectionHandler::x_ProcessAuth;
00300 
00301 }
00302 
00303 static void s_HandleError(CSocket& socket, const string& msg)
00304 {
00305     ERR_POST_X(15, "Exception in the control server: " << msg);
00306     string err = "ERR:" + NStr::PrintableString(msg);
00307     socket.Write(&err[0], err.size());
00308     socket.Close();
00309 }
00310 void CWNCTConnectionHandler::OnMessage(BUF buffer)
00311 {
00312     try {
00313         (this->*m_ProcessMessage)(buffer);
00314     } catch(exception& ex) {
00315         s_HandleError(GetSocket(), ex.what());
00316     } catch(...) {
00317         s_HandleError(GetSocket(), "Unknown Error");
00318     }
00319 }
00320 
00321 void CWNCTConnectionHandler::x_ProcessAuth(BUF buffer)
00322 {
00323     m_Auth = s_ReadStrFromBUF(buffer);
00324     m_ProcessMessage = &CWNCTConnectionHandler::x_ProcessQueue;
00325 }
00326 void CWNCTConnectionHandler::x_ProcessQueue(BUF buffer)
00327 {
00328     m_Queue = s_ReadStrFromBUF(buffer);
00329     m_ProcessMessage = &CWNCTConnectionHandler::x_ProcessRequest;
00330 }
00331 void CWNCTConnectionHandler::x_ProcessRequest(BUF buffer)
00332 {
00333     string request = s_ReadStrFromBUF(buffer);
00334 
00335     CSocket& socket = GetSocket();
00336     string host = socket.GetPeerAddress();
00337 
00338     CNcbiOstrstream os;
00339     auto_ptr<CWorkerNodeControlThread::IRequestProcessor>
00340         processor(m_Server.MakeProcessor(request));
00341     if (processor->Authenticate(host, m_Auth, m_Queue, os,
00342                                 m_Server.GetWorkerNode()))
00343         processor->Process(request, os, m_Server.GetWorkerNode());
00344 
00345     os << ends;
00346     try {
00347         socket.Write(os.str(), os.pcount());
00348     }  catch (...) {
00349         os.freeze(false);
00350         throw;
00351     }
00352     os.freeze(false);
00353     socket.Close();
00354 }
00355 
00356 END_NCBI_SCOPE
00357 
00358 

Generated on Sun Feb 8 21:49:24 2009 for NCBI C++ ToolKit by  doxygen 1.4.6
Modified on Mon Feb 09 14:47:09 2009 by modify_doxy.py rev. 117643