6f6e2f03b0f26e7955c8f6f1c955836bb5d2ac5b
[cvc5.git] / src / main / portfolio_util.h
1 /********************* */
2 /*! \file portfolio_util.h
3 ** \verbatim
4 ** Original author: Kshitij Bansal
5 ** Major contributors: none
6 ** Minor contributors (to current version): Morgan Deters
7 ** This file is part of the CVC4 project.
8 ** Copyright (c) 2009-2013 New York University and The University of Iowa
9 ** See the file COPYING in the top-level source directory for licensing
10 ** information.\endverbatim
11 **
12 ** \brief Code relevant only for portfolio builds
13 **/
14
15 #ifndef __CVC4__PORTFOLIO_UTIL_H
16 #define __CVC4__PORTFOLIO_UTIL_H
17
18 #include <queue>
19
20 #include "expr/pickler.h"
21 #include "util/channel.h"
22 #include "util/lemma_input_channel.h"
23 #include "util/lemma_output_channel.h"
24 #include "main/options.h"
25
26 namespace CVC4 {
27
28 typedef expr::pickle::Pickle ChannelFormat;
29
30 class PortfolioLemmaOutputChannel : public LemmaOutputChannel {
31 private:
32 std::string d_tag;
33 SharedChannel<ChannelFormat>* d_sharedChannel;
34 expr::pickle::MapPickler d_pickler;
35
36 public:
37 int cnt;
38 PortfolioLemmaOutputChannel(std::string tag,
39 SharedChannel<ChannelFormat> *c,
40 ExprManager* em,
41 VarMap& to,
42 VarMap& from) :
43 d_tag(tag),
44 d_sharedChannel(c),
45 d_pickler(em, to, from),
46 cnt(0)
47 {}
48
49 ~PortfolioLemmaOutputChannel() throw() { }
50
51 void notifyNewLemma(Expr lemma) {
52 if(int(lemma.getNumChildren()) > options::sharingFilterByLength()) {
53 return;
54 }
55 ++cnt;
56 Trace("sharing") << d_tag << ": " << lemma << std::endl;
57 expr::pickle::Pickle pkl;
58 try {
59 d_pickler.toPickle(lemma, pkl);
60 d_sharedChannel->push(pkl);
61 if(Trace.isOn("showSharing") && options::thread_id() == 0) {
62 *options::out() << "thread #0: notifyNewLemma: " << lemma
63 << std::endl;
64 }
65 } catch(expr::pickle::PicklingException& p){
66 Trace("sharing::blocked") << lemma << std::endl;
67 }
68 }
69
70 };/* class PortfolioLemmaOutputChannel */
71
72 class PortfolioLemmaInputChannel : public LemmaInputChannel {
73 private:
74 std::string d_tag;
75 SharedChannel<ChannelFormat>* d_sharedChannel;
76 expr::pickle::MapPickler d_pickler;
77
78 public:
79 PortfolioLemmaInputChannel(std::string tag,
80 SharedChannel<ChannelFormat>* c,
81 ExprManager* em,
82 VarMap& to,
83 VarMap& from) :
84 d_tag(tag),
85 d_sharedChannel(c),
86 d_pickler(em, to, from){
87 }
88
89 ~PortfolioLemmaInputChannel() throw() { }
90
91 bool hasNewLemma(){
92 Debug("lemmaInputChannel") << d_tag << ": " << "hasNewLemma" << std::endl;
93 return !d_sharedChannel->empty();
94 }
95
96 Expr getNewLemma() {
97 Debug("lemmaInputChannel") << d_tag << ": " << "getNewLemma" << std::endl;
98 expr::pickle::Pickle pkl = d_sharedChannel->pop();
99
100 Expr e = d_pickler.fromPickle(pkl);
101 if(Trace.isOn("showSharing") && options::thread_id() == 0) {
102 *options::out() << "thread #0: getNewLemma: " << e << std::endl;
103 }
104 return e;
105 }
106
107 };/* class PortfolioLemmaInputChannel */
108
109 std::vector<Options> parseThreadSpecificOptions(Options opts);
110
111 template<typename T>
112 void sharingManager(unsigned numThreads,
113 SharedChannel<T> *channelsOut[], // out and in with respect
114 SharedChannel<T> *channelsIn[],
115 SmtEngine *smts[]) // to smt engines
116 {
117 Trace("sharing") << "sharing: thread started " << std::endl;
118 std::vector <int> cnt(numThreads); // Debug("sharing")
119
120 std::vector< std::queue<T> > queues;
121 for(unsigned i = 0; i < numThreads; ++i){
122 queues.push_back(std::queue<T>());
123 }
124
125 const unsigned int sharingBroadcastInterval = 1;
126
127 boost::mutex mutex_activity;
128
129 /* Disable interruption, so that we can check manually */
130 boost::this_thread::disable_interruption di;
131
132 while(not boost::this_thread::interruption_requested()) {
133
134 boost::this_thread::sleep
135 (boost::posix_time::milliseconds(sharingBroadcastInterval));
136
137 for(unsigned t = 0; t < numThreads; ++t) {
138
139 /* No activity on this channel */
140 if(channelsOut[t]->empty()) continue;
141
142 /* Alert if channel full, so that we increase sharingChannelSize
143 or decrease sharingBroadcastInterval */
144 assert(not channelsOut[t]->full());
145
146 T data = channelsOut[t]->pop();
147
148 if(Trace.isOn("sharing")) {
149 ++cnt[t];
150 Trace("sharing") << "sharing: Got data. Thread #" << t
151 << ". Chunk " << cnt[t] << std::endl;
152 }
153
154 for(unsigned u = 0; u < numThreads; ++u) {
155 if(u != t){
156 Trace("sharing") << "sharing: adding to queue " << u << std::endl;
157 queues[u].push(data);
158 }
159 }/* end of inner for: broadcast activity */
160
161 } /* end of outer for: look for activity */
162
163 for(unsigned t = 0; t < numThreads; ++t){
164 /* Alert if channel full, so that we increase sharingChannelSize
165 or decrease sharingBroadcastInterval */
166 assert(not channelsIn[t]->full());
167
168 while(!queues[t].empty() && !channelsIn[t]->full()){
169 Trace("sharing") << "sharing: pushing on channel " << t << std::endl;
170 T data = queues[t].front();
171 channelsIn[t]->push(data);
172 queues[t].pop();
173 }
174 }
175 } /* end of infinite while */
176
177 Trace("interrupt")
178 << "sharing thread interrupted, interrupting all smtEngines" << std::endl;
179
180 for(unsigned t = 0; t < numThreads; ++t) {
181 Trace("interrupt") << "Interrupting thread #" << t << std::endl;
182 try{
183 smts[t]->interrupt();
184 }catch(ModalException &e){
185 // It's fine, the thread is probably not there.
186 Trace("interrupt") << "Could not interrupt thread #" << t << std::endl;
187 }
188 }
189
190 Trace("sharing") << "sharing: Interrupted, exiting." << std::endl;
191 }/* sharingManager() */
192
193 }/* CVC4 namespace */
194
195 #endif /* __CVC4__PORTFOLIO_UTIL_H */