/* @(#)root/multiproc:$Id$ */
// Author: Enrico Guiraud July 2015

/*************************************************************************
 * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
 * All rights reserved.                                                  *
 *                                                                       *
 * For the licensing terms see $ROOTSYS/LICENSE.                         *
 * For the list of contributors see $ROOTSYS/README/CREDITS.             *
 *************************************************************************/

#ifndef ROOT_TMPClient
#define ROOT_TMPClient

#include "MPSendRecv.h"
#include "TMonitor.h"
#include "TMPWorker.h"
#include <memory> //unique_ptr
#include <iostream>
#include <unistd.h> //pid_t
#include <vector>

/// Client (parent-process) side of the multi-process machinery declared in this header.
///
/// A TMPClient owns a TMonitor (fMon) over the sockets that connect it to its
/// workers, the PIDs of the worker processes (fWorkerPids) and the number of
/// workers to spawn at the next Fork() (fNWorkers). The Broadcast() overloads
/// send messages to the workers through the monitored sockets.
///
/// NOTE(review): the constructor, destructor, Fork(), the non-template
/// Broadcast(), DeActivate(), Remove(), ReapWorkers() and HandleMPCode() are
/// only declared here; the remarks attached to them are assumptions to be
/// confirmed against the implementation file.
class TMPClient {
public:
   /// \param nWorkers presumably the initial value of fNWorkers (defaults to 0) -- TODO confirm
   explicit TMPClient(unsigned nWorkers = 0);
   ~TMPClient();
   // Copy construction and copy assignment are disabled: it doesn't make sense to copy a TMPClient
   TMPClient(const TMPClient &) = delete;
   TMPClient &operator=(const TMPClient &) = delete;

   /// \param server the worker object; the application is expected to pass a
   /// reference to a class inheriting from TMPWorker and take advantage of polymorphism
   /// \return NOTE(review): meaning of the returned bool is not visible here -- confirm in the implementation
   bool Fork(TMPWorker &server);
   /// Non-template overload: sends only a code (no object).
   /// NOTE(review): presumably nMessages == 0 means "one message per worker", as in the
   /// templated (code, obj, nMessages) overload -- confirm in the implementation
   unsigned Broadcast(unsigned code, unsigned nMessages = 0);
   /// Sends the elements of objs to the workers, one element per successfully sent
   /// message, and returns the number of messages successfully sent.
   template<class T> unsigned Broadcast(unsigned code, const std::vector<T> &objs);
   /// Same as the std::vector overload, for a std::initializer_list.
   /// The list is taken by non-const lvalue reference, so a braced list cannot be
   /// passed directly: callers need a named std::initializer_list<T> variable.
   template<class T> unsigned Broadcast(unsigned code, std::initializer_list<T> &objs);
   /// Sends the same obj to the workers, at most nMessages times
   /// (nMessages == 0 is replaced by fNWorkers), and returns the number of messages successfully sent.
   template<class T> unsigned Broadcast(unsigned code, T obj, unsigned nMessages = 0);
   /// Give access to the TMonitor that watches the worker sockets (returns a reference to fMon)
   TMonitor &GetMonitor() { return fMon; }
   /// Return fIsParent: true in the parent/client process, false in a child/worker process
   bool GetIsParent() const { return fIsParent; }
   /// Set the number of workers that will be spawned by the next call to Fork()
   void SetNWorkers(unsigned n) { fNWorkers = n; }
   /// Return the number of workers that should be spawned upon forking (fNWorkers)
   unsigned GetNWorkers() const { return fNWorkers; }
   /// NOTE(review): presumably deactivates socket s in fMon -- confirm in the implementation
   void DeActivate(TSocket *s);
   /// NOTE(review): presumably removes socket s from fMon -- confirm in the implementation
   void Remove(TSocket *s);
   /// NOTE(review): presumably waits for / cleans up the worker processes listed in fWorkerPids -- confirm
   void ReapWorkers();
   /// NOTE(review): presumably handles a message msg received from socket sender -- confirm in the implementation
   void HandleMPCode(MPCodeBufPair &msg, TSocket *sender);

private:
   bool fIsParent; ///< This is true if this is the parent/client process, false if this is a child/worker process
   std::vector<pid_t> fWorkerPids; ///< A vector containing the PIDs of children processes/workers
   TMonitor fMon; ///< This object manages the sockets and detects socket events via TMonitor::Select
   unsigned fNWorkers; ///< The number of workers that should be spawned upon forking
};


//////////////////////////////////////////////////////////////////////////
/// Send a message with a different object to each server.
/// Sockets can either be in an "active" or "non-active" state. This method
/// activates all the sockets through which the client is connected to the
/// workers, and deactivates them when a message is sent to the corresponding
/// worker. This way the sockets pertaining to workers who have been left
/// idle will be the only ones in the active list
/// (TSocket::GetMonitor()->GetListOfActives()) after execution.
/// \param code the code of the message to send (e.g. EMPCode)
/// \param args
/// \parblock
/// a vector containing the different messages to be sent. If the size of
/// the vector is smaller than the number of workers, a message will be
/// sent only to the first args.size() workers. If the size of the args vector
/// is bigger than the number of workers, only the first fNWorkers arguments
/// will be sent.
/// \endparblock
/// \return the number of messages successfully sent
template<class T>
unsigned TMPClient::Broadcast(unsigned code, const std::vector<T> &args)
{
   fMon.ActivateAll();

   std::unique_ptr<TList> lp(fMon.GetListOfActives());
   unsigned count = 0;
   unsigned nArgs = args.size();
   for (auto s : *lp) {
      if (count == nArgs)
         break;
      if (MPSend((TSocket *)s, code, args[count])) {
         fMon.DeActivate((TSocket *)s);
         ++count;
      } else {
         Error("TMPClient::Broadcast", "[E] Could not send message to server\n");
      }
   }

   return count;
}


//////////////////////////////////////////////////////////////////////////
/// Send a message with a different object to each server.
/// The elements of the initializer list are copied into a std::vector, which
/// is then forwarded to
/// TMPClient::Broadcast(unsigned code, const std::vector<T> &args);
/// see that overload for more information.
/// \param code the code of the message to send (e.g. EMPCode)
/// \param args the objects to be sent, one per message
/// \return the number of messages successfully sent
template<class T>
unsigned TMPClient::Broadcast(unsigned code, std::initializer_list<T> &args)
{
   // The elements of an initializer_list are const, so they are always copied into
   // the vector: wrapping the list in std::move would not move anything.
   std::vector<T> vargs(args);
   return Broadcast(code, vargs);
}


//////////////////////////////////////////////////////////////////////////
/// Send a message containing code and obj to each worker, up to a
/// maximum number of nMessages workers. See
/// Broadcast(unsigned code, unsigned nMessages) for more informations.
/// \param code the code of the message to send (e.g. EMPCode)
/// \param obj the object to send
/// \param nMessages
/// \parblock
/// the maximum number of messages to send.
/// If nMessages == 0, send a message to every worker.
/// \endparblock
/// \return the number of messages successfully sent
template<class T>
unsigned TMPClient::Broadcast(unsigned code, T obj, unsigned nMessages)
{
   if (nMessages == 0)
      nMessages = fNWorkers;
   unsigned count = 0;
   fMon.ActivateAll();

   //send message to all sockets
   std::unique_ptr<TList> lp(fMon.GetListOfActives());
   for (auto s : *lp) {
      if (count == nMessages)
         break;
      if (MPSend((TSocket *)s, code, obj)) {
         fMon.DeActivate((TSocket *)s);
         ++count;
      } else {
         Error("TMPClient::Broadcast", "[E] Could not send message to server\n");
      }
   }

   return count;
}

#endif
 TMPClient.h:1
 TMPClient.h:2
 TMPClient.h:3
 TMPClient.h:4
 TMPClient.h:5
 TMPClient.h:6
 TMPClient.h:7
 TMPClient.h:8
 TMPClient.h:9
 TMPClient.h:10
 TMPClient.h:11
 TMPClient.h:12
 TMPClient.h:13
 TMPClient.h:14
 TMPClient.h:15
 TMPClient.h:16
 TMPClient.h:17
 TMPClient.h:18
 TMPClient.h:19
 TMPClient.h:20
 TMPClient.h:21
 TMPClient.h:22
 TMPClient.h:23
 TMPClient.h:24
 TMPClient.h:25
 TMPClient.h:26
 TMPClient.h:27
 TMPClient.h:28
 TMPClient.h:29
 TMPClient.h:30
 TMPClient.h:31
 TMPClient.h:32
 TMPClient.h:33
 TMPClient.h:34
 TMPClient.h:35
 TMPClient.h:36
 TMPClient.h:37
 TMPClient.h:38
 TMPClient.h:39
 TMPClient.h:40
 TMPClient.h:41
 TMPClient.h:42
 TMPClient.h:43
 TMPClient.h:44
 TMPClient.h:45
 TMPClient.h:46
 TMPClient.h:47
 TMPClient.h:48
 TMPClient.h:49
 TMPClient.h:50
 TMPClient.h:51
 TMPClient.h:52
 TMPClient.h:53
 TMPClient.h:54
 TMPClient.h:55
 TMPClient.h:56
 TMPClient.h:57
 TMPClient.h:58
 TMPClient.h:59
 TMPClient.h:60
 TMPClient.h:61
 TMPClient.h:62
 TMPClient.h:63
 TMPClient.h:64
 TMPClient.h:65
 TMPClient.h:66
 TMPClient.h:67
 TMPClient.h:68
 TMPClient.h:69
 TMPClient.h:70
 TMPClient.h:71
 TMPClient.h:72
 TMPClient.h:73
 TMPClient.h:74
 TMPClient.h:75
 TMPClient.h:76
 TMPClient.h:77
 TMPClient.h:78
 TMPClient.h:79
 TMPClient.h:80
 TMPClient.h:81
 TMPClient.h:82
 TMPClient.h:83
 TMPClient.h:84
 TMPClient.h:85
 TMPClient.h:86
 TMPClient.h:87
 TMPClient.h:88
 TMPClient.h:89
 TMPClient.h:90
 TMPClient.h:91
 TMPClient.h:92
 TMPClient.h:93
 TMPClient.h:94
 TMPClient.h:95
 TMPClient.h:96
 TMPClient.h:97
 TMPClient.h:98
 TMPClient.h:99
 TMPClient.h:100
 TMPClient.h:101
 TMPClient.h:102
 TMPClient.h:103
 TMPClient.h:104
 TMPClient.h:105
 TMPClient.h:106
 TMPClient.h:107
 TMPClient.h:108
 TMPClient.h:109
 TMPClient.h:110
 TMPClient.h:111
 TMPClient.h:112
 TMPClient.h:113
 TMPClient.h:114
 TMPClient.h:115
 TMPClient.h:116
 TMPClient.h:117
 TMPClient.h:118
 TMPClient.h:119
 TMPClient.h:120
 TMPClient.h:121
 TMPClient.h:122
 TMPClient.h:123
 TMPClient.h:124
 TMPClient.h:125
 TMPClient.h:126
 TMPClient.h:127
 TMPClient.h:128
 TMPClient.h:129
 TMPClient.h:130
 TMPClient.h:131
 TMPClient.h:132
 TMPClient.h:133
 TMPClient.h:134
 TMPClient.h:135
 TMPClient.h:136
 TMPClient.h:137
 TMPClient.h:138
 TMPClient.h:139
 TMPClient.h:140
 TMPClient.h:141
 TMPClient.h:142
 TMPClient.h:143