root/tags/release-1.0-beta/drivedist.c

Revision 39, 12.9 KB (checked in by anton, 3 years ago)

1.0.0b tag added

Line 
1/*
2 * Finding the Driving Distance (isochrone/isodist) for PostgreSQL
3 *
4 * Copyright (c) 2006 Mario H. Basa, Orkney, Inc.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19 *
20 */
21
22#include "postgres.h"
23#include "executor/spi.h"
24#include "funcapi.h"
25
26#include "drivedist.h"
27
28//---------------------------------------------------------------------------
29
30/*
31 * Define this to have profiling enabled
32 */
33//#define PROFILE
34
35#ifdef PROFILE
36#include <sys/time.h>
37
38struct timeval prof_dijkstra, prof_store, prof_extract, prof_total;
39long proftime[5];
40long profipts1, profipts2, profopts;
41#define profstart(x) do { gettimeofday(&x, NULL); } while (0);
42#define profstop(n, x) do { struct timeval _profstop;   \
43        long _proftime;                         \
44        gettimeofday(&_profstop, NULL);                         \
45        _proftime = ( _profstop.tv_sec*1000000+_profstop.tv_usec) -     \
46                ( x.tv_sec*1000000+x.tv_usec); \
47        elog(NOTICE, \
48                "PRF(%s) %lu (%f ms)", \
49                (n), \
50             _proftime, _proftime / 1000.0);    \
51        } while (0);
52
53#else
54
55#define profstart(x) do { } while (0);
56#define profstop(n, x) do { } while (0);
57
58#endif // PROFILE
59
60
61//----------------------------------------------------------------------------
62
63Datum driving_distance(PG_FUNCTION_ARGS);
64
65#undef DEBUG
66//#define DEBUG 1
67
68#ifdef DEBUG
69#define DBG(format, arg...)                     \
70    elog(NOTICE, format , ## arg)
71#else
72#define DBG(format, arg...) do { ; } while (0)
73#endif
74
75// The number of tuples to fetch from the SPI cursor at each iteration
76#define TUPLIMIT 1000
77
78static char *
79text2char(text *in)
80{
81  char *out = palloc(VARSIZE(in));
82
83  memcpy(out, VARDATA(in), VARSIZE(in) - VARHDRSZ);
84  out[VARSIZE(in) - VARHDRSZ] = '\0';
85  return out;
86}
87
88static int
89finish(int code, int ret)
90{
91  code = SPI_finish();
92  if (code  != SPI_OK_FINISH )
93  {
94    elog(ERROR,"couldn't disconnect from SPI");
95    return -1 ;
96  }
97                       
98  return ret;
99}
100                         
101
102typedef struct edge_columns
103{
104  int id;
105  int source;
106  int target;
107  int cost;
108  int reverse_cost;
109} edge_columns_t;
110
111static int
112fetch_edge_columns(SPITupleTable *tuptable, edge_columns_t *edge_columns,
113                   bool has_reverse_cost)
114{
115  edge_columns->id     = SPI_fnumber(SPI_tuptable->tupdesc, "id");
116  edge_columns->source = SPI_fnumber(SPI_tuptable->tupdesc, "source");
117  edge_columns->target = SPI_fnumber(SPI_tuptable->tupdesc, "target");
118  edge_columns->cost   = SPI_fnumber(SPI_tuptable->tupdesc, "cost");
119
120  if (edge_columns->id     == SPI_ERROR_NOATTRIBUTE ||
121      edge_columns->source == SPI_ERROR_NOATTRIBUTE ||
122      edge_columns->target == SPI_ERROR_NOATTRIBUTE ||
123      edge_columns->cost   == SPI_ERROR_NOATTRIBUTE)  {
124    elog(ERROR, "Error, query must return columns "
125         "'id', 'source', 'target' and 'cost'");
126    return -1;
127  }
128 
129  if (SPI_gettypeid(SPI_tuptable->tupdesc, edge_columns->source) != INT4OID ||
130      SPI_gettypeid(SPI_tuptable->tupdesc, edge_columns->target) != INT4OID ||
131      SPI_gettypeid(SPI_tuptable->tupdesc, edge_columns->cost) != FLOAT8OID) {
132    elog(ERROR, "Error, columns 'source', 'target' must be of type int4, 'cost' must be of type float8");
133    return -1;
134  }
135 
136  DBG("columns: id %i source %i target %i cost %i",
137      edge_columns->id, edge_columns->source,
138      edge_columns->target, edge_columns->cost);
139 
140  if (has_reverse_cost) {
141    edge_columns->reverse_cost = SPI_fnumber(SPI_tuptable->tupdesc,
142                                             "reverse_cost");
143   
144    if (edge_columns->reverse_cost == SPI_ERROR_NOATTRIBUTE)  {
145      elog(ERROR, "Error, reverse_cost is used, but query did't return "
146           "'reverse_cost' column");
147      return -1;
148    }
149     
150    if (SPI_gettypeid(SPI_tuptable->tupdesc,
151                      edge_columns->reverse_cost) != FLOAT8OID) {
152      elog(ERROR, "Error, columns 'reverse_cost' must be of type float8");
153      return -1;
154    }
155     
156    DBG("columns: reverse_cost cost %i", edge_columns->reverse_cost);
157  }
158 
159  return 0;
160}
161
162static void
163fetch_edge(HeapTuple *tuple, TupleDesc *tupdesc, edge_columns_t *edge_columns,
164           edge_t *target_edge)
165{
166  Datum binval;
167  bool isnull;
168 
169  binval = SPI_getbinval(*tuple, *tupdesc, edge_columns->id, &isnull);
170 
171  if (isnull)
172    elog(ERROR, "id contains a null value");
173  target_edge->id = DatumGetInt32(binval);
174 
175  binval = SPI_getbinval(*tuple, *tupdesc, edge_columns->source, &isnull);
176 
177  if (isnull)
178    elog(ERROR, "source contains a null value");
179
180  target_edge->source = DatumGetInt32(binval);
181 
182  binval = SPI_getbinval(*tuple, *tupdesc, edge_columns->target, &isnull);
183
184  if (isnull)
185    elog(ERROR, "target contains a null value");
186 
187  target_edge->target = DatumGetInt32(binval);
188 
189  binval = SPI_getbinval(*tuple, *tupdesc, edge_columns->cost, &isnull);
190 
191  if (isnull)
192    elog(ERROR, "cost contains a null value");
193 
194  target_edge->cost = DatumGetFloat8(binval);
195 
196  if (edge_columns->reverse_cost != -1) {
197    binval = SPI_getbinval(*tuple, *tupdesc, edge_columns->reverse_cost,
198                           &isnull);
199   
200    if (isnull)
201      elog(ERROR, "reverse_cost contains a null value");
202    target_edge->reverse_cost =  DatumGetFloat8(binval);
203  }
204}
205
206
207static int compute_driving_distance(char* sql, int source_vertex_id,
208                                    float8 distance, bool directed,
209                                    bool has_reverse_cost,
210                                    path_element_t **path, int *path_count)
211{
212  int SPIcode;
213  void *SPIplan;
214  Portal SPIportal;
215  bool moredata = TRUE;
216  int ntuples;
217  edge_t *edges = NULL;
218  int total_tuples = 0;
219  edge_columns_t edge_columns = {id: -1, source: -1, target: -1,
220                                 cost: -1, reverse_cost: -1};
221
222  int v_max_id=0;
223  int v_min_id=INT_MAX;
224
225  char *err_msg;
226  int ret = -1;
227 
228  int s_count = 0;
229 
230  register int z;
231 
232  DBG("start driving_distance\n");
233 
234  SPIcode = SPI_connect();
235  if (SPIcode  != SPI_OK_CONNECT) {
236    elog(ERROR, "driving_distance: couldn't open a connection to SPI");
237    return -1;
238  }
239 
240  SPIplan = SPI_prepare(sql, 0, NULL);
241
242  if (SPIplan  == NULL) {
243    elog(ERROR, "driving_distance: couldn't create query plan via SPI");
244    return -1;
245  }
246
247  if ((SPIportal = SPI_cursor_open(NULL, SPIplan, NULL,
248                                   NULL, true)) == NULL) { 
249    elog(ERROR, "driving_distance: SPI_cursor_open('%s') returns NULL", sql);
250    return -1;
251  }
252
253  while (moredata == TRUE) {
254    SPI_cursor_fetch(SPIportal, TRUE, TUPLIMIT);
255
256
257    if (edge_columns.id == -1)  {
258      if (fetch_edge_columns(SPI_tuptable, &edge_columns,
259                             has_reverse_cost) == -1)
260        return finish(SPIcode, ret);
261    }
262
263    ntuples = SPI_processed;
264    total_tuples += ntuples;
265    if (!edges)
266      edges = palloc(total_tuples * sizeof(edge_t));
267    else
268      edges = repalloc(edges, total_tuples * sizeof(edge_t));
269
270    if (edges == NULL) {
271      elog(ERROR, "Out of memory");
272      return finish(SPIcode, ret);
273    }
274
275    if (ntuples > 0) {
276      int t;
277      SPITupleTable *tuptable = SPI_tuptable;
278      TupleDesc tupdesc = SPI_tuptable->tupdesc;
279     
280      for (t = 0; t < ntuples; t++) {
281        HeapTuple tuple = tuptable->vals[t];
282        fetch_edge(&tuple, &tupdesc, &edge_columns,
283                   &edges[total_tuples - ntuples + t]);
284      }
285      SPI_freetuptable(tuptable);
286    }
287    else {
288      moredata = FALSE;
289    }
290  }
291
292
293  //defining min and max vertex id
294     
295  DBG("Total %i tuples", total_tuples);
296   
297  for(z=0; z<total_tuples; z++)
298  {
299    if(edges[z].source<v_min_id)
300      v_min_id=edges[z].source;
301
302    if(edges[z].source>v_max_id)
303      v_max_id=edges[z].source;
304                                           
305    if(edges[z].target<v_min_id)
306      v_min_id=edges[z].target;
307
308    if(edges[z].target>v_max_id)
309      v_max_id=edges[z].target;     
310                                                                       
311    DBG("%i <-> %i", v_min_id, v_max_id);
312                               
313  }
314
315  //:::::::::::::::::::::::::::::::::::: 
316  //:: reducing vertex id (renumbering)
317  //::::::::::::::::::::::::::::::::::::
318  for(z=0; z<total_tuples; z++)
319  {
320    //check if edges[] contains source
321    if(edges[z].source == source_vertex_id ||
322       edges[z].target == source_vertex_id)
323      ++s_count;
324
325    edges[z].source-=v_min_id;
326    edges[z].target-=v_min_id;
327    DBG("%i - %i", edges[z].source, edges[z].target);     
328  }
329
330  if(s_count == 0)
331  {
332    elog(ERROR, "Start vertex was not found.");
333    return -1;
334  }
335                         
336  source_vertex_id -= v_min_id;
337
338  DBG("Calling boost_dijkstra\n");
339       
340  profstop("extract", prof_extract);
341  profstart(prof_dijkstra);
342 
343  ret = boost_dijkstra_dist(edges, total_tuples, source_vertex_id,
344                            distance, directed, has_reverse_cost,
345                            path, path_count, &err_msg);
346   
347  profstop("dijkstra", prof_dijkstra);
348  profstart(prof_store);
349   
350  //::::::::::::::::::::::::::::::::
351  //:: restoring original vertex id
352  //::::::::::::::::::::::::::::::::
353  for(z=0;z<*path_count;z++)
354  {
355    //DBG("vetex %i\n",(*path)[z].vertex_id);
356    (*path)[z].vertex_id+=v_min_id;
357  }
358
359  if (ret < 0) {
360    //elog(ERROR, "Error computing path: %s", err_msg);
361    ereport(ERROR, (errcode(ERRCODE_E_R_E_CONTAINING_SQL_NOT_PERMITTED),
362                    errmsg("Error computing path: %s", err_msg)));
363  }
364   
365  return finish(SPIcode, ret);
366}
367
368
369PG_FUNCTION_INFO_V1(driving_distance);
370Datum
371driving_distance(PG_FUNCTION_ARGS)
372{
373  FuncCallContext     *funcctx;
374  int                  call_cntr;
375  int                  max_calls;
376  TupleDesc            tuple_desc;
377  path_element_t      *path;
378
379  /* stuff done only on the first call of the function */
380  if (SRF_IS_FIRSTCALL()) {
381    MemoryContext   oldcontext;
382    int path_count = 0;
383    int ret;
384   
385    // XXX profiling messages are not thread safe
386    profstart(prof_total);
387    profstart(prof_extract);
388   
389    /* create a function context for cross-call persistence */
390    funcctx = SRF_FIRSTCALL_INIT();
391   
392    /* switch to memory context appropriate for multiple function calls */
393    oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
394   
395    ret = compute_driving_distance(text2char(PG_GETARG_TEXT_P(0)), // sql
396                                PG_GETARG_INT32(1),   // source vertex
397                                PG_GETARG_FLOAT8(2),  // distance or time
398                                PG_GETARG_BOOL(3),
399                                PG_GETARG_BOOL(4), &path, &path_count);
400
401#ifdef DEBUG
402    DBG("Ret is %i", ret);
403    if (ret >= 0) {
404      int i;
405      for (i = 0; i < path_count; i++) {
406        DBG("Step %i vertex_id  %i ", i, path[i].vertex_id);
407        DBG("        edge_id    %i ", path[i].edge_id);
408        DBG("        cost       %f ", path[i].cost);
409      }
410    }
411#endif
412
413    /* total number of tuples to be returned */
414    funcctx->max_calls = path_count;
415    funcctx->user_fctx = path;
416
417    funcctx->tuple_desc = BlessTupleDesc(
418                             RelationNameGetTupleDesc("path_result"));
419   
420    MemoryContextSwitchTo(oldcontext);
421  }
422 
423  /* stuff done on every call of the function */
424  funcctx = SRF_PERCALL_SETUP();
425
426  call_cntr = funcctx->call_cntr;
427  max_calls = funcctx->max_calls;
428  tuple_desc = funcctx->tuple_desc;
429  path = (path_element_t*) funcctx->user_fctx;
430 
431  if (call_cntr < max_calls) {   /* do when there is more left to send */
432    HeapTuple    tuple;
433    Datum        result;
434    Datum *values;
435    char* nulls;
436   
437
438    values = palloc(3 * sizeof(Datum));
439    nulls = palloc(3 * sizeof(char));
440   
441//  values[0] = Int32GetDatum(call_cntr);
442//  nulls[0] = ' ';
443    values[0] = Int32GetDatum(path[call_cntr].vertex_id);
444    nulls[0]  = ' ';
445    values[1] = Int32GetDatum(path[call_cntr].edge_id);
446    nulls[1]  = ' ';
447    values[2] = Float8GetDatum(path[call_cntr].cost);
448    nulls[2]  = ' ';
449
450    tuple = heap_formtuple(tuple_desc, values, nulls);
451   
452
453    /* make the tuple into a datum */
454    result = HeapTupleGetDatum(tuple);
455   
456    /* clean up (this is not really necessary) */
457    pfree(values);
458    pfree(nulls);
459
460    SRF_RETURN_NEXT(funcctx, result);
461  }
462  else {    /* do when there is no more left */
463    profstop("store", prof_store);
464    profstop("total", prof_total);
465#ifdef PROFILE
466    elog(NOTICE, "_________");
467#endif
468    DBG("Returning value");
469
470    SRF_RETURN_DONE(funcctx);
471  }
472}
Note: See TracBrowser for help on using the browser.