GenomicDataStream
A scalable interface between data and analysis
Loading...
Searching...
No Matches
vcfstream.h
Go to the documentation of this file.
1/***********************************************************************
2 * @file vcfstream.h
3 * @author Gabriel Hoffman
4 * @email gabriel.hoffman@mssm.edu
5 * @brief vcfstream reads a VCF/VCFGZ/BCF into a matrix in chunks, storing variants in columns
6 * Copyright (C) 2024 Gabriel Hoffman
7 ***********************************************************************/
8
9
10#ifndef VCF_STREAM_H_
11#define VCF_STREAM_H_
12
13#ifndef DISABLE_EIGEN
14#include <Eigen/Sparse>
15#endif
16
17#include <filesystem>
18#include <string>
19
20#include <vcfpp.h>
21
22#include "VariantInfo.h"
24#include "utils.h"
25#include "GenomicRanges.h"
26
27using namespace std;
28using namespace vcfpp;
29
30namespace gds {
31
35class vcfstream :
36 public GenomicDataStream {
37 public:
38
40
44
45 // check that file exists
46 if( ! filesystem::exists( param.file ) ){
47 throw runtime_error("File does not exist: " + param.file);
48 }
49
50 // check that field was specified
51 if( param.field.compare("") == 0 ){
52 throw runtime_error("Field for VCF/BCF not specified");
53 }
54
55 // initialize
56 reader = new BcfReader( param.file );
57
58 // Set genomic regions regions
59 setRegions( param.regions );
60
61 reader->setSamples( param.samples );
62
63 // Initialize record with info in header
64 record = new BcfRecord( reader->header );
65
66 // 1: int; 2: float; 3: string; 0: error;
67 fieldType = reader->header.getFormatType( param.field );
68
69 if( fieldType == 3 && param.field.compare("GT"))
70 throw std::runtime_error("field GT is the only supported string type");
71 if( fieldType == 0)
72 throw std::runtime_error("field not found: " + param.field);
73 // initialize varInfo with sample names
74 vInfo = new VariantInfo( reader->SamplesName );
75
76 // Initialize vector with capacity to store variants
77 // Note, this allocates memory but does not change .size()
78 // After j variants have been inserted, only entries up to j*nsamples are populated
79 // the rest of the vector is allocated doesn't have valid data
80 matDosage.reserve( n_samples() * param.chunkSize );
81 }
82
85 ~vcfstream() override {
86 if( reader != nullptr) delete reader;
87 if( record != nullptr) delete record;
88 if( vInfo != nullptr) delete vInfo;
89 }
90
93 void setRegions(const vector<string> &regions) override {
94
95 validRegions.reserve(regions.size());
96 validRegions.clear();
97 copy(regions.begin(), regions.end(), back_inserter(validRegions));
98
99 // for(auto &region: regions){
100 // checkStatus( region );
101 // }
102
103 // initialize to false
104 continueIterating = false;
105
106 // if valid set is not empty
107 if( validRegions.size() > 0 ){
108 // initialize iterator
109 itReg = validRegions.begin();
110
111 reader->setRegion( *itReg );
112
113 continueIterating = true;
114 }
115 }
116
119 int n_samples() override {
120 return reader->nsamples;
121 }
122
125 vector<string> getSampleNames() override {
126 return reader->header.getSamples();
127 }
128
131 string getStreamType() override {
132 return toString( param.fileType);
133 }
134
136 return GenomicRanges();
137 }
138
139 bool getNextChunk( DataChunk<arma::mat> & chunk, const bool &useFilter = true) override {
140
141 // Update matDosage and vInfo for the chunk
142 bool ret = getNextChunk_helper();
143
144 if( useFilter ){
145 // modifies matDosage and vInfo directly
146 applyVariantFilter(matDosage, vInfo, reader->nsamples, getMAF(), getMinVariance() );
147 }
148
149 // mat(ptr_aux_mem, n_rows, n_cols, copy_aux_mem = true, strict = false)
150 bool copy_aux_mem = false; // create read-only matrix without re-allocating memory
151
152 arma::mat M(matDosage.data(), reader->nsamples, vInfo->size(), copy_aux_mem, true);
153
154 chunk = DataChunk<arma::mat>( M, vInfo );
155
156 return ret;
157 }
158
159 bool getNextChunk( DataChunk<arma::sp_mat> & chunk, const bool &useFilter = true) override {
160
161 // Update matDosage and vInfo for the chunk
162 bool ret = getNextChunk_helper();
163
164 if( useFilter ){
165 // modifies matDosage and vInfo directly
166 applyVariantFilter(matDosage, vInfo, reader->nsamples, getMAF(), getMinVariance() );
167 }
168
169 // otherwise, set chunk and return ret
170 arma::mat M(matDosage.data(), reader->nsamples, vInfo->size(), false, true);
171
172 // create sparse matrix from dense matrix
173 chunk = DataChunk<arma::sp_mat>( arma::sp_mat(M), vInfo );
174
175 return ret;
176 }
177
178 #ifndef DISABLE_EIGEN
179 bool getNextChunk( DataChunk<Eigen::MatrixXd> & chunk, const bool &useFilter = true) override {
180
181 // Update matDosage and vInfo for the chunk
182 bool ret = getNextChunk_helper();
183
184 if( useFilter ){
185 // modifies matDosage and vInfo directly
186 applyVariantFilter(matDosage, vInfo, reader->nsamples, getMAF(), getMinVariance() );
187 }
188
189 Eigen::MatrixXd M = Eigen::Map<Eigen::MatrixXd>(matDosage.data(), reader->nsamples, vInfo->size());
190
191 chunk = DataChunk<Eigen::MatrixXd>( M, vInfo );
192
193 return ret;
194 }
195
196 bool getNextChunk( DataChunk<Eigen::SparseMatrix<double> > & chunk, const bool &useFilter = true) override {
197
198 // Update matDosage and vInfo for the chunk
199 bool ret = getNextChunk_helper();
200
201 if( useFilter ){
202 // modifies matDosage and vInfo directly
203 applyVariantFilter(matDosage, vInfo, reader->nsamples, getMAF(), getMinVariance() );
204 }
205
206 Eigen::MatrixXd M = Eigen::Map<Eigen::MatrixXd>(matDosage.data(), reader->nsamples, vInfo->size());
207
208 chunk = DataChunk<Eigen::SparseMatrix<double> >( M.sparseView(), vInfo );
209
210 return ret;
211 }
212 #endif
213
214 #ifndef DISABLE_RCPP
215 bool getNextChunk( DataChunk<Rcpp::NumericMatrix> & chunk, const bool &useFilter = true) override {
216
217 // Update matDosage and vInfo for the chunk
218 bool ret = getNextChunk_helper();
219
220 if( useFilter ){
221 // modifies matDosage and vInfo directly
222 applyVariantFilter(matDosage, vInfo, reader->nsamples, getMAF(), getMinVariance() );
223 }
224
225 Rcpp::NumericMatrix M(reader->nsamples, vInfo->size(), matDosage.data());
226 colnames(M) = Rcpp::wrap( vInfo->getFeatureNames() );
227 rownames(M) = Rcpp::wrap( vInfo->sampleNames );
228
229 chunk = DataChunk<Rcpp::NumericMatrix>( M, vInfo );
230
231 return ret;
232 }
233 #endif
234
235 bool getNextChunk( DataChunk<vector<double>> & chunk, const bool &useFilter = true) override {
236
237 // Update matDosage and vInfo for the chunk
238 bool ret = getNextChunk_helper();
239
240 if( useFilter ){
241 // modifies matDosage and vInfo directly
242 applyVariantFilter(matDosage, vInfo, reader->nsamples, getMAF(), getMinVariance() );
243 }
244
245 chunk = DataChunk<vector<double>>( matDosage, vInfo );
246
247 return ret;
248 }
249
254 static string variantToString( const BcfRecord &record ) {
255 string s = record.CHROM() + ":" + to_string(record.POS()) + " " + record.ID() + " " + record.REF() + " " + record.ALT();
256 return s;
257 }
258
259 private:
260 BcfReader *reader = nullptr;
261 BcfRecord *record = nullptr;
262 VariantInfo *vInfo = nullptr;
263 vector<string>::iterator itReg;
264 vector<string> validRegions;
265
266 bool continueIterating;
267 int fieldType;
268
269 // store genotype values
270 // type used based on fieldType
271 // 1: int;
272 // 2: float;
273 // 3: string;
274 // 0: error;
275 vector<int> values_int;
276 vector<float> values_fl;
277
278 vector<double> tmp;
279
280 // stores genotype dosage as doubles, with the next marker inserted at the end
281 // NOTE that when current size is exceeded, .insert() reallocates memory
282 // this can be slow
283 // set using reserve() to set initial capacity so avoid re-alloc
284 vector<double> matDosage;
285
286 bool getNextChunk_helper(){
287
288 // if no valid regions
289 if( validRegions.size() == 0) return false;
290
291 // if end of file reached, return false
292 if( ! continueIterating ) return continueIterating;
293
294 // clear data, but keep allocated capacity
295 matDosage.clear();
296 vInfo->clear();
297
298 // loop thru variant, updating the count each time
299 unsigned int j;
300 for(j=0; j < param.chunkSize; j++){
301
302 // get next variant
303 // if false, reached end of region
304 if( ! reader->getNextVariant( *record ) ){
305
306 // else go to next region
307 itReg++;
308
309 // if this was the last region
310 // set continueIterating so false is retured at next call to
311 // getNextChunk_helper()
312 // then break since no data left
313 if( itReg == validRegions.end()){
314 continueIterating = false;
315 break;
316 }
317
318 // else
319 // initialize the record for this region
320 reader->setRegion( *itReg );
321 reader->getNextVariant( *record );
322 }
323
324 // populate genotype with the values of the current variant
325 // If string, convert to dosage
326 // use values vector based on fieldType
327 switch(fieldType){
328 case 1: // int
329 record->getFORMAT( param.field, values_int);
330 matDosage.insert(matDosage.end(), values_int.begin(), values_int.end());
331 break;
332 case 2: // float
333 record->getFORMAT( param.field, values_fl);
334
335 if( param.field.compare("DS") == 0){
336 // dosage
337 matDosage.insert(matDosage.end(), values_fl.begin(), values_fl.end());
338 }else if( param.field.compare("GP") == 0){
339 // check if site ploidy > 2
340 if( record->ploidy() > 2 ){
341 throw std::runtime_error("GP is not supported for site with ploidy > 2\n " + variantToString(*record));
342 }
343
344 // genotype probabilities
345 vector<double> dsg = GP_to_dosage(values_fl, param.missingToMean);
346
347 matDosage.insert(matDosage.end(), dsg.begin(), dsg.end());
348 }
349 break;
350 case 3: // string. Convert GT to doubles
351
352 // check if site is multi-allelic
353 if( record->isMultiAllelics() ){
354 throw std::runtime_error("GT is not supported for multi-allelic site\n " + variantToString(*record));
355 }
356 // check if site ploidy > 2
357 if( record->ploidy() > 2 ){
358 throw std::runtime_error("GT is not supported for site with ploidy > 2\n " + variantToString(*record));
359 }
360
361 // get GT as int's with vector that is twice as long
362 record->getGenotypes(values_int);
363 tmp = intToDosage( values_int, param.missingToMean );
364 matDosage.insert(matDosage.end(), tmp.begin(), tmp.end());
365 break;
366 }
367
368 // store variant information
369 vInfo->addVariant(record->CHROM(),
370 record->POS(),
371 record->ID(),
372 record->REF(),
373 record->ALT() );
374 }
375
376 bool ret = true;
377
378 // if chunk is empty, return false
379 if( vInfo->size() == 0) ret = false;
380
381 return ret;
382 }
383
384 // check status of region
385 void checkStatus(const string & region){
386 switch( reader->getStatus( region ) ){
387 case 1: // region is vaild and not empty
388 break;
389
390 case 0: // the region is valid but empty.
391 break;
392
393 case -1: // there is no index file found.
394 throw runtime_error("Could not retrieve index file");
395 break;
396
397 case -2: // the region is not valid
398 Rcpp::Rcout << "region:" + region + ";\n";
399 throw runtime_error("region was not found: " + region );
400 break;
401 }
402 }
403};
404
405} // end namespace
406
407#endif
Definition GenomicDataStream_virtual.h:34
double getMinVariance()
Definition GenomicDataStream_virtual.h:213
double getMAF() const
Definition GenomicDataStream_virtual.h:219
Param param
Definition GenomicDataStream_virtual.h:267
GenomicDataStream()
Definition GenomicDataStream_virtual.h:181
Definition GenomicRanges.h:25
Definition VariantInfo.h:24
void clear()
Definition VariantInfo.h:142
GenomicRanges getChromRanges() override
Definition vcfstream.h:135
string getStreamType() override
Definition vcfstream.h:131
bool getNextChunk(DataChunk< arma::sp_mat > &chunk, const bool &useFilter=true) override
Definition vcfstream.h:159
bool getNextChunk(DataChunk< Rcpp::NumericMatrix > &chunk, const bool &useFilter=true) override
Definition vcfstream.h:215
bool getNextChunk(DataChunk< vector< double > > &chunk, const bool &useFilter=true) override
Definition vcfstream.h:235
vcfstream(const Param &param)
Definition vcfstream.h:43
static string variantToString(const BcfRecord &record)
Definition vcfstream.h:254
vector< string > getSampleNames() override
Definition vcfstream.h:125
vcfstream()
Definition vcfstream.h:39
bool getNextChunk(DataChunk< arma::mat > &chunk, const bool &useFilter=true) override
Definition vcfstream.h:139
bool getNextChunk(DataChunk< Eigen::SparseMatrix< double > > &chunk, const bool &useFilter=true) override
Definition vcfstream.h:196
~vcfstream() override
Definition vcfstream.h:85
void setRegions(const vector< string > &regions) override
Definition vcfstream.h:93
bool getNextChunk(DataChunk< Eigen::MatrixXd > &chunk, const bool &useFilter=true) override
Definition vcfstream.h:179
int n_samples() override
Definition vcfstream.h:119
Definition bgenstream.h:33
Definition GenomicDataStream_virtual.h:78
int chunkSize
Definition GenomicDataStream_virtual.h:170