ExaTN
tensor_runtime.hpp
1 
44 #ifndef EXATN_RUNTIME_TENSOR_RUNTIME_HPP_
45 #define EXATN_RUNTIME_TENSOR_RUNTIME_HPP_
46 
47 #include "tensor_graph_executor.hpp"
48 #include "tensor_graph.hpp"
49 #include "tensor_operation.hpp"
50 #include "tensor_method.hpp"
51 
52 #include <map>
53 #include <list>
54 #include <string>
55 #include <vector>
56 #include <memory>
57 #include <thread>
58 #include <atomic>
59 #include <future>
60 #include <mutex>
61 
62 namespace exatn {
63 namespace runtime {
64 
65 class TensorRuntime final {
66 
67 public:
68  TensorRuntime(const std::string & graph_executor_name = "eager-dag-executor", //DAG executor kind
69  const std::string & node_executor_name = "talsh-node-executor"); //DAG node executor kind
70  TensorRuntime(const TensorRuntime &) = delete;
71  TensorRuntime & operator=(const TensorRuntime &) = delete;
72  TensorRuntime(TensorRuntime &&) noexcept = delete;
73  TensorRuntime & operator=(TensorRuntime &&) noexcept = delete;
74  ~TensorRuntime();
75 
77  void resetLoggingLevel(int level = 0);
78 
80  void openScope(const std::string & scope_name);
81 
85  void pauseScope();
86 
88  void resumeScope(const std::string & scope_name);
89 
92  void closeScope();
93 
95  inline bool currentScopeIsSet() const {return !(current_scope_.empty());}
96 
98  VertexIdType submit(std::shared_ptr<TensorOperation> op);
99 
102  bool sync(TensorOperation & op,
103  bool wait = true);
104 
107  bool sync(const Tensor & tensor,
108  bool wait = true);
109 
113  std::future<std::shared_ptr<talsh::Tensor>> getLocalTensor(std::shared_ptr<Tensor> tensor, //in: exatn::numerics::Tensor to get slice of (by copy)
114  const std::vector<std::pair<DimOffset,DimExtent>> & slice_spec); //in: tensor slice specification
115 
116 private:
118  class TensorDataReq{
119  public:
120  std::promise<std::shared_ptr<talsh::Tensor>> slice_promise_;
121  std::vector<std::pair<DimOffset,DimExtent>> slice_specs_;
122  std::shared_ptr<Tensor> tensor_;
123 
124  TensorDataReq(std::promise<std::shared_ptr<talsh::Tensor>> && slice_promise,
125  const std::vector<std::pair<DimOffset,DimExtent>> & slice_specs,
126  std::shared_ptr<Tensor> tensor):
127  slice_promise_(std::move(slice_promise)), slice_specs_(slice_specs), tensor_(tensor){}
128 
129  TensorDataReq(const TensorDataReq & req) = delete;
130  TensorDataReq & operator=(const TensorDataReq & req) = delete;
131  TensorDataReq(TensorDataReq && req) noexcept = default;
132  TensorDataReq & operator=(TensorDataReq && req) noexcept = default;
133  ~TensorDataReq() = default;
134  };
135 
137  void launchExecutionThread();
139  void executionThreadWorkflow();
141  void processTensorDataRequests();
142 
143  inline void lockDataReqQ(){data_req_mtx_.lock();}
144  inline void unlockDataReqQ(){data_req_mtx_.unlock();}
145 
147  std::string graph_executor_name_;
149  std::string node_executor_name_;
151  std::shared_ptr<TensorGraphExecutor> graph_executor_;
153  std::map<std::string, std::shared_ptr<TensorGraph>> dags_;
155  std::string current_scope_;
157  std::shared_ptr<TensorGraph> current_dag_; //pointer to the current DAG
159  std::list<TensorDataReq> data_req_queue_;
161  int logging_;
163  std::atomic<bool> executing_; //TRUE while the execution thread is executing the current DAG
165  std::atomic<bool> alive_; //TRUE while the main thread is accepting new operations from Client
167  std::thread exec_thread_;
169  std::mutex data_req_mtx_;
170 };
171 
172 } // namespace runtime
173 } // namespace exatn
174 
175 #endif //EXATN_RUNTIME_TENSOR_RUNTIME_HPP_
exatn::numerics::Tensor
Definition: tensor.hpp:63
exatn::runtime::TensorRuntime::openScope
void openScope(const std::string &scope_name)
Definition: tensor_runtime.cpp:98
exatn::runtime::TensorRuntime::resetLoggingLevel
void resetLoggingLevel(int level=0)
Definition: tensor_runtime.cpp:90
exatn
Definition: DriverClient.hpp:10
exatn::runtime::TensorRuntime::getLocalTensor
std::future< std::shared_ptr< talsh::Tensor > > getLocalTensor(std::shared_ptr< Tensor > tensor, const std::vector< std::pair< DimOffset, DimExtent >> &slice_spec)
Definition: tensor_runtime.cpp:180
exatn::runtime::TensorRuntime::submit
VertexIdType submit(std::shared_ptr< TensorOperation > op)
Definition: tensor_runtime.cpp:149
exatn::numerics::TensorOperation
Definition: tensor_operation.hpp:36
exatn::runtime::TensorRuntime::pauseScope
void pauseScope()
Definition: tensor_runtime.cpp:118
exatn::runtime::TensorRuntime::sync
bool sync(TensorOperation &op, bool wait=true)
Definition: tensor_runtime.cpp:159
exatn::runtime::TensorRuntime::resumeScope
void resumeScope(const std::string &scope_name)
Definition: tensor_runtime.cpp:124
exatn::runtime::TensorRuntime
Definition: tensor_runtime.hpp:65
exatn::runtime::TensorRuntime::currentScopeIsSet
bool currentScopeIsSet() const
Definition: tensor_runtime.hpp:95
exatn::runtime::TensorRuntime::closeScope
void closeScope()
Definition: tensor_runtime.cpp:136