Standardizing copyright notice. Touches **ALL** sources, guys, sorry.. it's
[cvc5.git] / src / main / portfolio_util.h
1 /********************* */
2 /*! \file portfolio_util.h
3 ** \verbatim
4 ** Original author: kshitij
5 ** Major contributors: none
6 ** Minor contributors (to current version): mdeters
7 ** This file is part of the CVC4 prototype.
8 ** Copyright (c) 2009-2012 New York University and The University of Iowa
9 ** See the file COPYING in the top-level source directory for licensing
10 ** information.\endverbatim
11 **
12 ** \brief Code relevant only for portfolio builds
13 **/
14
15 #ifndef __CVC4__PORTFOLIO_UTIL_H
16 #define __CVC4__PORTFOLIO_UTIL_H
17
18 #include <queue>
19
20 #include "expr/pickler.h"
21 #include "util/channel.h"
22 #include "util/lemma_input_channel.h"
23 #include "util/lemma_output_channel.h"
24 #include "main/options.h"
25
26 namespace CVC4 {
27
28 typedef expr::pickle::Pickle ChannelFormat;
29
30 class PortfolioLemmaOutputChannel : public LemmaOutputChannel {
31 private:
32 std::string d_tag;
33 SharedChannel<ChannelFormat>* d_sharedChannel;
34 expr::pickle::MapPickler d_pickler;
35
36 public:
37 int cnt;
38 PortfolioLemmaOutputChannel(std::string tag,
39 SharedChannel<ChannelFormat> *c,
40 ExprManager* em,
41 VarMap& to,
42 VarMap& from) :
43 d_tag(tag),
44 d_sharedChannel(c),
45 d_pickler(em, to, from),
46 cnt(0)
47 {}
48
49 void notifyNewLemma(Expr lemma) {
50 if(int(lemma.getNumChildren()) > options::sharingFilterByLength()) {
51 return;
52 }
53 ++cnt;
54 Trace("sharing") << d_tag << ": " << lemma << std::endl;
55 expr::pickle::Pickle pkl;
56 try {
57 d_pickler.toPickle(lemma, pkl);
58 d_sharedChannel->push(pkl);
59 if(Trace.isOn("showSharing") && options::thread_id() == 0) {
60 *options::out() << "thread #0: notifyNewLemma: " << lemma
61 << std::endl;
62 }
63 } catch(expr::pickle::PicklingException& p){
64 Trace("sharing::blocked") << lemma << std::endl;
65 }
66 }
67
68 };/* class PortfolioLemmaOutputChannel */
69
70 class PortfolioLemmaInputChannel : public LemmaInputChannel {
71 private:
72 std::string d_tag;
73 SharedChannel<ChannelFormat>* d_sharedChannel;
74 expr::pickle::MapPickler d_pickler;
75
76 public:
77 PortfolioLemmaInputChannel(std::string tag,
78 SharedChannel<ChannelFormat>* c,
79 ExprManager* em,
80 VarMap& to,
81 VarMap& from) :
82 d_tag(tag),
83 d_sharedChannel(c),
84 d_pickler(em, to, from){
85 }
86
87 bool hasNewLemma(){
88 Debug("lemmaInputChannel") << d_tag << ": " << "hasNewLemma" << std::endl;
89 return !d_sharedChannel->empty();
90 }
91
92 Expr getNewLemma() {
93 Debug("lemmaInputChannel") << d_tag << ": " << "getNewLemma" << std::endl;
94 expr::pickle::Pickle pkl = d_sharedChannel->pop();
95
96 Expr e = d_pickler.fromPickle(pkl);
97 if(Trace.isOn("showSharing") && options::thread_id() == 0) {
98 *options::out() << "thread #0: getNewLemma: " << e << std::endl;
99 }
100 return e;
101 }
102
103 };/* class PortfolioLemmaInputChannel */
104
105 std::vector<Options> parseThreadSpecificOptions(Options opts);
106
107 template<typename T>
108 void sharingManager(unsigned numThreads,
109 SharedChannel<T> *channelsOut[], // out and in with respect
110 SharedChannel<T> *channelsIn[],
111 SmtEngine *smts[]) // to smt engines
112 {
113 Trace("sharing") << "sharing: thread started " << std::endl;
114 std::vector <int> cnt(numThreads); // Debug("sharing")
115
116 std::vector< std::queue<T> > queues;
117 for(unsigned i = 0; i < numThreads; ++i){
118 queues.push_back(std::queue<T>());
119 }
120
121 const unsigned int sharingBroadcastInterval = 1;
122
123 boost::mutex mutex_activity;
124
125 /* Disable interruption, so that we can check manually */
126 boost::this_thread::disable_interruption di;
127
128 while(not boost::this_thread::interruption_requested()) {
129
130 boost::this_thread::sleep
131 (boost::posix_time::milliseconds(sharingBroadcastInterval));
132
133 for(unsigned t = 0; t < numThreads; ++t) {
134
135 /* No activity on this channel */
136 if(channelsOut[t]->empty()) continue;
137
138 /* Alert if channel full, so that we increase sharingChannelSize
139 or decrease sharingBroadcastInterval */
140 assert(not channelsOut[t]->full());
141
142 T data = channelsOut[t]->pop();
143
144 if(Trace.isOn("sharing")) {
145 ++cnt[t];
146 Trace("sharing") << "sharing: Got data. Thread #" << t
147 << ". Chunk " << cnt[t] << std::endl;
148 }
149
150 for(unsigned u = 0; u < numThreads; ++u) {
151 if(u != t){
152 Trace("sharing") << "sharing: adding to queue " << u << std::endl;
153 queues[u].push(data);
154 }
155 }/* end of inner for: broadcast activity */
156
157 } /* end of outer for: look for activity */
158
159 for(unsigned t = 0; t < numThreads; ++t){
160 /* Alert if channel full, so that we increase sharingChannelSize
161 or decrease sharingBroadcastInterval */
162 assert(not channelsIn[t]->full());
163
164 while(!queues[t].empty() && !channelsIn[t]->full()){
165 Trace("sharing") << "sharing: pushing on channel " << t << std::endl;
166 T data = queues[t].front();
167 channelsIn[t]->push(data);
168 queues[t].pop();
169 }
170 }
171 } /* end of infinite while */
172
173 Trace("interrupt")
174 << "sharing thread interrupted, interrupting all smtEngines" << std::endl;
175
176 for(unsigned t = 0; t < numThreads; ++t) {
177 Trace("interrupt") << "Interrupting thread #" << t << std::endl;
178 try{
179 smts[t]->interrupt();
180 }catch(ModalException &e){
181 // It's fine, the thread is probably not there.
182 Trace("interrupt") << "Could not interrupt thread #" << t << std::endl;
183 }
184 }
185
186 Trace("sharing") << "sharing: Interrupted, exiting." << std::endl;
187 }/* sharingManager() */
188
189 }/* CVC4 namespace */
190
191 #endif /* __CVC4__PORTFOLIO_UTIL_H */