1 /********************* */
2 /*! \file portfolio_util.h
4 ** Original author: Kshitij Bansal
5 ** Major contributors: none
6 ** Minor contributors (to current version): Morgan Deters
7 ** This file is part of the CVC4 project.
8 ** Copyright (c) 2009-2013 New York University and The University of Iowa
9 ** See the file COPYING in the top-level source directory for licensing
10 ** information.\endverbatim
12 ** \brief Code relevant only for portfolio builds
15 #ifndef __CVC4__PORTFOLIO_UTIL_H
16 #define __CVC4__PORTFOLIO_UTIL_H
20 #include "expr/pickler.h"
21 #include "util/channel.h"
22 #include "util/lemma_input_channel.h"
23 #include "util/lemma_output_channel.h"
24 #include "util/output.h"
25 #include "main/options.h"
29 typedef expr::pickle::Pickle ChannelFormat
;
31 class PortfolioLemmaOutputChannel
: public LemmaOutputChannel
{
34 SharedChannel
<ChannelFormat
>* d_sharedChannel
;
35 expr::pickle::MapPickler d_pickler
;
39 PortfolioLemmaOutputChannel(std::string tag
,
40 SharedChannel
<ChannelFormat
> *c
,
46 d_pickler(em
, to
, from
),
50 ~PortfolioLemmaOutputChannel() throw() { }
52 void notifyNewLemma(Expr lemma
) {
53 if(int(lemma
.getNumChildren()) > options::sharingFilterByLength()) {
57 Trace("sharing") << d_tag
<< ": " << lemma
<< std::endl
;
58 expr::pickle::Pickle pkl
;
60 d_pickler
.toPickle(lemma
, pkl
);
61 d_sharedChannel
->push(pkl
);
62 if(Trace
.isOn("showSharing") && options::thread_id() == 0) {
63 *options::out() << "thread #0: notifyNewLemma: " << lemma
66 } catch(expr::pickle::PicklingException
& p
){
67 Trace("sharing::blocked") << lemma
<< std::endl
;
71 };/* class PortfolioLemmaOutputChannel */
73 class PortfolioLemmaInputChannel
: public LemmaInputChannel
{
76 SharedChannel
<ChannelFormat
>* d_sharedChannel
;
77 expr::pickle::MapPickler d_pickler
;
80 PortfolioLemmaInputChannel(std::string tag
,
81 SharedChannel
<ChannelFormat
>* c
,
87 d_pickler(em
, to
, from
){
90 ~PortfolioLemmaInputChannel() throw() { }
93 Debug("lemmaInputChannel") << d_tag
<< ": " << "hasNewLemma" << std::endl
;
94 return !d_sharedChannel
->empty();
98 Debug("lemmaInputChannel") << d_tag
<< ": " << "getNewLemma" << std::endl
;
99 expr::pickle::Pickle pkl
= d_sharedChannel
->pop();
101 Expr e
= d_pickler
.fromPickle(pkl
);
102 if(Trace
.isOn("showSharing") && options::thread_id() == 0) {
103 *options::out() << "thread #0: getNewLemma: " << e
<< std::endl
;
108 };/* class PortfolioLemmaInputChannel */
110 std::vector
<Options
> parseThreadSpecificOptions(Options opts
);
113 void sharingManager(unsigned numThreads
,
114 SharedChannel
<T
> *channelsOut
[], // out and in with respect
115 SharedChannel
<T
> *channelsIn
[],
116 SmtEngine
*smts
[]) // to smt engines
118 Trace("sharing") << "sharing: thread started " << std::endl
;
119 std::vector
<int> cnt(numThreads
); // Debug("sharing")
121 std::vector
< std::queue
<T
> > queues
;
122 for(unsigned i
= 0; i
< numThreads
; ++i
){
123 queues
.push_back(std::queue
<T
>());
126 const unsigned int sharingBroadcastInterval
= 1;
128 boost::mutex mutex_activity
;
130 /* Disable interruption, so that we can check manually */
131 boost::this_thread::disable_interruption di
;
133 while(not boost::this_thread::interruption_requested()) {
135 boost::this_thread::sleep
136 (boost::posix_time::milliseconds(sharingBroadcastInterval
));
138 for(unsigned t
= 0; t
< numThreads
; ++t
) {
140 /* No activity on this channel */
141 if(channelsOut
[t
]->empty()) continue;
143 /* Alert if channel full, so that we increase sharingChannelSize
144 or decrease sharingBroadcastInterval */
145 assert(not channelsOut
[t
]->full());
147 T data
= channelsOut
[t
]->pop();
149 if(Trace
.isOn("sharing")) {
151 Trace("sharing") << "sharing: Got data. Thread #" << t
152 << ". Chunk " << cnt
[t
] << std::endl
;
155 for(unsigned u
= 0; u
< numThreads
; ++u
) {
157 Trace("sharing") << "sharing: adding to queue " << u
<< std::endl
;
158 queues
[u
].push(data
);
160 }/* end of inner for: broadcast activity */
162 } /* end of outer for: look for activity */
164 for(unsigned t
= 0; t
< numThreads
; ++t
){
165 /* Alert if channel full, so that we increase sharingChannelSize
166 or decrease sharingBroadcastInterval */
167 assert(not channelsIn
[t
]->full());
169 while(!queues
[t
].empty() && !channelsIn
[t
]->full()){
170 Trace("sharing") << "sharing: pushing on channel " << t
<< std::endl
;
171 T data
= queues
[t
].front();
172 channelsIn
[t
]->push(data
);
176 } /* end of infinite while */
179 << "sharing thread interrupted, interrupting all smtEngines" << std::endl
;
181 for(unsigned t
= 0; t
< numThreads
; ++t
) {
182 Trace("interrupt") << "Interrupting thread #" << t
<< std::endl
;
184 smts
[t
]->interrupt();
185 }catch(ModalException
&e
){
186 // It's fine, the thread is probably not there.
187 Trace("interrupt") << "Could not interrupt thread #" << t
<< std::endl
;
191 Trace("sharing") << "sharing: Interrupted, exiting." << std::endl
;
192 }/* sharingManager() */
194 }/* CVC4 namespace */
196 #endif /* __CVC4__PORTFOLIO_UTIL_H */