1 /********************* */
2 /*! \file portfolio_util.h
4 ** Original author: kshitij
5 ** Major contributors: none
6 ** Minor contributors (to current version): mdeters
7 ** This file is part of the CVC4 prototype.
8 ** Copyright (c) 2009-2012 New York University and The University of Iowa
9 ** See the file COPYING in the top-level source directory for licensing
10 ** information.\endverbatim
12 ** \brief Code relevant only for portfolio builds
15 #ifndef __CVC4__PORTFOLIO_UTIL_H
16 #define __CVC4__PORTFOLIO_UTIL_H
20 #include "expr/pickler.h"
21 #include "util/channel.h"
22 #include "util/lemma_input_channel.h"
23 #include "util/lemma_output_channel.h"
24 #include "main/options.h"
/** Element type of the SharedChannel instances used in this header: a pickled expression. */
28 typedef expr::pickle::Pickle ChannelFormat
;
/**
 * LemmaOutputChannel implementation that serializes ("pickles") each lemma it
 * is notified of and pushes the resulting Pickle onto a SharedChannel.
 *
 * NOTE(review): this excerpt omits several lines of the class (access
 * specifiers, the declaration of d_tag, part of the constructor, and the
 * opening of the try block in notifyNewLemma()). The comments below describe
 * only what is visible; anything else is marked as an assumption.
 */
30 class PortfolioLemmaOutputChannel
: public LemmaOutputChannel
{
// Channel that notifyNewLemma() pushes pickled lemmas onto.
33 SharedChannel
<ChannelFormat
>* d_sharedChannel
;
// Pickler used by notifyNewLemma() to turn an Expr into a Pickle;
// constructed from (em, to, from) in the constructor's initializer list.
34 expr::pickle::MapPickler d_pickler
;
// Constructor. `tag` is presumably stored in d_tag (the label that prefixes
// the "sharing" trace output) and `c` in d_sharedChannel -- TODO confirm:
// the remaining parameters and the matching initializers are not visible.
38 PortfolioLemmaOutputChannel(std::string tag
,
39 SharedChannel
<ChannelFormat
> *c
,
45 d_pickler(em
, to
, from
),
/**
 * Called with a newly produced lemma; pickles it and pushes it onto
 * d_sharedChannel.
 *
 * @param lemma the lemma to share
 */
49 void notifyNewLemma(Expr lemma
) {
// Lemmas with more children than options::sharingFilterByLength() take this
// branch. Its body is not visible here -- presumably such lemmas are
// dropped without being shared (TODO confirm).
50 if(int(lemma
.getNumChildren()) > options::sharingFilterByLength()) {
54 Trace("sharing") << d_tag
<< ": " << lemma
<< std::endl
;
55 expr::pickle::Pickle pkl
;
// Serialize the lemma into pkl, then hand the pickle to the channel.
57 d_pickler
.toPickle(lemma
, pkl
);
58 d_sharedChannel
->push(pkl
);
// Optional echo of the shared lemma, limited to the thread whose
// options::thread_id() is 0 and only when the "showSharing" trace tag is on.
59 if(Trace
.isOn("showSharing") && options::thread_id() == 0) {
60 *options::out() << "thread #0: notifyNewLemma: " << lemma
// Pickling failure: in the visible code the lemma is only logged under the
// "sharing::blocked" trace tag; the exception object `p` is not used.
63 } catch(expr::pickle::PicklingException
& p
){
64 Trace("sharing::blocked") << lemma
<< std::endl
;
68 };/* class PortfolioLemmaOutputChannel */
/**
 * LemmaInputChannel implementation that reads Pickles from a SharedChannel
 * and converts them back into Exprs.
 *
 * NOTE(review): this excerpt omits several lines of the class (access
 * specifiers, the declaration of d_tag, part of the constructor, and the
 * signature lines of the two methods whose bodies appear below). The method
 * names used in these comments are taken from the Debug messages in the
 * bodies -- confirm against the full file.
 */
70 class PortfolioLemmaInputChannel
: public LemmaInputChannel
{
// Channel that incoming pickled lemmas are popped from.
73 SharedChannel
<ChannelFormat
>* d_sharedChannel
;
// Pickler used to turn a received Pickle back into an Expr;
// constructed from (em, to, from) in the constructor's initializer list.
74 expr::pickle::MapPickler d_pickler
;
// Constructor. `tag` is presumably stored in d_tag and `c` in
// d_sharedChannel -- TODO confirm: the remaining parameters and the matching
// initializers are not visible.
77 PortfolioLemmaInputChannel(std::string tag
,
78 SharedChannel
<ChannelFormat
>* c
,
84 d_pickler(em
, to
, from
){
// Body of (presumably) hasNewLemma(): reports whether the channel currently
// holds at least one pickle, i.e. returns true iff d_sharedChannel is not
// empty.
88 Debug("lemmaInputChannel") << d_tag
<< ": " << "hasNewLemma" << std::endl
;
89 return !d_sharedChannel
->empty();
// Body of (presumably) getNewLemma(): pops one pickle from the channel and
// unpickles it into an Expr. No emptiness check is visible before the pop,
// so callers presumably check for a pending lemma first -- TODO confirm.
93 Debug("lemmaInputChannel") << d_tag
<< ": " << "getNewLemma" << std::endl
;
94 expr::pickle::Pickle pkl
= d_sharedChannel
->pop();
96 Expr e
= d_pickler
.fromPickle(pkl
);
// Optional echo of the received lemma, limited to the thread whose
// options::thread_id() is 0 and only when the "showSharing" trace tag is on.
97 if(Trace
.isOn("showSharing") && options::thread_id() == 0) {
98 *options::out() << "thread #0: getNewLemma: " << e
<< std::endl
;
103 };/* class PortfolioLemmaInputChannel */
/**
 * Declaration only; the definition is not in this header excerpt.
 *
 * Takes an Options object by value and returns a vector of Options.
 * NOTE(review): judging by the name, presumably one Options instance per
 * portfolio thread, derived from `opts` -- confirm against the definition.
 */
105 std::vector
<Options
> parseThreadSpecificOptions(Options opts
);
/**
 * Relay loop that moves data between per-thread channels until the calling
 * thread receives a boost interruption request, then interrupts every
 * SmtEngine.
 *
 * In the visible code, each iteration:
 *   1. sleeps for sharingBroadcastInterval milliseconds;
 *   2. for each thread t whose channelsOut[t] is non-empty, pops one item
 *      and pushes it onto the internal per-thread queues;
 *   3. for each thread t, pushes items from queues[t] onto channelsIn[t]
 *      while the queue is non-empty and the channel is not full.
 *
 * @param numThreads  number of entries used in each of the three arrays
 * @param channelsOut channels this function pops from (output of the
 *                    SmtEngines, per the original trailing comments)
 * @param channelsIn  channels this function pushes to (input of the
 *                    SmtEngines, per the original trailing comments)
 * @param smts        engines on which interrupt() is called once this
 *                    thread has been interrupted
 *
 * NOTE(review): this excerpt omits some lines of the function, including the
 * template header that introduces T, the line between the inner broadcast
 * loop header and its push, the line following the push into channelsIn,
 * and the try that matches the catch near the end. Places where that
 * matters are flagged below.
 */
108 void sharingManager(unsigned numThreads
,
109 SharedChannel
<T
> *channelsOut
[], // out and in with respect
110 SharedChannel
<T
> *channelsIn
[],
111 SmtEngine
*smts
[]) // to smt engines
113 Trace("sharing") << "sharing: thread started " << std::endl
;
// Per-thread counters, printed as "Chunk" in the trace output below. No
// update of these counters is visible in this excerpt.
114 std::vector
<int> cnt(numThreads
); // Debug("sharing")
// One pending-delivery queue per thread; items wait here until the matching
// channelsIn[t] has room.
116 std::vector
< std::queue
<T
> > queues
;
117 for(unsigned i
= 0; i
< numThreads
; ++i
){
118 queues
.push_back(std::queue
<T
>());
// Polling period, passed to boost::posix_time::milliseconds below.
121 const unsigned int sharingBroadcastInterval
= 1;
// NOTE(review): no use of this mutex is visible in this excerpt.
123 boost::mutex mutex_activity
;
125 /* Disable interruption, so that we can check manually */
126 boost::this_thread::disable_interruption di
;
// Main loop: runs until an interruption of this thread has been requested.
128 while(not boost::this_thread::interruption_requested()) {
130 boost::this_thread::sleep
131 (boost::posix_time::milliseconds(sharingBroadcastInterval
));
// Phase 1: collect one item from every output channel that has data.
133 for(unsigned t
= 0; t
< numThreads
; ++t
) {
135 /* No activity on this channel */
136 if(channelsOut
[t
]->empty()) continue;
138 /* Alert if channel full, so that we increase sharingChannelSize
139 or decrease sharingBroadcastInterval */
140 assert(not channelsOut
[t
]->full());
142 T data
= channelsOut
[t
]->pop();
144 if(Trace
.isOn("sharing")) {
146 Trace("sharing") << "sharing: Got data. Thread #" << t
147 << ". Chunk " << cnt
[t
] << std::endl
;
// Broadcast: append the item to the per-thread queues.
// NOTE(review): the line between this loop header and the push is not
// visible -- presumably it excludes the sending thread (u == t); confirm.
150 for(unsigned u
= 0; u
< numThreads
; ++u
) {
152 Trace("sharing") << "sharing: adding to queue " << u
<< std::endl
;
153 queues
[u
].push(data
);
155 }/* end of inner for: broadcast activity */
157 } /* end of outer for: look for activity */
// Phase 2: deliver queued items to each thread's input channel for as long
// as that channel has room.
159 for(unsigned t
= 0; t
< numThreads
; ++t
){
160 /* Alert if channel full, so that we increase sharingChannelSize
161 or decrease sharingBroadcastInterval */
162 assert(not channelsIn
[t
]->full());
164 while(!queues
[t
].empty() && !channelsIn
[t
]->full()){
165 Trace("sharing") << "sharing: pushing on channel " << t
<< std::endl
;
// NOTE(review): only front() and push() are visible here; the line that
// removes the delivered item from queues[t] is presumably among the
// omitted lines -- confirm.
166 T data
= queues
[t
].front();
167 channelsIn
[t
]->push(data
);
171 } /* end of infinite while */
// Interruption was requested: log it (the stream this message is written to
// is on a line not visible here), then interrupt every engine.
174 << "sharing thread interrupted, interrupting all smtEngines" << std::endl
;
176 for(unsigned t
= 0; t
< numThreads
; ++t
) {
177 Trace("interrupt") << "Interrupting thread #" << t
<< std::endl
;
179 smts
[t
]->interrupt();
// A ModalException from interrupt() is logged and otherwise ignored, so the
// loop continues with the remaining engines.
180 }catch(ModalException
&e
){
181 // It's fine, the thread is probably not there.
182 Trace("interrupt") << "Could not interrupt thread #" << t
<< std::endl
;
186 Trace("sharing") << "sharing: Interrupted, exiting." << std::endl
;
187 }/* sharingManager() */
189 }/* CVC4 namespace */
191 #endif /* __CVC4__PORTFOLIO_UTIL_H */