GenomicDataStream
A scalable interface between data and analysis
Loading...
Searching...
No Matches
bgenstream.h
Go to the documentation of this file.
1/***********************************************************************
2 * @file bgenstream.h
3 * @author Gabriel Hoffman
4 * @email gabriel.hoffman@mssm.edu
5 * @brief reads a BGEN into matrix in chunks, storing variants in columns
6 * Copyright (C) 2024 Gabriel Hoffman
7 ***********************************************************************/
8
9#ifndef BGEN_STREAM_H_
10#define BGEN_STREAM_H_
11
12#ifndef DISABLE_EIGEN
13#include <Eigen/Sparse>
14#endif
15
16#include <string>
17
18#include "genfile/bgen/View.hpp"
19#include "genfile/bgen/IndexQuery.hpp"
20
21#include <boost/algorithm/string.hpp>
22
23#include "VariantInfo.h"
25#include "GenomicRanges.h"
26#include "VariantSet.h"
27#include "bgen_load.h"
28
29using namespace std;
30using namespace arma;
31using namespace genfile::bgen;
32
33namespace gds {
34
39static genfile::bgen::View::UniquePtr construct_view(
40 const string & filename) {
41
42 if( ! filesystem::exists( filename ) ){
43 throw runtime_error("File does not exist: " + filename);
44 }
45
46 View::UniquePtr view = genfile::bgen::View::create( filename ) ;
47
48 return view ;
49}
50
51static IndexQuery::UniquePtr construct_query(const string &index_filename){
52
53 if( ! filesystem::exists( index_filename ) ){
54 throw runtime_error("File does not exist: " + index_filename);
55 }
56
57 IndexQuery::UniquePtr query = IndexQuery::create( index_filename ) ;
58 return query;
59}
60
61
62
70static genfile::bgen::View::UniquePtr construct_view(
71 const string & filename,
72 const string & index_filename,
73 const GenomicRanges & gr) {
74 // const vector<string> & rsids = vector<string>()) {
75
76 // create view of BGEN file
77 View::UniquePtr view = construct_view( filename );
78
79 // process region queries
80 if( gr.size() > 0){
81 IndexQuery::UniquePtr query = construct_query(index_filename);
82 for( int i = 0; i < gr.size(); i++ ) {
83 query->include_range( IndexQuery::GenomicRange( gr.get_chrom(i) , gr.get_start(i), gr.get_end(i) ) ) ;
84 }
85 // query->include_rsids( rsids ) ;
86 query->initialise() ;
87 view->set_query( query ) ;
88 }
89
90 return view ;
91}
92
93static std::shared_ptr<VariantSet> getVariantSet(
94 const string & index_filename){
95
96 // Initialize SQLite database
97 SqliteIndexQuery query(index_filename);
98
99 std::vector<std::string> rsid, chrom;
100 std::vector<int> position;
101
102 // populate rsid, chrom, position for all variants
103 query.get_variant_info(&rsid, &chrom, &position);
104
105 // construct VariantSet
106 VariantSet vs(chrom, position, rsid);
107
108 // return shared pointer
109 return std::make_shared<VariantSet>(vs);
110}
111
112
113
118 public GenomicDataStream {
119 public:
120
122
124
128
129 // Initialize view from just bgen file
130 view = construct_view( param.file ) ;
131
132 filenameIdxGlobal = param.file + ".bgi";
133 // queryGlobal = construct_query( filenameIdxGlobal );
134
135 // Read VariantSet from SQLite .bgi file
136 vs = getVariantSet( filenameIdxGlobal );
137
138 // apply region filters
139 setRegions( param.regions );
140
141 // Filter samples
142 if( param.samples.compare("-") == 0 ){
143 get_all_samples( *view, &number_of_samples, &sampleNames, &requestedSamplesByIndexInDataIndex ) ;
144 }else{
145
146 // get subset of samples
147 vector<string> requestedSamples;
148
149 // boost::erase_all(param.samples, " ");
150 boost::split(requestedSamples, param.samples, boost::is_any_of("\t,\n"));
151
152 get_requested_samples( *view, requestedSamples, &number_of_samples, &sampleNames, &requestedSamplesByIndexInDataIndex ) ;
153 }
154
155 vInfo = new VariantInfo( sampleNames );
156
157 // store probabilities
158 probs.reserve( n_samples() * param.chunkSize * max_entries_per_sample);
159
160 // store dosage
161 matDosage.reserve( n_samples() * param.chunkSize );
162 }
163
167 if( vInfo != nullptr) delete vInfo;
168 }
169
172 void setRegions(const vector<string> &regions) override {
173
174 GenomicRanges gr( regions );
175
176 auto query = construct_query( filenameIdxGlobal );
177
178 if( gr.size() > 0){
179
180 // get indeces of variants overlapping these regions
181 varIdx = vs->getIndeces( gr );
182
183 // get IDs of these variants
184 vector<string> variantIDs = vs->getVariantIDs(varIdx);
185
186 // query these variants from the BGEN file
187 query->include_rsids( variantIDs );
188 query->initialise() ;
189 view->set_query( query ) ;
190 }
191
192 // number of variants after filtering
193 n_variants_total = view->number_of_variants() ;
194
195 // set current position of index
196 variant_idx_start = 0;
197 }
198
201 int n_samples() override {
202 return number_of_samples;
203 }
204
207 vector<string> getSampleNames() override {
208 return vInfo->sampleNames;
209 }
210
213 string getStreamType() override {
214 return toString( param.fileType);
215 }
216
218 return vs->getChromRanges();
219 }
220
221 bool getNextChunk( DataChunk<arma::mat> & chunk, const bool &useFilter = true) override {
222
223 // Update matDosage and vInfo for the chunk
224 bool ret = getNextChunk_helper();
225
226 if( useFilter ){
227 // modifies matDosage and vInfo directly
228 applyVariantFilter(matDosage, vInfo, number_of_samples, getMAF(), getMinVariance() );
229 }
230
231 arma::mat M(matDosage.data(), number_of_samples, vInfo->size(), false, true);
232 chunk = DataChunk<arma::mat>( M, vInfo );
233
234 return ret;
235 }
236
237 bool getNextChunk( DataChunk<arma::sp_mat> & chunk, const bool &useFilter = true) override {
238
239 // Update matDosage and vInfo for the chunk
240 bool ret = getNextChunk_helper();
241
242 if( useFilter ){
243 // modifies matDosage and vInfo directly
244 applyVariantFilter(matDosage, vInfo, number_of_samples, getMAF(), getMinVariance() );
245 }
246
247 arma::mat M(matDosage.data(), number_of_samples, vInfo->size(), false, true);
248
249 chunk = DataChunk<arma::sp_mat>( arma::sp_mat(M), vInfo );
250
251 return ret;
252 }
253
254 #ifndef DISABLE_EIGEN
255 bool getNextChunk( DataChunk<Eigen::MatrixXd> & chunk, const bool &useFilter = true) override {
256
257 // Update matDosage and vInfo for the chunk
258 bool ret = getNextChunk_helper();
259
260 if( useFilter ){
261 // modifies matDosage and vInfo directly
262 applyVariantFilter(matDosage, vInfo, number_of_samples, getMAF(), getMinVariance() );
263 }
264
265 Eigen::MatrixXd M = Eigen::Map<Eigen::MatrixXd>(matDosage.data(), number_of_samples, vInfo->size());
266
267 chunk = DataChunk<Eigen::MatrixXd>( M, vInfo );
268
269 return ret;
270 }
271
272
273 bool getNextChunk( DataChunk<Eigen::SparseMatrix<double> > & chunk, const bool &useFilter = true) override {
274
275 // Update matDosage and vInfo for the chunk
276 bool ret = getNextChunk_helper();
277
278 if( useFilter ){
279 // modifies matDosage and vInfo directly
280 applyVariantFilter(matDosage, vInfo, number_of_samples, getMAF(), getMinVariance() );
281 }
282
283 Eigen::MatrixXd M = Eigen::Map<Eigen::MatrixXd>(matDosage.data(), number_of_samples, vInfo->size());
284
285 chunk = DataChunk<Eigen::SparseMatrix<double>>( M.sparseView(), vInfo );
286
287 return ret;
288 }
289 #endif
290
291 #ifndef DISABLE_RCPP
292 bool getNextChunk( DataChunk<Rcpp::NumericMatrix> & chunk, const bool &useFilter = true) override {
293
294 // Update matDosage and vInfo for the chunk
295 bool ret = getNextChunk_helper();
296
297 if( useFilter ){
298 // modifies matDosage and vInfo directly
299 applyVariantFilter(matDosage, vInfo, number_of_samples, getMAF(), getMinVariance() );
300 }
301
302 Rcpp::NumericMatrix M(number_of_samples, vInfo->size(), matDosage.data());
303 colnames(M) = Rcpp::wrap( vInfo->getFeatureNames() );
304 rownames(M) = Rcpp::wrap( vInfo->sampleNames );
305
306 chunk = DataChunk<Rcpp::NumericMatrix>( M, vInfo );
307
308 return ret;
309 }
310 #endif
311
312 bool getNextChunk( DataChunk<vector<double> > & chunk, const bool &useFilter = true) override {
313
314 // Update matDosage and vInfo for the chunk
315 bool ret = getNextChunk_helper();
316
317 if( useFilter ){
318 // modifies matDosage and vInfo directly
319 applyVariantFilter(matDosage, vInfo, number_of_samples, getMAF(), getMinVariance() );
320 }
321
322 chunk = DataChunk<vector<double> >( matDosage, vInfo );
323
324 return ret;
325 }
326
327 private:
328 View::UniquePtr view = nullptr;
329 // IndexQuery::UniquePtr queryGlobal = nullptr;
330 std::shared_ptr<VariantSet> vs;
331 size_t number_of_samples = 0;
332 vector<string> sampleNames;
333 map<size_t, size_t> requestedSamplesByIndexInDataIndex;
334 VariantInfo *vInfo = nullptr;
335 vector<double> probs;
336 vector<double> matDosage;
337 size_t max_entries_per_sample = 4;
338 int n_variants_total;
339 int variant_idx_start;
340 vector<int> varIdx;
341
342 bool getNextChunk_helper(){
343
344 // clear data, but keep allocated capacity
345 matDosage.clear();
346 vInfo->clear();
347
348 // number of variants in this chunk
349 int chunkSize = min(param.chunkSize, n_variants_total - variant_idx_start);
350 chunkSize = max(chunkSize, 0);
351
352 // if no variants remain, return false
353 if( chunkSize == 0) return false;
354
355 vector<int> data_dimension;
356 data_dimension.push_back(chunkSize);
357 data_dimension.push_back(number_of_samples);
358 data_dimension.push_back(max_entries_per_sample);
359
360 vector<int> ploidy_dimension;
361 ploidy_dimension.push_back( chunkSize ) ;
362 ploidy_dimension.push_back( number_of_samples ) ;
363
364 vector<int> ploidy(ploidy_dimension[0]*ploidy_dimension[1], numeric_limits<int>::quiet_NaN());
365 vector<bool> phased( chunkSize, numeric_limits<bool>::quiet_NaN() ) ;
366
367 string SNPID, rsid, chromosome;
368 genfile::bgen::uint32_t position;
369 vector<string> alleles;
370
371 // Iterate through variants
372 size_t k = 0;
373 for( size_t j = variant_idx_start; j < variant_idx_start + chunkSize; j++ ) {
374
375 // read variant information
376 view->read_variant( &SNPID, &rsid, &chromosome, &position, &alleles ) ;
377
378 // store variant info
379 vInfo->addVariant(chromosome, position, rsid, alleles[0], alleles[1] );
380
381 // read genotype probabilities into DataSetter object
382 DataSetter setter(
383 &ploidy, ploidy_dimension,
384 &probs, data_dimension,
385 &phased,
386 k++,
387 requestedSamplesByIndexInDataIndex
388 );
389
390 view->read_genotype_data_block( setter ) ;
391 }
392 // increament starting position to beginning of next chunk
393 variant_idx_start += chunkSize;
394
395 // Convert to dosage values stored in vector<double>
396 //------------------
397
398 // use probs to create Cube
399 cube C(probs.data(), chunkSize, number_of_samples, max_entries_per_sample, true, true);
400
401 // weight alleles by dosage
402 // With max_entries_per_sample = 4, the unphased coding is
403 // AA/AB/BB/NULL so use weights 0/1/2/0 to conver to dosage
404 // since the last entry doesn't encode valid information
405 // When phased, the coding is [a1 a2] / [a1 a2]
406 // so use weights 0/1/0/1
407 // vec w_unph = {0,1,2,0};
408 // vec w_ph = {0,1,0,1};
409 // compute dosages with weights depend on phasing
410 // w = phased[j] ? w_ph : w_unph;
411 // dsg = C.row_as_mat(j).t() * w;
412 // BUT !!!
413 // in unphased data, they last entry can be NaN
414 // and NaN * 0 is still NaN
415 // so need to drop the last entry _manually_
416
417 vec v, dsg;
418 vec w_unph = {0,1,2};
419 vec w_ph = {0,1,0,1};
420 mat m;
421
422 // compute dosage from Cube
423 // copy results of each variant to vector<double>
424 for(int j=0; j<chunkSize; j++){
425 if( phased[j] ){
426 dsg = C.row_as_mat(j).t() * w_ph;
427 }else{
428 // extract columns 0,1,2
429 // skip 3rd
430 m = C.row_as_mat(j).t();
431 dsg = m.cols(0,2) * w_unph;
432 }
433
434 // replace missing with mean
435 if( param.missingToMean ) nanToMean( dsg );
436
437 // save vector in matDosage
438 memcpy(matDosage.data() + number_of_samples*j, dsg.memptr(), number_of_samples*sizeof(double));
439 }
440
441 return true;
442 }
443};
444
445}
446
447#endif
Definition GenomicDataStream_virtual.h:34
double getMinVariance()
Definition GenomicDataStream_virtual.h:200
double getMAF() const
Definition GenomicDataStream_virtual.h:206
Param param
Definition GenomicDataStream_virtual.h:254
GenomicDataStream()
Definition GenomicDataStream_virtual.h:168
Definition GenomicRanges.h:25
vector< size_t > get_end() const
Definition GenomicRanges.h:55
vector< string > get_chrom() const
Definition GenomicRanges.h:53
vector< size_t > get_start() const
Definition GenomicRanges.h:54
const int size() const
Definition GenomicRanges.h:61
Definition VariantInfo.h:24
void clear()
Definition VariantInfo.h:137
void addVariant(const string &chr, const int &pos, const string &id, const string &allele1, const string &allele2)
Definition VariantInfo.h:38
Definition VariantSet.h:45
bool getNextChunk(DataChunk< Rcpp::NumericMatrix > &chunk, const bool &useFilter=true) override
Definition bgenstream.h:292
bool getNextChunk(DataChunk< arma::mat > &chunk, const bool &useFilter=true) override
Definition bgenstream.h:221
int n_samples() override
Definition bgenstream.h:201
bool getNextChunk(DataChunk< vector< double > > &chunk, const bool &useFilter=true) override
Definition bgenstream.h:312
bgenstream()
Definition bgenstream.h:123
bool getNextChunk(DataChunk< Eigen::MatrixXd > &chunk, const bool &useFilter=true) override
Definition bgenstream.h:255
bool getNextChunk(DataChunk< Eigen::SparseMatrix< double > > &chunk, const bool &useFilter=true) override
Definition bgenstream.h:273
GenomicRanges getChromRanges() override
Definition bgenstream.h:217
string getStreamType() override
Definition bgenstream.h:213
~bgenstream()
Definition bgenstream.h:166
bool getNextChunk(DataChunk< arma::sp_mat > &chunk, const bool &useFilter=true) override
Definition bgenstream.h:237
void setRegions(const vector< string > &regions) override
Definition bgenstream.h:172
string filenameIdxGlobal
Definition bgenstream.h:121
bgenstream(const Param &param)
Definition bgenstream.h:127
vector< string > getSampleNames() override
Definition bgenstream.h:207
Definition bgenstream.h:33
Definition GenomicDataStream_virtual.h:65
int chunkSize
Definition GenomicDataStream_virtual.h:157