OSSIM - Open Source Software Image Map  Version 1.9.0 (20180803)
ossimMultiThreadSequencer.cpp
Go to the documentation of this file.
1 //**************************************************************************************************
2 // OSSIM -- Open Source Software Image Map
3 //
4 // LICENSE: See top level LICENSE.txt file.
5 //
6 // AUTHOR: Oscar Kramer
7 //
11 //
12 //**************************************************************************************************
13 // $Id$
14 
17 #include <ossim/base/ossimIrect.h>
18 #include <ossim/base/ossimTimer.h>
19 static const ossim_uint32 DEFAULT_MAX_TILE_CACHE_FACTOR = 8; // Must be > 1
20 
22 
23 //*************************************************************************************************
24 // Job's start method performs actual getTile in a thread on cloned chain and saves the result
25 // in the sequencer's results cache.
26 //*************************************************************************************************
28 {
29  running();
31  {
32  ostringstream s1;
33  s1<<"THREAD #"<<m_chainID<<" -- Starting tile/job #"<<m_tileID;
34  m_sequencer.print(s1);
35  }
36 
37  // Figure out the rect for this tile. Only process if rect is valid:
38  ossimIrect tileRect;
39  if (m_sequencer.getTileRect(m_tileID, tileRect))
40  {
41  // Perform the getTile and save the result:
44  double dt = ossimTimer::instance()->time_s(); //###
45 
46  if (source != NULL)
47  tile = source->getTile(tileRect);
48  if (!tile.valid())
49  {
51  tile->setImageRectangle(tileRect);
52  }
53  dt = ossimTimer::instance()->time_s() - dt; //###
54 
55  // Give the sequencer the tile. Execution may pause here while waiting for space to free up
56  // if the cache is full.
58  }
59 
60  // Unblock the main thread which might be blocked waiting for jobs to finish:
62 
63  // Queue the next job using this job's freed-up image chain:
64  if (t_launchNewJob)
66 
67  finished();
68 
70  {
71  ostringstream s2;
72  s2<<"THREAD #"<<m_chainID<<" -- Finished tile/job #"<<m_tileID;
73  m_sequencer.print(s2);
74  }
75 }
76 
77 //*************************************************************************************************
78 // Constructor
79 //*************************************************************************************************
81  ossim_uint32 num_threads,
82  ossimObject* owner)
83  : ossimImageSourceSequencer(input, owner),
84  d_maxCacheUsed(0),
86  d_idleTime1(0.0),
87  d_idleTime2(0.0),
88  d_idleTime3(0.0),
89  d_idleTime4(0.0),
90  d_idleTime5(0.0),
91  d_idleTime6(0.0),
92  d_jobGetTileT(0.0),
93  m_inputChain(0),
94  m_jobMtQueue(0),
95  m_numThreads (num_threads),
96  m_callback(std::make_shared<ossimGetTileCallback>()),
97  m_nextTileID (0),
98  m_tileCache(),
99  m_maxCacheSize (DEFAULT_MAX_TILE_CACHE_FACTOR * num_threads),
100  m_maxTileCacheFactor (DEFAULT_MAX_TILE_CACHE_FACTOR),
101  m_cacheMutex(),
102  m_jobMutex(),
104  m_getTileBlock(),
105  m_nextJobBlock(),
106  d_printMutex(),
107  d_timerMutex(),
108  d_debugEnabled(false),
109  d_timedBlocksDt(0),
110  d_timeMetricsEnabled(false),
112  d_cacheTileSize(1024),
113  d_useCache(true),
114  d_t1(0.0)
115 {
116  //###### DEBUG ############
117  ossimMtDebug* mt_debug = ossimMtDebug::instance();
118  if (mt_debug->maxTileCacheSize != 0)
119  m_maxCacheSize = mt_debug->maxTileCacheSize;
120  d_debugEnabled = mt_debug->seqDebugEnabled;
121  d_timedBlocksDt = mt_debug->seqTimedBlocksDt;
123  //###### END DEBUG ############
124 
125  // The base-class' initialize() method should have been called by the base class constructor
126  // unless somebody moved it!
130 }
131 
132 //*************************************************************************************************
133 // Destructor
134 //*************************************************************************************************
136 {
137  m_inputChain = 0;
138  m_jobMtQueue = 0;
139  m_callback.reset();
140 }
141 
142 //*************************************************************************************************
144 //*************************************************************************************************
146 {
147  // Reset important indices:
149  m_nextTileID = 0;
151 
153  if (theInputConnection == NULL)
154  return;
155 
156  // Check if this param was already set externally. Query the system capability if not:
157  if (m_numThreads == 0)
158  {
161  }
162 
163  // Adapt the input source to be an ossimImageChainMtAdaptor since we can only work
164  // with this type:
166  if (m_inputChain.valid())
167  {
172  }
173  else
174  {
175  // Need to adapt input. First, is it a chain?
176  ossimImageChain* chain = dynamic_cast<ossimImageChain*>(theInputConnection);
177  if (chain == NULL)
178  {
179  // The input is just a common image source. Make it a chain:
180  chain = new ossimImageChain;
181  chain->add(theInputConnection);
182  }
183 
184  // This instantiation creates a set of cloned image chains, one per thread, that will be
185  // accessed in parallel for the getTile() operation:
187  }
188 
189  // Set the output of the chain to be this sequencer:
191  //connectMyInputTo(m_inputChain.get());
192  //setAreaOfInterest(m_inputChain->getBoundingRect());
193 
195  for (ossim_uint32 i=0; i<m_numThreads; ++i)
196  {
197  std::shared_ptr<ossimGetTileJob> job = std::make_shared<ossimGetTileJob>(m_nextTileID++, i, *this);
198  job->setCallback(m_callback);
199  job->t_launchNewJob = false;
200  job->start();
201  }
202 
203  // Set up the job queue and fill it with first N jobs:
204  ossim_uint32 num_jobs_to_launch = min<ossim_uint32>(m_numThreads, m_totalNumberOfTiles);
205  std::shared_ptr<ossimJobQueue> jobQueue = std::make_shared<ossimJobQueue>();
206  for (ossim_uint32 chain_id=0; chain_id<num_jobs_to_launch; ++chain_id)
207  {
208  if (d_debugEnabled)
209  {
210  ostringstream s;
211  s<<"setToStartOfSequence() -- Creating tile/job #"<<m_nextTileID;
212  print(s);
213  }
214 
215  std::shared_ptr<ossimGetTileJob> job = std::make_shared<ossimGetTileJob>(m_nextTileID++, chain_id, *this);
216  job->setCallback(m_callback);
217  jobQueue->add(job, false);
218  }
219 
220  // Initialize the multi-thread queue. Note the setJobQueue is done after construction as it was
221  // crashing due to jobs being launched during init:
222  m_jobMtQueue = std::make_shared<ossimJobMultiThreadQueue>(nullptr, num_jobs_to_launch);
223  m_jobMtQueue->setJobQueue(jobQueue);
224 }
225 
226 
227 //*************************************************************************************************
230 //*************************************************************************************************
232 {
233  if (!m_inputChain.valid())
234  return NULL;
235 
236  // May need to initiate the threaded sequencing if not already done:
237  if (m_nextTileID == 0)
239 
240  // Terminate with null return if done:
243  {
244  return tile;
245  }
246 
247  // May need to wait until the corresponding job is finished if the tile is not in the cache:
248  TileCache::iterator tile_iter = m_tileCache.begin();
249  while (!tile.valid())
250  {
251  // If the tile is not yet copied into the cache, it means the job is still running. Let's
252  // block this thread and let the getTile jobs unlock as they finish. We'll exit this loop
253  // when the job of interest finishes.
256  m_cacheMutex.lock();
259 
260  // RP - Just grab the first tile for better performance, because order does not matter, we need
261  // to process them all
262  tile_iter = m_tileCache.begin(); //.find(theCurrentTileNumber);
263  m_cacheMutex.unlock();
264 
265  if (tile_iter == m_tileCache.end())
266  {
267  if (d_debugEnabled)
268  {
269  ostringstream s1;
270  s1<<"getNextTile() -- Waiting on tile #"<<theCurrentTileNumber;
271  m_cacheMutex.lock();
272  s1<<"\n cache size = "<<m_tileCache.size();
273  TileCache::iterator iter = m_tileCache.begin();
274  while(iter != m_tileCache.end())
275  {
276  s1<<"\n cache.tile_id = "<<iter->first;
277  iter++;
278  }
279  m_cacheMutex.unlock();
280  print(s1);
281  }
282 
283  if (d_timedBlocksDt > 0)
285  else
286  {
293  }
294  }
295  else
296  {
297  // A valid tile was found. Need to assign the output tile and free up the reference in the
298  // cache:
299  if (d_debugEnabled)
300  {
301  ostringstream s2;
302  s2<<"getNextTile() -- Copying tile #"<<theCurrentTileNumber<<". Cache size: "<<m_tileCache.size();
303  print(s2);
304  }
305  tile = tile_iter->second;
306 
309  m_cacheMutex.lock();
310  m_tileCache.erase(tile_iter);
311  m_cacheMutex.unlock();
314 
315  if (m_tileCache.empty())
317  m_nextJobBlock.release(); // nextJob() may be blocked until cache space is freed
318  }
319  }
320 
321  // Advance the caller-requested tile ID. This is different from the last threaded getTile()'s
322  // tile index maintained in m_nextTileID and advanced in nextJob():
324  return tile;
325 }
326 
327 //*************************************************************************************************
328 // Specifies the number of threads to support. Default behavior (if this method is never called) is
329 // to query the system for the number of cores available.
330 //*************************************************************************************************
332 {
333  m_numThreads = num_threads;
335 
336  if (m_inputChain.valid())
337  m_inputChain->setNumberOfThreads(num_threads);
338 
339  if (m_jobMtQueue && m_jobMtQueue->hasJobsToProcess())
340  m_jobMtQueue->getJobQueue()->clear();
341 
342  m_nextTileID = 0; // effectively resets this sequencer
343 }
344 
346 {
347  d_useSharedHandlers = use_shared_handlers;
348 
349  if (m_inputChain.valid())
350  m_inputChain->setUseSharedHandlers(use_shared_handlers);
351 }
352 
354 {
355  d_cacheTileSize = cache_tile_size;
356 
357  if (m_inputChain.valid())
358  m_inputChain->setCacheTileSize(cache_tile_size);
359 }
360 
362 {
363  d_useCache = use_cache;
364 
365  if (m_inputChain.valid())
366  m_inputChain->setUseCache(use_cache);
367 }
368 
369 //*************************************************************************************************
372 //*************************************************************************************************
374  ossimImageData* tile,
375  ossim_uint32 chain_id,
376  double dt)
377 {
380  std::lock_guard<std::mutex> lock(m_cacheMutex);
383 
384  d_jobGetTileT += dt;
385 
386  m_tileCache[tile_id] = tile;
387  if (d_debugEnabled)
388  {
389  ostringstream s2;
390  s2<<"THREAD #"<<chain_id<<" -- setTileInCache() Wrote tile #"<<tile_id;
391  print(s2);
392  }
393  if (d_maxCacheUsed < m_tileCache.size())
395 }
396 
397 //*************************************************************************************************
398 // Queues up the next getTile job if cache is not full. This is called as soon as the job
399 // handling the corresponding chain ID is finished.
400 //*************************************************************************************************
402 {
403  // Check for end of sequence:
405  return;
406 
407  while (((ossim_uint32) m_tileCache.size()) >= m_maxCacheSize)
408  {
409  if (d_debugEnabled)
410  {
411  m_cacheMutex.lock();
412  TileCache::const_iterator iter = m_tileCache.begin();
413  ostringstream s1;
414  s1<<"THREAD #"<<chain_id<<" -- nextJob() Waiting on cache before queuing tile/job #"
415  <<m_nextTileID<<"using chain #"<<chain_id<<". Cache size: "<<m_tileCache.size();
416  while(iter != m_tileCache.end())
417  {
418  s1<<"\n cache.tile_id = "<<iter->first;
419  iter++;
420  }
421  m_cacheMutex.unlock();
422  print(s1);
423  }
424 
425  if (d_timedBlocksDt > 0)
427  else
428  {
435  }
436  }
437 
438  if (d_debugEnabled)
439  {
440  ostringstream s2;
441  s2<<"THREAD #"<<chain_id<<" -- nextJob() Queuing tile/job #"<<m_nextTileID;
442  print(s2);
443  }
444 
445  // Job queue shares ownership of the job through std::shared_ptr, so no leak here:
448  std::lock_guard<std::mutex> lock(m_jobMutex);
451 
452  std::shared_ptr<ossimGetTileJob> job = std::make_shared<ossimGetTileJob>(m_nextTileID++, chain_id, *this);
453  job->setCallback(m_callback);
454  m_jobMtQueue->getJobQueue()->add(job);
455 }
456 
457 //*************************************************************************************************
458 // For Debugging
459 //*************************************************************************************************
461 {
462  std::lock_guard<std::mutex> lock(d_printMutex);
463  cerr << msg.str() << endl;
464 }
466 {
467  if (m_inputChain->m_sharedHandlers.empty())
468  return -1.0;
470  if (ha.valid())
471  return ha->d_getTileT;
472  return -1;
473 }
474 
475 bool ossimMultiThreadSequencer::loadState(const ossimKeywordlist& kwl, const char* prefix)
476 {
477  const char* lookup;
478  lookup = kwl.find(prefix, "num_threads");
479  if(lookup)
480  {
481  ossim_uint32 num_threads = ossimString(lookup).toUInt32();
482  setNumberOfThreads(num_threads);
483  }
484  lookup = kwl.find(prefix, "use_shared_handlers");
485  if(lookup)
486  {
487  bool use_shared_handlers = ossimString(lookup).toBool();
488  setUseSharedHandlers(use_shared_handlers);
489  }
490  lookup = kwl.find(prefix, "cache_tile_size");
491  if(lookup)
492  {
493  ossim_uint32 cache_tile_size = ossimString(lookup).toUInt32();
494  setCacheTileSize(cache_tile_size);
495  }
496  lookup = kwl.find(prefix, "use_cache");
497 
498  if(lookup)
499  {
500  bool use_cache = ossimString(lookup).toBool();
501  setUseCache(use_cache);
502  }
503 
504  bool status = ossimImageSourceSequencer::loadState(kwl, prefix);
505 
506  return status;
507 }
void nextJob(ossim_uint32 chain_id)
Method to job queue with scope lock to avoid multiple threads modifying the queue simultaneously...
ossim_uint32 m_nextTileID
ID of next tile to be threaded, different from base class' theCurrentTileNumber.
virtual void disconnectAllOutputs()
Will disconnect all of the output objects.
void setNumberOfThreads(ossim_uint32 num_threads)
Specifies the number of threads to support.
std::basic_ostringstream< char > ostringstream
Class for char output memory streams.
Definition: ossimIosFwd.h:35
virtual ossimRefPtr< ossimImageData > getNextTile(ossim_uint32 resLevel=0)
Overrides base class in order to implement multi-threaded tile requests.
void setUseSharedHandlers(bool use_shared_handlers)
virtual void setImageRectangle(const ossimIrect &rect)
void setStartTick()
Set the start.
Definition: ossimTimer.h:27
Represents serializable keyword/value map.
virtual void setToStartOfSequence()
Overrides base class implementation.
bool valid() const
Definition: ossimRefPtr.h:75
const char * find(const char *key) const
std::shared_ptr< ossimGetTileCallback > m_callback
void setTileInCache(ossim_uint32 tile_id, ossimImageData *tile, ossim_uint32 chain_id, double dt)
Access method to tile cache with scope lock to avoid multiple threads writing to the cache simultaneo...
bool getTileRect(ossim_int64 tile_id, ossimIrect &rect) const
Establishes a tile rect given tile ID.
ossim_uint32 seqTimedBlocksDt
Definition: ossimMtDebug.h:36
ossim_uint32 toUInt32() const
void print(ostringstream &msg) const
For debug – thread-safe console output.
void setNumberOfThreads(ossim_uint32 num_threads)
Alternate way of specifying number of threads to support.
ossimRefPtr< ossimImageChainMtAdaptor > m_inputChain
Same as base class' theInputConnection.
virtual ossimObject * dup() const
OSSIM_DLL ossim_uint32 getNumberOfThreads()
Get the number threads to use from ossimPreferences or ossim::Thread.
bool seqDebugEnabled
Definition: ossimMtDebug.h:35
ossimMultiThreadSequencer(ossimImageSource *inputSource=NULL, ossim_uint32 num_threads=0, ossimObject *owner=NULL)
void release()
Releases the threads and will not return until all threads are released.
Definition: Block.cpp:66
static ossimTimer * instance()
Definition: ossimTimer.cpp:19
void setUseSharedHandlers(bool use_shared_handlers)
Private class for getTile job callbacks.
ossim_uint32 maxTileCacheSize
Definition: ossimMtDebug.h:38
bool toBool() const
String to numeric methods.
double time_s() const
Get elapsed time in seconds.
Definition: ossimTimer.h:33
unsigned int ossim_uint32
bool seqMetricsEnabled
Definition: ossimMtDebug.h:37
virtual bool add(ossimConnectableObject *source)
Will return true or false if an image source was added to the chain.
std::shared_ptr< ossimJobMultiThreadQueue > m_jobMtQueue
bool loadState(const ossimKeywordlist &kwl, const char *prefix)
Method to the load (recreate) the state of an object from a keyword list.
void block()
Will block the calling thread based on the internal condition.
Definition: Block.cpp:33
TileCache m_tileCache
Saves tiles output by threaded jobs.
virtual void finished()
Sets the state of the object as finished.
Definition: ossimJob.h:316
For debugging purposes. To be removed with final release:
Definition: ossimMtDebug.h:7
This class supports multi-threading of image chain getTile() requests and associated chain updating...
return status
bool loadState(const ossimKeywordlist &kwl, const char *prefix)
Method to the load (recreate) the state of an object from a keyword list.
static ossimMtDebug * m_instance
Definition: ossimMtDebug.h:41
virtual void run()
Abstract method; must be overridden by the derived class.
ossimImageSource * getClone(ossim_uint32 index)
Returns pointer to a specific clone image chain, or NULL if index exceeds the max available...
ossimRefPtr< ossimImageData > theBlankTile
void reset()
Simply resets the values.
Definition: Block.cpp:78
void setCacheTileSize(ossim_uint32 cache_tile_size)
virtual void running()
Sets the state of the object as running.
Definition: ossimJob.h:308
static ossimMtDebug * instance()
Definition: ossimMtDebug.h:24
void setCacheTileSize(ossim_uint32 cache_tile_size)
virtual ossimRefPtr< ossimImageData > getTile(const ossimIpt &origin, ossim_uint32 resLevel=0)