Fix portfolio builds after yesterday's commits.
[cvc5.git] / src / main / portfolio_util.h
1 /********************* */
2 /*! \file portfolio_util.h
3 ** \verbatim
4 ** Original author: Kshitij Bansal
5 ** Major contributors: none
6 ** Minor contributors (to current version): Morgan Deters
7 ** This file is part of the CVC4 project.
8 ** Copyright (c) 2009-2013 New York University and The University of Iowa
9 ** See the file COPYING in the top-level source directory for licensing
10 ** information.\endverbatim
11 **
12 ** \brief Code relevant only for portfolio builds
13 **/
14
15 #ifndef __CVC4__PORTFOLIO_UTIL_H
16 #define __CVC4__PORTFOLIO_UTIL_H
17
18 #include <queue>
19
20 #include "expr/pickler.h"
21 #include "util/channel.h"
22 #include "util/lemma_input_channel.h"
23 #include "util/lemma_output_channel.h"
24 #include "util/output.h"
25 #include "main/options.h"
26
27 namespace CVC4 {
28
29 typedef expr::pickle::Pickle ChannelFormat;
30
31 class PortfolioLemmaOutputChannel : public LemmaOutputChannel {
32 private:
33 std::string d_tag;
34 SharedChannel<ChannelFormat>* d_sharedChannel;
35 expr::pickle::MapPickler d_pickler;
36
37 public:
38 int cnt;
39 PortfolioLemmaOutputChannel(std::string tag,
40 SharedChannel<ChannelFormat> *c,
41 ExprManager* em,
42 VarMap& to,
43 VarMap& from) :
44 d_tag(tag),
45 d_sharedChannel(c),
46 d_pickler(em, to, from),
47 cnt(0)
48 {}
49
50 ~PortfolioLemmaOutputChannel() throw() { }
51
52 void notifyNewLemma(Expr lemma) {
53 if(int(lemma.getNumChildren()) > options::sharingFilterByLength()) {
54 return;
55 }
56 ++cnt;
57 Trace("sharing") << d_tag << ": " << lemma << std::endl;
58 expr::pickle::Pickle pkl;
59 try {
60 d_pickler.toPickle(lemma, pkl);
61 d_sharedChannel->push(pkl);
62 if(Trace.isOn("showSharing") && options::thread_id() == 0) {
63 *options::out() << "thread #0: notifyNewLemma: " << lemma
64 << std::endl;
65 }
66 } catch(expr::pickle::PicklingException& p){
67 Trace("sharing::blocked") << lemma << std::endl;
68 }
69 }
70
71 };/* class PortfolioLemmaOutputChannel */
72
73 class PortfolioLemmaInputChannel : public LemmaInputChannel {
74 private:
75 std::string d_tag;
76 SharedChannel<ChannelFormat>* d_sharedChannel;
77 expr::pickle::MapPickler d_pickler;
78
79 public:
80 PortfolioLemmaInputChannel(std::string tag,
81 SharedChannel<ChannelFormat>* c,
82 ExprManager* em,
83 VarMap& to,
84 VarMap& from) :
85 d_tag(tag),
86 d_sharedChannel(c),
87 d_pickler(em, to, from){
88 }
89
90 ~PortfolioLemmaInputChannel() throw() { }
91
92 bool hasNewLemma(){
93 Debug("lemmaInputChannel") << d_tag << ": " << "hasNewLemma" << std::endl;
94 return !d_sharedChannel->empty();
95 }
96
97 Expr getNewLemma() {
98 Debug("lemmaInputChannel") << d_tag << ": " << "getNewLemma" << std::endl;
99 expr::pickle::Pickle pkl = d_sharedChannel->pop();
100
101 Expr e = d_pickler.fromPickle(pkl);
102 if(Trace.isOn("showSharing") && options::thread_id() == 0) {
103 *options::out() << "thread #0: getNewLemma: " << e << std::endl;
104 }
105 return e;
106 }
107
108 };/* class PortfolioLemmaInputChannel */
109
110 std::vector<Options> parseThreadSpecificOptions(Options opts);
111
112 template<typename T>
113 void sharingManager(unsigned numThreads,
114 SharedChannel<T> *channelsOut[], // out and in with respect
115 SharedChannel<T> *channelsIn[],
116 SmtEngine *smts[]) // to smt engines
117 {
118 Trace("sharing") << "sharing: thread started " << std::endl;
119 std::vector <int> cnt(numThreads); // Debug("sharing")
120
121 std::vector< std::queue<T> > queues;
122 for(unsigned i = 0; i < numThreads; ++i){
123 queues.push_back(std::queue<T>());
124 }
125
126 const unsigned int sharingBroadcastInterval = 1;
127
128 boost::mutex mutex_activity;
129
130 /* Disable interruption, so that we can check manually */
131 boost::this_thread::disable_interruption di;
132
133 while(not boost::this_thread::interruption_requested()) {
134
135 boost::this_thread::sleep
136 (boost::posix_time::milliseconds(sharingBroadcastInterval));
137
138 for(unsigned t = 0; t < numThreads; ++t) {
139
140 /* No activity on this channel */
141 if(channelsOut[t]->empty()) continue;
142
143 /* Alert if channel full, so that we increase sharingChannelSize
144 or decrease sharingBroadcastInterval */
145 assert(not channelsOut[t]->full());
146
147 T data = channelsOut[t]->pop();
148
149 if(Trace.isOn("sharing")) {
150 ++cnt[t];
151 Trace("sharing") << "sharing: Got data. Thread #" << t
152 << ". Chunk " << cnt[t] << std::endl;
153 }
154
155 for(unsigned u = 0; u < numThreads; ++u) {
156 if(u != t){
157 Trace("sharing") << "sharing: adding to queue " << u << std::endl;
158 queues[u].push(data);
159 }
160 }/* end of inner for: broadcast activity */
161
162 } /* end of outer for: look for activity */
163
164 for(unsigned t = 0; t < numThreads; ++t){
165 /* Alert if channel full, so that we increase sharingChannelSize
166 or decrease sharingBroadcastInterval */
167 assert(not channelsIn[t]->full());
168
169 while(!queues[t].empty() && !channelsIn[t]->full()){
170 Trace("sharing") << "sharing: pushing on channel " << t << std::endl;
171 T data = queues[t].front();
172 channelsIn[t]->push(data);
173 queues[t].pop();
174 }
175 }
176 } /* end of infinite while */
177
178 Trace("interrupt")
179 << "sharing thread interrupted, interrupting all smtEngines" << std::endl;
180
181 for(unsigned t = 0; t < numThreads; ++t) {
182 Trace("interrupt") << "Interrupting thread #" << t << std::endl;
183 try{
184 smts[t]->interrupt();
185 }catch(ModalException &e){
186 // It's fine, the thread is probably not there.
187 Trace("interrupt") << "Could not interrupt thread #" << t << std::endl;
188 }
189 }
190
191 Trace("sharing") << "sharing: Interrupted, exiting." << std::endl;
192 }/* sharingManager() */
193
194 }/* CVC4 namespace */
195
196 #endif /* __CVC4__PORTFOLIO_UTIL_H */