1 /********************* */
2 /*! \file portfolio_util.h
4 ** Original author: Kshitij Bansal
5 ** Major contributors: none
6 ** Minor contributors (to current version): Morgan Deters
7 ** This file is part of the CVC4 project.
8 ** Copyright (c) 2009-2013 New York University and The University of Iowa
9 ** See the file COPYING in the top-level source directory for licensing
10 ** information.\endverbatim
12 ** \brief Code relevant only for portfolio builds
15 #ifndef __CVC4__PORTFOLIO_UTIL_H
16 #define __CVC4__PORTFOLIO_UTIL_H
20 #include "expr/pickler.h"
21 #include "util/channel.h"
22 #include "util/lemma_input_channel.h"
23 #include "util/lemma_output_channel.h"
24 #include "main/options.h"
// ChannelFormat names the element type carried by the SharedChannel objects
// used throughout this file: an expr::pickle::Pickle.
28 typedef expr::pickle::Pickle ChannelFormat
;
// LemmaOutputChannel implementation that pickles each lemma it is notified
// about and pushes the resulting pickle onto a shared channel.
// NOTE(review): this excerpt appears to be missing lines (access specifiers,
// the d_tag member, the constructor body, the opening of the try block and
// several closing braces); the comments below describe only what is visible.
30 class PortfolioLemmaOutputChannel
: public LemmaOutputChannel
{
// Channel that pickled lemmas are pushed onto (held by pointer).
33 SharedChannel
<ChannelFormat
>* d_sharedChannel
;
// Pickler used to convert an Expr into a Pickle before it is pushed.
34 expr::pickle::MapPickler d_pickler
;
// Constructor: takes a tag string and the shared channel pointer; the
// remaining parameters and initializers are not fully visible here.
38 PortfolioLemmaOutputChannel(std::string tag
,
39 SharedChannel
<ChannelFormat
> *c
,
// The pickler is constructed from (em, to, from).
// NOTE(review): the declarations of em/to/from are not visible in this excerpt.
45 d_pickler(em
, to
, from
),
// Empty destructor, declared non-throwing.
49 ~PortfolioLemmaOutputChannel() throw() { }
// Called with a new lemma: logs it, pickles it, and pushes it on the channel.
51 void notifyNewLemma(Expr lemma
) {
// Compares the lemma's number of children against the
// sharingFilterByLength option.
// NOTE(review): the statement guarded by this check is not visible here.
52 if(int(lemma
.getNumChildren()) > options::sharingFilterByLength()) {
// Trace the lemma under the "sharing" tag, prefixed with this channel's tag.
56 Trace("sharing") << d_tag
<< ": " << lemma
<< std::endl
;
57 expr::pickle::Pickle pkl
;
// Serialize the lemma into pkl, then hand the pickle to the shared channel.
59 d_pickler
.toPickle(lemma
, pkl
);
60 d_sharedChannel
->push(pkl
);
// Extra output only when the "showSharing" trace is on and this is thread 0.
61 if(Trace
.isOn("showSharing") && options::thread_id() == 0) {
62 *options::out() << "thread #0: notifyNewLemma: " << lemma
// A PicklingException is caught here and the lemma is reported under the
// "sharing::blocked" trace tag.
// NOTE(review): the matching try is not visible in this excerpt.
65 } catch(expr::pickle::PicklingException
& p
){
66 Trace("sharing::blocked") << lemma
<< std::endl
;
70 };/* class PortfolioLemmaOutputChannel */
// LemmaInputChannel implementation that pops pickles from a shared channel
// and converts them back into Expr objects.
// NOTE(review): this excerpt appears to be missing lines (access specifiers,
// the d_tag member, the constructor body and the signatures of the two
// methods whose bodies appear below); comments describe only what is visible.
72 class PortfolioLemmaInputChannel
: public LemmaInputChannel
{
// Channel that pickled lemmas are popped from (held by pointer).
75 SharedChannel
<ChannelFormat
>* d_sharedChannel
;
// Pickler used to convert a Pickle back into an Expr.
76 expr::pickle::MapPickler d_pickler
;
// Constructor: takes a tag string and the shared channel pointer; the
// remaining parameters and initializers are not fully visible here.
79 PortfolioLemmaInputChannel(std::string tag
,
80 SharedChannel
<ChannelFormat
>* c
,
// The pickler is constructed from (em, to, from).
// NOTE(review): the declarations of em/to/from are not visible in this excerpt.
86 d_pickler(em
, to
, from
){
// Empty destructor, declared non-throwing.
89 ~PortfolioLemmaInputChannel() throw() { }
// Logs "hasNewLemma" under the "lemmaInputChannel" debug tag, then returns
// true exactly when the shared channel is not empty.
92 Debug("lemmaInputChannel") << d_tag
<< ": " << "hasNewLemma" << std::endl
;
93 return !d_sharedChannel
->empty();
// Logs "getNewLemma" under the "lemmaInputChannel" debug tag, pops one
// pickle from the shared channel and converts it to an Expr.
97 Debug("lemmaInputChannel") << d_tag
<< ": " << "getNewLemma" << std::endl
;
98 expr::pickle::Pickle pkl
= d_sharedChannel
->pop();
100 Expr e
= d_pickler
.fromPickle(pkl
);
// Extra output only when the "showSharing" trace is on and this is thread 0.
101 if(Trace
.isOn("showSharing") && options::thread_id() == 0) {
102 *options::out() << "thread #0: getNewLemma: " << e
<< std::endl
;
107 };/* class PortfolioLemmaInputChannel */
// Declaration only: takes an Options object by value and returns a vector of
// Options. The definition is not in this excerpt.
// NOTE(review): presumably yields one Options per portfolio thread - confirm
// against the definition.
109 std::vector
<Options
> parseThreadSpecificOptions(Options opts
);
// Loop that moves data between per-thread channels until the running boost
// thread is asked to interrupt, then interrupts every SmtEngine in smts.
//
// Parameters (as visible in the signature):
//   numThreads  - number of entries processed in each of the three arrays
//   channelsOut - array of channel pointers that data is popped from
//   channelsIn  - array of channel pointers that queued data is pushed onto
//   smts        - array of SmtEngine pointers interrupted on exit
//
// NOTE(review): this excerpt appears to be missing lines, including the
// declaration of T (presumably a template header), the opening braces of the
// function and of the try block, and possibly statements inside the loops.
// Comments below describe only what is visible.
112 void sharingManager(unsigned numThreads
,
113 SharedChannel
<T
> *channelsOut
[], // out and in with respect
114 SharedChannel
<T
> *channelsIn
[],
115 SmtEngine
*smts
[]) // to smt engines
117 Trace("sharing") << "sharing: thread started " << std::endl
;
// Per-thread counter whose value is printed as the "Chunk" number in the
// trace output below.
// NOTE(review): no update of cnt is visible in this excerpt.
118 std::vector
<int> cnt(numThreads
); // Debug("sharing")
// One pending-data queue per thread, created empty.
120 std::vector
< std::queue
<T
> > queues
;
121 for(unsigned i
= 0; i
< numThreads
; ++i
){
122 queues
.push_back(std::queue
<T
>());
// Value passed to boost::posix_time::milliseconds for the sleep at the top
// of every iteration of the main loop.
125 const unsigned int sharingBroadcastInterval
= 1;
// NOTE(review): mutex_activity is declared here but no use of it is visible
// in this excerpt.
127 boost::mutex mutex_activity
;
129 /* Disable interruption, so that we can check manually */
130 boost::this_thread::disable_interruption di
;
// Main loop: runs until an interruption has been requested for this thread.
132 while(not boost::this_thread::interruption_requested()) {
134 boost::this_thread::sleep
135 (boost::posix_time::milliseconds(sharingBroadcastInterval
));
// Phase 1: for each thread t, take at most one item from channelsOut[t].
137 for(unsigned t
= 0; t
< numThreads
; ++t
) {
139 /* No activity on this channel */
140 if(channelsOut
[t
]->empty()) continue;
142 /* Alert if channel full, so that we increase sharingChannelSize
143 or decrease sharingBroadcastInterval */
144 assert(not channelsOut
[t
]->full());
146 T data
= channelsOut
[t
]->pop();
148 if(Trace
.isOn("sharing")) {
150 Trace("sharing") << "sharing: Got data. Thread #" << t
151 << ". Chunk " << cnt
[t
] << std::endl
;
// Append the popped item to the per-thread queues.
// NOTE(review): any check that skips the originating thread (u == t) is not
// visible in this excerpt - confirm against the full file.
154 for(unsigned u
= 0; u
< numThreads
; ++u
) {
156 Trace("sharing") << "sharing: adding to queue " << u
<< std::endl
;
157 queues
[u
].push(data
);
159 }/* end of inner for: broadcast activity */
161 } /* end of outer for: look for activity */
// Phase 2: for each thread t, push queued items onto channelsIn[t] while the
// queue is non-empty and the channel is not full.
163 for(unsigned t
= 0; t
< numThreads
; ++t
){
164 /* Alert if channel full, so that we increase sharingChannelSize
165 or decrease sharingBroadcastInterval */
166 assert(not channelsIn
[t
]->full());
168 while(!queues
[t
].empty() && !channelsIn
[t
]->full()){
169 Trace("sharing") << "sharing: pushing on channel " << t
<< std::endl
;
// NOTE(review): no queues[t].pop() is visible in this excerpt. Confirm the
// full file removes the front element after pushing it; without that, this
// loop would keep pushing the same element until the channel reports full.
170 T data
= queues
[t
].front();
171 channelsIn
[t
]->push(data
);
175 } /* end of infinite while */
// After the main loop exits: log the interruption and interrupt each engine.
// NOTE(review): the start of the stream statement below is not visible here.
178 << "sharing thread interrupted, interrupting all smtEngines" << std::endl
;
180 for(unsigned t
= 0; t
< numThreads
; ++t
) {
181 Trace("interrupt") << "Interrupting thread #" << t
<< std::endl
;
// A ModalException from interrupt() is caught and only logged, so the loop
// goes on to the remaining engines.
// NOTE(review): the matching try is not visible in this excerpt.
183 smts
[t
]->interrupt();
184 }catch(ModalException
&e
){
185 // It's fine, the thread is probably not there.
186 Trace("interrupt") << "Could not interrupt thread #" << t
<< std::endl
;
190 Trace("sharing") << "sharing: Interrupted, exiting." << std::endl
;
191 }/* sharingManager() */
193 }/* CVC4 namespace */
195 #endif /* __CVC4__PORTFOLIO_UTIL_H */