OSSIM - Open Source Software Image Map  Version 1.9.0 (20180803)
ossimS3StreamBuffer.cpp
Go to the documentation of this file.
1 //---
2 //
3 // License: MIT
4 //
5 // Author: Garrett Potts
6 //
7 // Description:
8 //
9 // OSSIM Amazon Web Services (AWS) S3 streambuf definition.
10 //
11 //---
12 // $Id$
13 
14 #include "ossimS3StreamBuffer.h"
15 #include "ossimAwsStreamFactory.h"
16 #include "S3HeaderCache.h"
17 
20 #include <ossim/base/ossimUrl.h>
21 #include <ossim/base/ossimTrace.h>
22 #include <ossim/base/ossimTimer.h>
23 
24 #include <aws/s3/S3Client.h>
25 #include <aws/s3/model/PutObjectRequest.h>
26 #include <aws/s3/model/GetObjectRequest.h>
27 #include <aws/s3/model/GetObjectResult.h>
28 #include <aws/s3/model/HeadObjectRequest.h>
29 #include <aws/core/Aws.h>
30 #include <aws/core/client/ClientConfiguration.h>
31 #include <aws/core/http/HttpRequest.h>
32 #include <aws/core/utils/memory/stl/AWSStringStream.h>
33 
34 #include <cstdio> /* for EOF */
35 #include <cstring> /* for memcpy */
36 #include <ios>
37 #include <iostream>
38 #include <streambuf>
39 #include <sstream>
40 #include <ctime>
41 #include <vector>
42 
43 //static const char* KEY = "test-file.txt";
44 //static const char* BUCKET = "ossimlabs";
45 
46 using namespace Aws::S3;
47 using namespace Aws::S3::Model;
48 static ossimTrace traceDebug("ossimS3StreamBuffer:debug");
49 
51  :
52  m_bucket(""),
53  m_key(""),
54  m_buffer(blockSize),
55  m_bufferActualDataSize(0),
56  m_currentBlockPosition(-1),
57  m_bufferPtr(0),
58  m_fileSize(0),
59  m_opened(false)
60  //m_mode(0)
61 {
   //---
   // Constructs an unopened buffer whose block cache m_buffer holds blockSize
   // bytes. m_currentBlockPosition starts at -1, meaning no position has been
   // established yet; open() later attaches the buffer to an S3 object.
   //
   // NOTE(review): `config` below only receives the optional region override
   // read from the "ossim.plugins.aws.s3.region" preference. None of the lines
   // in this listing pass it to an S3 client (the `new S3Client( config )` line
   // is commented out). Confirm that m_client is created with this
   // configuration; otherwise the override has no effect.
   //---
62  Aws::Client::ClientConfiguration config;
63 
64  // Look for AWS S3 region override:
65  std::string region = ossimPreferences::instance()->
66  preferencesKWL().findKey(std::string("ossim.plugins.aws.s3.region"));
67  if ( region.size() )
68  {
69  config.region = region.c_str();
70  }
72  // m_client = new Aws::S3::S3Client( config );
73 
74 // std::cout << "CONSTRUCTED!!!!!" << std::endl;
75 // setp(0);
77 }
78 
80 {
   // Intentionally empty: nothing is released explicitly here. Each member is
   // cleaned up by its own destructor.
81 }
82 
84 {
85  ossim_int64 blockNumber = -1;
86 
87  if(byteOffset < (ossim_int64)m_fileSize)
88  {
89  if(m_buffer.size()>0)
90  {
91  blockNumber = byteOffset/m_buffer.size();
92  }
93  }
94 
95  return blockNumber;
96 }
97 
99 {
100  ossim_int64 blockOffset = -1;
101 
102  if(m_buffer.size()>0)
103  {
104  blockOffset = byteOffset%m_buffer.size();
105  }
106 
107  return blockOffset;
108 }
109 
111  ossim_int64& startRange,
112  ossim_int64& endRange)const
113 {
   // Computes the inclusive byte range [startRange, endRange] spanned by block
   // `blockIndex`, where every block is m_buffer.size() bytes long.
   // Returns false, leaving both outputs untouched, for a negative index.
   // NOTE: endRange is not clamped to m_fileSize, so the range of the final
   // block may extend past the end of the object.
114  bool result = false;
115 
116  if(blockIndex >= 0)
117  {
118  startRange = blockIndex*m_buffer.size();
119  endRange = startRange + m_buffer.size()-1;
120 
121  result = true;
122  }
123 
124  return result;
125 }
126 
128 {
129  if(traceDebug())
130  {
132  << "ossim::S3StreamBuffer::loadBlock DEBUG: entered with absolute position: " << absolutePosition << "\n";
133 
134  }
135  bool result = false;
136  m_bufferPtr = 0;
137  GetObjectRequest getObjectRequest;
138  std::stringstream stringStream;
139  ossim_int64 startRange, endRange;
140  ossim_int64 blockIndex = getBlockIndex(absolutePosition);
141  if((absolutePosition < 0) || (absolutePosition > (ossim_int64)m_fileSize)) return false;
142  //std::cout << "CURRENT BYTE LOCATION = " << absoluteLocation << std::endl;
143  if(getBlockRangeInBytes(blockIndex, startRange, endRange))
144  {
145  stringStream << "bytes=" << startRange << "-" << endRange;
146  getObjectRequest.WithBucket(m_bucket.c_str())
147  .WithKey(m_key.c_str()).WithRange(stringStream.str().c_str());
148  auto getObjectOutcome = m_client->GetObject(getObjectRequest);
149 
150  if(getObjectOutcome.IsSuccess())
151  {
152 // std::cout << "GOOD CALL!!!!!!!!!!!!\n";
153  Aws::IOStream& bodyStream = getObjectOutcome.GetResult().GetBody();
154  ossim_int64 bufSize = getObjectOutcome.GetResult().GetContentLength();
155 // std::cout << "SIZE OF RESULT ======== " << bufSize << std::endl;
156  m_bufferActualDataSize = bufSize;
157  bodyStream.read(&m_buffer.front(), bufSize);
158  m_bufferPtr = &m_buffer.front();
159 
160  ossim_int64 delta = absolutePosition-startRange;
161  setg(m_bufferPtr, m_bufferPtr + delta, m_bufferPtr+m_bufferActualDataSize);
162  m_blockInfo.setBytes(startRange,startRange+delta,startRange+m_bufferActualDataSize);
163  // std::cout << "LOADING BLOCK: " << m_blockInfo.getStartByte() << ", " << m_blockInfo.getCurrentByte() << ", " << m_blockInfo.getEndByte() << "\n";
164  m_currentBlockPosition = startRange;
165  result = true;
166  //std::cout << "Successfully retrieved object from s3 with value: " << std::endl;
167  //std::cout << getObjectOutcome.GetResult().GetBody().rdbuf() << std::endl << std::endl;;
168  }
169  else
170  {
171  m_bufferActualDataSize = 0;
172  }
173  }
174  if(traceDebug())
175  {
177  << "ossim::S3StreamBuffer::loadBlock DEBUG: leaving with absolutePosition " << absolutePosition << "\n";
178 
179  }
180 
181  return result;
182 }
183 
184 ossim::S3StreamBuffer* ossim::S3StreamBuffer::open (const char* connectionString,
185  const ossimKeywordlist& options,
186  std::ios_base::openmode m)
187 {
188  std::string temp(connectionString);
189  return open(temp, options, m);
190 }
191 
192 ossim::S3StreamBuffer* ossim::S3StreamBuffer::open (const std::string& connectionString,
193  const ossimKeywordlist& /* options */,
194  std::ios_base::openmode /* mode */)
195 {
196  if(traceDebug())
197  {
199  << "ossim::S3StreamBuffer::open DEBUG: entered..... with connection " << connectionString << std::endl;
200  }
201  // bool result = false;
202  ossimUrl url(connectionString);
203  clearAll();
204  // m_mode = mode;
206 
207  // AWS server is case insensitive:
208  if( (url.getProtocol() == "s3") || (url.getProtocol() == "S3") )
209  {
210  ossim_int64 filesize;
211  m_bucket = url.getIp().c_str();
212  m_key = url.getPath().c_str();
213  if(ossim::S3HeaderCache::instance()->getCachedFilesize(connectionString, filesize))
214  {
215  m_fileSize = filesize;
216  if(m_fileSize >= 0)
217  {
218  m_opened = true;
219  m_currentBlockPosition = 0;
220  }
221  }
222  else
223  {
224  if(!m_bucket.empty() && !m_key.empty())
225  {
226  HeadObjectRequest headObjectRequest;
227  headObjectRequest.WithBucket(m_bucket.c_str())
228  .WithKey(m_key.c_str());
229  auto headObject = m_client->HeadObject(headObjectRequest);
230  if(headObject.IsSuccess())
231  {
232  m_fileSize = headObject.GetResult().GetContentLength();
233  m_opened = true;
234  m_currentBlockPosition = 0;
235  ossim::S3HeaderCache::Node_t nodePtr = std::make_shared<ossim::S3HeaderCacheNode>(m_fileSize);
236  ossim::S3HeaderCache::instance()->addHeader(connectionString, nodePtr);
237  }
238  else
239  {
240  m_opened = false;
241  m_fileSize = -1;
242  m_currentBlockPosition = 0;
243  ossim::S3HeaderCache::Node_t nodePtr = std::make_shared<ossim::S3HeaderCacheNode>(m_fileSize);
244  ossim::S3HeaderCache::instance()->addHeader(connectionString, nodePtr);
245  }
246  }
247  }
248  }
250 
251  if(traceDebug())
252  {
253  ossim_float64 delta = ossimTimer::instance()->delta_s(startTimer, endTimer);
254 
256  << "ossim::S3StreamBuffer::open DEBUG: Took " << delta << " seconds to open" << std::endl;
258  << "ossim::S3StreamBuffer::open DEBUG: leaving....." << std::endl;
259 
260  }
261  if(m_opened) return this;
262 
263  return 0;
264 }
265 
266 
267 
269 {
270  m_bucket = "";
271  m_key = "";
272  m_fileSize = 0;
273  m_opened = false;
274  m_currentBlockPosition = 0;
275  m_blockInfo.setBytes(0,0,0);
276 }
277 
278 
280 {
281  if(!is_open())
282  {
283  return EOF;
284  }
285  else if( !m_blockInfo.withinWindow() )
286  {
287  ossim_int64 absolutePosition = getAbsoluteByteOffset();
288  if(absolutePosition < 0)
289  {
290  return EOF;
291  }
292 
293  if(!loadBlock(absolutePosition))
294  {
295  return EOF;
296  }
297  }
298 
299  // std::cout << "GPTR CHARACTER ========== "
300  // << (int)static_cast<ossim_uint8>(*gptr()) << std::endl;
301 
302  //---
303  // Double cast to get non-negative values so as to not send an inadvertent
304  // EOF(-1) to caller.
305  //---
306  return (int)static_cast<ossim_uint8>(*gptr());
307 }
308 
309 ossim::S3StreamBuffer::pos_type ossim::S3StreamBuffer::seekoff(off_type offset,
310  std::ios_base::seekdir dir,
311  std::ios_base::openmode mode)
312 {
313  // std::cout <<"ossim::S3StreamBuffer::seekoff type size === " << sizeof(off_type) << std::endl;
314  // std::cout <<"ossim::S3StreamBuffer::seekoff offset ====== " << offset << std::endl;
315  // std::cout << "ossim::S3StreamBuffer::seekoff\n";
316  pos_type result = pos_type(off_type(-1));
317  // bool withinBlock = true;
318  if((mode & std::ios_base::in)&&
319  (mode & std::ios_base::out))
320  {
321  return result;
322  }
323  switch(dir)
324  {
325  case std::ios_base::beg:
326  {
327  ossim_int64 absolutePosition = getAbsoluteByteOffset();
328  // really would like to figure out a better way but for now
329  // we have to have one valid block read in to properly adjust
330  // gptr() with gbump
331  //
332  // if(!gptr())
333  // {
334  // if(!loadBlock(offset))
335  // {
336  // return result;
337  // }
338  // }
339  if((offset <= (ossim_int64)m_fileSize)&&
340  (offset >=0))
341  {
342  result = pos_type(offset);
343  }
344  // if(!gptr())
345  // {
346  // if(!loadBlock(result))
347  // {
348  // return EOF;
349  // }
350  // }
351  //else
352  if(mode & std::ios_base::in)
353  {
354  absolutePosition = getAbsoluteByteOffset();
355  ossim_int64 delta = offset - absolutePosition;
356  setg(eback(), gptr()+delta, egptr());
357  m_blockInfo.setCurrentByte(offset);
358  }
359  // std::cout << "ossim::S3StreamBuffer::seekoff beg RESULT??????????????????? " << result << std::endl;
360  break;
361  }
362  case std::ios_base::cur:
363  {
364  // if(!gptr())
365  // {
366  // // std::cout << "LOADING BLOCK!!!!!!!!!!!!!!\n" << std::endl;
367  // if(!loadBlock(0))
368  // {
369  // return result;
370  // }
371  // }
372  // result = getAbsoluteByteOffset();
373  // std::cout << "INITIAL ABSOLUTE BYTE OFFSET ==== " << result << "\n";
374  if(!offset)
375  {
376  result = getAbsoluteByteOffset();
377  }
378  else
379  {
380  result += offset;
381  setg(eback(), gptr()+offset, egptr());
382  m_blockInfo.setCurrentByte(m_blockInfo.getCurrentByte()+offset);
383  }
384 
385  break;
386  }
387  case std::ios_base::end:
388  {
389  ossim_int64 absolutePosition = m_fileSize + offset;
390  // if(!gptr())
391  // {
392  // if(!loadBlock(absolutePosition))
393  // {
394  // return result;
395  // }
396  // }
397  ossim_int64 currentAbsolutePosition = getAbsoluteByteOffset();
398  ossim_int64 delta = absolutePosition-currentAbsolutePosition;
399 
400 // std::cout << "CURRENT ABSOLUTE POSITION === " << currentAbsolutePosition << std::endl;
401 // std::cout << "CURRENT ABSOLUTE delta POSITION === " << delta << std::endl;
402  if(mode & std::ios_base::in )
403  {
404  setg(eback(), gptr()+delta, egptr());
405  m_blockInfo.setCurrentByte(absolutePosition);
406  result = absolutePosition;
407  }
408  // std::cout << "ossim::S3StreamBuffer::seekoff end RESULT??????????????????? " << result << std::endl;
409  break;
410  }
411  default:
412  {
413  break;
414  }
415  }
416 
417  return result;
418 }
419 
420 ossim::S3StreamBuffer::pos_type ossim::S3StreamBuffer::seekpos(pos_type pos, std::ios_base::openmode mode)
421 {
422  // std::cout << "ossim::S3StreamBuffer::seekpos: " << pos << std::endl;
423  pos_type result = pos_type(off_type(-1));
424  ossim_int64 tempPos = static_cast<ossim_int64>(pos);
425  // Currently we must initialize to a block
426  // if(!gptr())
427  // {
428  // if(!loadBlock(tempPos))
429  // {
430  // return result;
431  // }
432  // }
433  ossim_int64 absoluteLocation = getAbsoluteByteOffset();
434  if(mode & std::ios_base::in)
435  {
436  if((pos >= 0)&&(pos < (ossim_int64)m_fileSize))
437  {
438  m_blockInfo.setCurrentByte(tempPos);
439  ossim_int64 delta = tempPos-absoluteLocation;
440  if(delta)
441  {
442  setg(eback(), gptr()+delta, egptr());
443  }
444  result = pos;
445  }
446  }
447  return result;
448 }
449 
450 std::streamsize ossim::S3StreamBuffer::xsgetn(char_type* s, std::streamsize n)
451 {
452  // std::cout << "ossim::S3StreamBuffer::xsgetn" << std::endl;
453 
454  if(!is_open()) return EOF;
455  // unsigned long int bytesLeftToRead = egptr()-gptr();
456  // initialize if we need to to load the block at current position
457 // if((!withinWindow())&&is_open())
458  if((!m_blockInfo.withinWindow())&&is_open())
459  {
460  // if(!gptr())
461  // {
462  // if(!loadBlock(0))
463  // {
464  // return EOF;
465  // }
466  // }
467  // else if(!loadBlock(getAbsoluteByteOffset()))
468  // {
469  // return EOF;
470  // }
471  if(!loadBlock(m_blockInfo.getCurrentByte()))
472  {
473  return EOF;
474  }
475  }
476  ossim_int64 bytesNeedToRead = n;
477  // ossim_int64 bytesToRead = 0;
478  ossim_int64 bytesRead = 0;
479  ossim_int64 currentAbsolutePosition = m_blockInfo.getCurrentByte();//getAbsoluteByteOffset();
480  // ossim_int64 startOffset, endOffset;
481  if(currentAbsolutePosition >= (ossim_int64)m_fileSize)
482  {
483  return EOF;
484  }
485  else if((currentAbsolutePosition + bytesNeedToRead)>(ossim_int64)m_fileSize)
486  {
487  bytesNeedToRead = (m_fileSize - currentAbsolutePosition);
488  }
489 
490  while(bytesNeedToRead > 0)
491  {
492  currentAbsolutePosition = m_blockInfo.getCurrentByte();//getAbsoluteByteOffset();
493  //if(!withinWindow())
494  if(!m_blockInfo.withinWindow())
495  {
496  if(!loadBlock(m_blockInfo.getCurrentByte()))
497  {
498  return bytesRead;
499  }
500  currentAbsolutePosition = m_blockInfo.getCurrentByte();
501  }
502 
503  // get each bloc
504  if(currentAbsolutePosition>=0)
505  {
506  //getBlockRangeInBytes(getBlockIndex(m_currentBlockPosition), startOffset, endOffset);
507 
508  ossim_int64 delta = (m_blockInfo.getEndByte()-m_blockInfo.getCurrentByte());//(endOffset - m_currentBlockPosition)+1;
509 
510  if(delta <= bytesNeedToRead)
511  {
512 
513  std::memcpy(s+bytesRead, gptr(), delta);
514  //m_currentBlockPosition += delta;
515  //setg(eback(), egptr(), egptr());
516  bytesRead+=delta;
517  bytesNeedToRead-=delta;
518  //gbump(delta);
519  setg(eback(), gptr()+delta, egptr());
520  m_blockInfo.setCurrentByte(m_blockInfo.getCurrentByte()+delta);
521  }
522  else
523  {
524  std::memcpy(s+bytesRead, gptr(), bytesNeedToRead);
525  setg(eback(), gptr()+bytesNeedToRead, egptr());
526  m_blockInfo.setCurrentByte(m_blockInfo.getCurrentByte()+bytesNeedToRead);
527  //gbump(bytesNeedToRead);
528 // std::cout << "gbump 2 DELTA ========= " << bytesNeedToRead << std::endl;
529  bytesRead+=bytesNeedToRead;
530  bytesNeedToRead=0;
531  }
532  }
533  else
534  {
535  break;
536  }
537  }
538  return std::streamsize(bytesRead);
539 }
540 
542 {
543  ossim_int64 result = -1;
544 
545  if(m_currentBlockPosition >= 0)
546  {
547  result = m_blockInfo.getCurrentByte();
548  //result = m_currentBlockPosition;
549  // if(gptr()&&eback())
550  // {
551  // result += (gptr()-eback());
552  // }
553  }
554 
555  // std::cout << "RESULT getAbsoluteByteOffset======== " << result << "\n";
556  return result;
557 }
558 
560 {
561  if(!gptr()) return false;
562  return ((gptr()>=eback()) && (gptr()<egptr()));
563 }
564 
566 {
567  return static_cast<ossim_uint32>(m_fileSize);
568 }
569 
571 {
572  return m_buffer.size();
573 }
bool getBlockRangeInBytes(ossim_int64 blockIndex, ossim_int64 &startRange, ossim_int64 &endRange) const
ossim_int64 getBlockIndex(ossim_int64 byteOffset) const
virtual std::streamsize xsgetn(char_type *s, std::streamsize n)
std::basic_stringstream< char > stringstream
Class for char mixed input and output memory streams.
Definition: ossimIosFwd.h:38
std::shared_ptr< S3HeaderCacheNode > Node_t
Definition: S3HeaderCache.h:30
virtual pos_type seekpos(pos_type pos, std::ios_base::openmode mode=std::ios_base::in|std::ios_base::out)
unsigned long long Timer_t
Definition: ossimTimer.h:16
Represents serializable keyword/value map.
virtual pos_type seekoff(off_type offset, std::ios_base::seekdir dir, std::ios_base::openmode __mode=std::ios_base::in|std::ios_base::out)
ossim_uint64 getBlockSize() const
double ossim_float64
static ossimTimer * instance()
Definition: ossimTimer.cpp:19
os2<< "> n<< " > nendobj n
unsigned long long ossim_uint64
const ossimString & getProtocol() const
Definition: ossimUrl.h:17
unsigned int ossim_uint32
S3StreamBuffer * open(const char *connectionString, const ossimKeywordlist &options, std::ios_base::openmode mode)
const ossimString & getPath() const
Definition: ossimUrl.h:20
static ossimPreferences * instance()
std::shared_ptr< Aws::S3::S3Client > m_client
S3StreamBuffer(ossim_int64 blockSize=ossim::S3StreamDefaults::m_readBlocksize)
std::shared_ptr< Aws::S3::S3Client > getSharedS3Client() const
ossim_int64 getAbsoluteByteOffset() const
ossim_int64 getBlockOffset(ossim_int64 byteOffset) const
static AwsStreamFactory * instance()
static std::shared_ptr< S3HeaderCache > instance()
long long ossim_int64
const char * c_str() const
Returns a pointer to a null-terminated array of characters representing the string&#39;s contents...
Definition: ossimString.h:396
const ossimString & getIp() const
Definition: ossimUrl.h:18
bool loadBlock(ossim_int64 absolutePosition)
Timer_t tick() const
Get the timers tick value.
Definition: ossimTimer.cpp:95
double delta_s(Timer_t t1, Timer_t t2) const
Get the time in seconds between timer ticks t1 and t2.
Definition: ossimTimer.h:45
unsigned char ossim_uint8
OSSIMDLLEXPORT std::ostream & ossimNotify(ossimNotifyLevel level=ossimNotifyLevel_WARN)
ossim_uint64 getFileSize() const