00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include <ncbi_pch.hpp>
00033
00034 #include <connect/services/grid_globals.hpp>
00035 #include <connect/services/grid_control_thread.hpp>
00036 #include <connect/services/grid_worker_app_impl.hpp>
00037 #include <connect/services/error_codes.hpp>
00038
00039 #include <corelib/ncbistre.hpp>
00040 #include <corelib/ncbiapp.hpp>
00041 #include <corelib/ncbi_process.hpp>
00042
00043 #include <math.h>
00044
00045
00046 #define NCBI_USE_ERRCODE_X ConnServ_WorkerNode
00047
00048 BEGIN_NCBI_SCOPE
00049
00050
00051
00052
00053
00054 class CGetVersionProcessor : public CWorkerNodeControlThread::IRequestProcessor
00055 {
00056 public:
00057 virtual ~CGetVersionProcessor() {}
00058
00059 virtual void Process(const string& request,
00060 CNcbiOstream& os,
00061 CGridWorkerNode& node)
00062 {
00063 os << "OK:" << node.GetJobVersion() << WN_BUILD_DATE;
00064 }
00065 };
00066
00067 class CShutdownProcessor : public CWorkerNodeControlThread::IRequestProcessor
00068 {
00069 public:
00070 virtual ~CShutdownProcessor() {}
00071
00072 virtual bool Authenticate(const string& host,
00073 const string& auth,
00074 const string& queue,
00075 CNcbiOstream& os,
00076 const CGridWorkerNode& node)
00077 {
00078 m_Host = host;
00079 size_t pos = m_Host.find_first_of(':');
00080 if (pos != string::npos) {
00081 m_Host = m_Host.substr(0, pos);
00082 }
00083 if (node.IsHostInAdminHostsList(m_Host)) {
00084 return true;
00085 }
00086 os << "ERR:Shutdown access denied.";
00087 LOG_POST_X(10, Warning << "Shutdown access denied for host " << m_Host);
00088 return false;
00089 }
00090
00091 virtual void Process(const string& request,
00092 CNcbiOstream& os,
00093 CGridWorkerNode& )
00094 {
00095 if (request.find("SUICIDE") != NPOS) {
00096 LOG_POST_X(11, Warning <<
00097 "Shutdown request has been received from host: " << m_Host);
00098 LOG_POST_X(12, Warning << "Server is shutting down");
00099 CGridGlobals::GetInstance().KillNode();
00100 } else {
00101 CNetScheduleAdmin::EShutdownLevel level =
00102 CNetScheduleAdmin::eNormalShutdown;
00103 if (request.find("IMMEDIATE") != NPOS)
00104 level = CNetScheduleAdmin::eShutdownImmediate;
00105 os << "OK:";
00106 CGridGlobals::GetInstance().
00107 RequestShutdown(level);
00108 LOG_POST_X(13, "Shutdown request has been received from host " <<
00109 m_Host);
00110 }
00111 }
00112 private:
00113 string m_Host;
00114 };
00115
00116 class CGetStatisticsProcessor : public CWorkerNodeControlThread::IRequestProcessor
00117 {
00118 public:
00119 CGetStatisticsProcessor() {}
00120 virtual ~CGetStatisticsProcessor() {}
00121
00122 virtual void Process(const string& request,
00123 CNcbiOstream& os,
00124 CGridWorkerNode& node)
00125 {
00126 os << "OK:" << node.GetJobVersion() << WN_BUILD_DATE << endl;
00127 os << "Node started at: " << CGridGlobals::GetInstance().GetStartTime().AsString() << endl;
00128 CNcbiApplication* app = CNcbiApplication::Instance();
00129 if (app)
00130 os << "Executable path: " << app->GetProgramExecutablePath()
00131 << "; PID: " << CProcess::GetCurrentPid() << endl;
00132
00133 os << "Queue name: " << node.GetQueueName() << endl;
00134 if (node.GetMaxThreads() > 1)
00135 os << "Maximum job threads: " << node.GetMaxThreads() << endl;
00136
00137 if (CGridGlobals::GetInstance().
00138 GetShutdownLevel() != CNetScheduleAdmin::eNoShutdown) {
00139 os << "THE NODE IS IN A SHUTTING DOWN MODE!!!" << endl;
00140 }
00141 if (node.IsExclusiveMode())
00142 os << "THE NODE IS IN AN EXCLUSIVE MODE!!!" << endl;
00143
00144 CGridGlobals::GetInstance().GetJobsWatcher().Print(os);
00145 }
00146 };
00147
00148 class CGetLoadProcessor : public CWorkerNodeControlThread::IRequestProcessor
00149 {
00150 public:
00151 CGetLoadProcessor() {}
00152 virtual ~CGetLoadProcessor() {}
00153
00154 virtual bool Authenticate(const string& host,
00155 const string& auth,
00156 const string& queue,
00157 CNcbiOstream& os,
00158 const CGridWorkerNode& node)
00159 {
00160 string cmp = node.GetClientName() + " prog='" + node.GetJobVersion() + '\'';
00161 if (auth != cmp) {
00162 os <<"ERR:Wrong Program. Required: " << node.GetJobVersion()
00163 << endl << auth << endl << cmp;
00164 return false;
00165 }
00166 string qname, connection_info;
00167 NStr::SplitInTwo(queue, ";", qname, connection_info);
00168 if (qname != node.GetQueueName()) {
00169 os << "ERR:Wrong Queue. Required: " << node.GetQueueName();
00170 return false;
00171 }
00172 if (connection_info != node.GetServiceName()) {
00173 os << "ERR:Wrong Connection Info. Required: "
00174 << node.GetServiceName();
00175 return false;
00176 }
00177 return true;
00178 }
00179
00180 virtual void Process(const string& request,
00181 CNcbiOstream& os,
00182 CGridWorkerNode& node)
00183 {
00184 int load = node.GetMaxThreads() -
00185 CGridGlobals::GetInstance().GetJobsWatcher().GetJobsRunningNumber();
00186 os << "OK:" << load;
00187 }
00188 };
00189
00190 class CUnknownProcessor : public CWorkerNodeControlThread::IRequestProcessor
00191 {
00192 public:
00193 virtual ~CUnknownProcessor() {}
00194
00195 virtual void Process(const string& request,
00196 CNcbiOstream& os,
00197 CGridWorkerNode& node)
00198 {
00199 os << "ERR:Unknown command -- " << request;
00200 }
00201 };
00202
00203 const string SHUTDOWN_CMD = "SHUTDOWN";
00204 const string VERSION_CMD = "VERSION";
00205 const string STAT_CMD = "STAT";
00206 const string GETLOAD_CMD = "GETLOAD";
00207
00208
00209
00210
00211
00212
00213 CWorkerNodeControlThread::IRequestProcessor*
00214 CWorkerNodeControlThread::MakeProcessor(const string& cmd)
00215 {
00216 if (NStr::StartsWith(cmd, SHUTDOWN_CMD))
00217 return new CShutdownProcessor;
00218 else if (NStr::StartsWith(cmd, VERSION_CMD))
00219 return new CGetVersionProcessor;
00220 else if (NStr::StartsWith(cmd, STAT_CMD))
00221 return new CGetStatisticsProcessor;
00222 else if (NStr::StartsWith(cmd, GETLOAD_CMD))
00223 return new CGetLoadProcessor;
00224 return new CUnknownProcessor;
00225 }
00226
00227 class CWNCTConnectionFactory : public IServer_ConnectionFactory
00228 {
00229 public:
00230 CWNCTConnectionFactory(CWorkerNodeControlThread& server, unsigned int& start_port, unsigned int end_port)
00231 : m_Server(server), m_Port(start_port), m_EndPort(end_port)
00232 {}
00233 virtual IServer_ConnectionHandler* Create(void) {
00234 return new CWNCTConnectionHandler(m_Server);
00235 }
00236 virtual EListenAction OnFailure(unsigned short* port )
00237 {
00238 if (*port >= m_EndPort)
00239 return eLAFail;
00240 m_Port = ++(*port);
00241 return eLARetry;
00242 }
00243
00244 private:
00245 CWorkerNodeControlThread& m_Server;
00246 unsigned int& m_Port;
00247 unsigned int m_EndPort;
00248 };
00249
00250 static STimeout kAcceptTimeout = {1,0};
00251 CWorkerNodeControlThread::CWorkerNodeControlThread(unsigned int start_port, unsigned int end_port,
00252 CGridWorkerNode& worker_node)
00253 : m_WorkerNode(worker_node), m_ShutdownRequested(false), m_Port(start_port)
00254 {
00255 SServer_Parameters params;
00256 params.init_threads = 1;
00257 params.max_threads = 3;
00258 params.accept_timeout = &kAcceptTimeout;
00259 SetParameters(params);
00260 AddListener(new CWNCTConnectionFactory(*this, m_Port, end_port),m_Port);
00261 }
00262
00263 CWorkerNodeControlThread::~CWorkerNodeControlThread()
00264 {
00265 LOG_POST_X(14, "Control server stopped.");
00266 }
00267 bool CWorkerNodeControlThread::ShutdownRequested(void)
00268 {
00269 return m_ShutdownRequested;
00270 }
00271
00272 void CWorkerNodeControlThread::ProcessTimeout(void)
00273 {
00274 CGridGlobals::GetInstance().GetJobsWatcher().CheckInfinitLoop();
00275 }
00276
00277
00278
00279
00280 static string s_ReadStrFromBUF(BUF buf)
00281 {
00282 size_t size = BUF_Size(buf);
00283 string ret(size, '\0');
00284 BUF_Read(buf, &ret[0], size);
00285 return ret;
00286 }
00287
00288 CWNCTConnectionHandler::CWNCTConnectionHandler(CWorkerNodeControlThread& server)
00289 : m_Server(server)
00290 {}
00291
00292 CWNCTConnectionHandler::~CWNCTConnectionHandler()
00293 {}
00294
00295 void CWNCTConnectionHandler::OnOpen(void)
00296 {
00297 CSocket& socket = GetSocket();
00298 socket.DisableOSSendDelay();
00299 m_ProcessMessage = &CWNCTConnectionHandler::x_ProcessAuth;
00300
00301 }
00302
00303 static void s_HandleError(CSocket& socket, const string& msg)
00304 {
00305 ERR_POST_X(15, "Exception in the control server: " << msg);
00306 string err = "ERR:" + NStr::PrintableString(msg);
00307 socket.Write(&err[0], err.size());
00308 socket.Close();
00309 }
00310 void CWNCTConnectionHandler::OnMessage(BUF buffer)
00311 {
00312 try {
00313 (this->*m_ProcessMessage)(buffer);
00314 } catch(exception& ex) {
00315 s_HandleError(GetSocket(), ex.what());
00316 } catch(...) {
00317 s_HandleError(GetSocket(), "Unknown Error");
00318 }
00319 }
00320
00321 void CWNCTConnectionHandler::x_ProcessAuth(BUF buffer)
00322 {
00323 m_Auth = s_ReadStrFromBUF(buffer);
00324 m_ProcessMessage = &CWNCTConnectionHandler::x_ProcessQueue;
00325 }
00326 void CWNCTConnectionHandler::x_ProcessQueue(BUF buffer)
00327 {
00328 m_Queue = s_ReadStrFromBUF(buffer);
00329 m_ProcessMessage = &CWNCTConnectionHandler::x_ProcessRequest;
00330 }
00331 void CWNCTConnectionHandler::x_ProcessRequest(BUF buffer)
00332 {
00333 string request = s_ReadStrFromBUF(buffer);
00334
00335 CSocket& socket = GetSocket();
00336 string host = socket.GetPeerAddress();
00337
00338 CNcbiOstrstream os;
00339 auto_ptr<CWorkerNodeControlThread::IRequestProcessor>
00340 processor(m_Server.MakeProcessor(request));
00341 if (processor->Authenticate(host, m_Auth, m_Queue, os,
00342 m_Server.GetWorkerNode()))
00343 processor->Process(request, os, m_Server.GetWorkerNode());
00344
00345 os << ends;
00346 try {
00347 socket.Write(os.str(), os.pcount());
00348 } catch (...) {
00349 os.freeze(false);
00350 throw;
00351 }
00352 os.freeze(false);
00353 socket.Close();
00354 }
00355
00356 END_NCBI_SCOPE
00357
00358