GenomicDataStream
A scalable interface between data and analysis
Loading...
Searching...
No Matches
vcfstream.h
Go to the documentation of this file.
1/***********************************************************************
2 * @file vcfstream.h
3 * @author Gabriel Hoffman
4 * @email gabriel.hoffman@mssm.edu
5 * @brief vcfstream reads a VCF/VCFGZ/BCF into a matrix in chunks, storing variants in columns
6 * Copyright (C) 2024 Gabriel Hoffman
7 ***********************************************************************/
8
9
10#ifndef VCF_STREAM_H_
11#define VCF_STREAM_H_
12
13#ifndef DISABLE_EIGEN
14#include <Eigen/Sparse>
15#endif
16
17#include <filesystem>
18#include <string>
19
20#include <vcfpp.h>
21
22#include "VariantInfo.h"
24#include "utils.h"
25#include "GenomicRanges.h"
26
27using namespace std;
28using namespace vcfpp;
29
30namespace gds {
31
35class vcfstream :
36 public GenomicDataStream {
37 public:
38
40
44
45 // check that file exists
46 if( ! filesystem::exists( param.file ) ){
47 throw runtime_error("File does not exist: " + param.file);
48 }
49
50 // check that field was specified
51 if( param.field.compare("") == 0 ){
52 throw runtime_error("Field for VCF/BCF not specified");
53 }
54
55 // initialize
56 reader = new BcfReader( param.file );
57
58 // Set genomic regions regions
59 setRegions( param.regions );
60
61 reader->setSamples( param.samples );
62
63 // Initialize record with info in header
64 record = new BcfRecord( reader->header );
65
66 // 1: int; 2: float; 3: string; 0: error;
67 fieldType = reader->header.getFormatType( param.field );
68
69 if( fieldType == 3 && param.field.compare("GT"))
70 throw std::runtime_error("field GT is the only supported string type");
71 if( fieldType == 0)
72 throw std::runtime_error("field not found: " + param.field);
73 // initialize varInfo with sample names
74 vInfo = new VariantInfo( reader->SamplesName );
75
76 // Initialize vector with capacity to store variants
77 // Note, this allocates memory but does not change .size()
78 // After j variants have been inserted, only entries up to j*nsamples are populated
79 // the rest of the vector is allocated doesn't have valid data
80 matDosage.reserve( n_samples() * param.chunkSize );
81 }
82
85 ~vcfstream() override {
86 if( reader != nullptr) delete reader;
87 if( record != nullptr) delete record;
88 if( vInfo != nullptr) delete vInfo;
89 }
90
93 void setRegions(const vector<string> &regions) override {
94
95 validRegions.reserve(regions.size());
96 validRegions.clear();
97 copy(regions.begin(), regions.end(), back_inserter(validRegions));
98
99 // for(auto &region: regions){
100 // checkStatus( region );
101 // }
102
103 // initialize to false
104 continueIterating = false;
105
106 // if valid set is not empty
107 if( validRegions.size() > 0 ){
108 // initialize iterator
109 itReg = validRegions.begin();
110
111 reader->setRegion( *itReg );
112
113 continueIterating = true;
114 }
115 }
116
119 int n_samples() override {
120 return reader->nsamples;
121 }
122
125 vector<string> getSampleNames() override {
126 return reader->header.getSamples();
127 }
128
131 string getStreamType() override {
132 return toString( param.fileType);
133 }
134
136 return GenomicRanges();
137 }
138
139 bool getNextChunk( DataChunk<arma::mat> & chunk, const bool &useFilter = true) override {
140
141 // Update matDosage and vInfo for the chunk
142 bool ret = getNextChunk_helper();
143
144 if( useFilter ){
145 // modifies matDosage and vInfo directly
146 applyVariantFilter(matDosage, vInfo, reader->nsamples, getMAF(), getMinVariance() );
147 }
148
149 // mat(ptr_aux_mem, n_rows, n_cols, copy_aux_mem = true, strict = false)
150 bool copy_aux_mem = false; // create read-only matrix without re-allocating memory
151 arma::mat M(matDosage.data(), reader->nsamples, vInfo->size(), copy_aux_mem, true);
152
153 chunk = DataChunk<arma::mat>( M, vInfo );
154
155 return ret;
156 }
157
158 bool getNextChunk( DataChunk<arma::sp_mat> & chunk, const bool &useFilter = true) override {
159
160 // Update matDosage and vInfo for the chunk
161 bool ret = getNextChunk_helper();
162
163 if( useFilter ){
164 // modifies matDosage and vInfo directly
165 applyVariantFilter(matDosage, vInfo, reader->nsamples, getMAF(), getMinVariance() );
166 }
167
168 // otherwise, set chunk and return ret
169 arma::mat M(matDosage.data(), reader->nsamples, vInfo->size(), false, true);
170
171 // create sparse matrix from dense matrix
172 chunk = DataChunk<arma::sp_mat>( arma::sp_mat(M), vInfo );
173
174 return ret;
175 }
176
177 #ifndef DISABLE_EIGEN
178 bool getNextChunk( DataChunk<Eigen::MatrixXd> & chunk, const bool &useFilter = true) override {
179
180 // Update matDosage and vInfo for the chunk
181 bool ret = getNextChunk_helper();
182
183 if( useFilter ){
184 // modifies matDosage and vInfo directly
185 applyVariantFilter(matDosage, vInfo, reader->nsamples, getMAF(), getMinVariance() );
186 }
187
188 Eigen::MatrixXd M = Eigen::Map<Eigen::MatrixXd>(matDosage.data(), reader->nsamples, vInfo->size());
189
190 chunk = DataChunk<Eigen::MatrixXd>( M, vInfo );
191
192 return ret;
193 }
194
195 bool getNextChunk( DataChunk<Eigen::SparseMatrix<double> > & chunk, const bool &useFilter = true) override {
196
197 // Update matDosage and vInfo for the chunk
198 bool ret = getNextChunk_helper();
199
200 if( useFilter ){
201 // modifies matDosage and vInfo directly
202 applyVariantFilter(matDosage, vInfo, reader->nsamples, getMAF(), getMinVariance() );
203 }
204
205 Eigen::MatrixXd M = Eigen::Map<Eigen::MatrixXd>(matDosage.data(), reader->nsamples, vInfo->size());
206
207 chunk = DataChunk<Eigen::SparseMatrix<double> >( M.sparseView(), vInfo );
208
209 return ret;
210 }
211 #endif
212
213 #ifndef DISABLE_RCPP
214 bool getNextChunk( DataChunk<Rcpp::NumericMatrix> & chunk, const bool &useFilter = true) override {
215
216 // Update matDosage and vInfo for the chunk
217 bool ret = getNextChunk_helper();
218
219 if( useFilter ){
220 // modifies matDosage and vInfo directly
221 applyVariantFilter(matDosage, vInfo, reader->nsamples, getMAF(), getMinVariance() );
222 }
223
224 Rcpp::NumericMatrix M(reader->nsamples, vInfo->size(), matDosage.data());
225 colnames(M) = Rcpp::wrap( vInfo->getFeatureNames() );
226 rownames(M) = Rcpp::wrap( vInfo->sampleNames );
227
228 chunk = DataChunk<Rcpp::NumericMatrix>( M, vInfo );
229
230 return ret;
231 }
232 #endif
233
234 bool getNextChunk( DataChunk<vector<double>> & chunk, const bool &useFilter = true) override {
235
236 // Update matDosage and vInfo for the chunk
237 bool ret = getNextChunk_helper();
238
239 if( useFilter ){
240 // modifies matDosage and vInfo directly
241 applyVariantFilter(matDosage, vInfo, reader->nsamples, getMAF(), getMinVariance() );
242 }
243
244 chunk = DataChunk<vector<double>>( matDosage, vInfo );
245
246 return ret;
247 }
248
253 static string variantToString( const BcfRecord &record ) {
254 string s = record.CHROM() + ":" + to_string(record.POS()) + " " + record.ID() + " " + record.REF() + " " + record.ALT();
255 return s;
256 }
257
258 private:
259 BcfReader *reader = nullptr;
260 BcfRecord *record = nullptr;
261 VariantInfo *vInfo = nullptr;
262 vector<string>::iterator itReg;
263 vector<string> validRegions;
264
265 bool continueIterating;
266 int fieldType;
267
268 // store genotype values
269 // type used based on fieldType
270 // 1: int;
271 // 2: float;
272 // 3: string;
273 // 0: error;
274 vector<int> values_int;
275 vector<float> values_fl;
276
277 vector<double> tmp;
278
279 // stores genotype dosage as doubles, with the next marker inserted at the end
280 // NOTE that when current size is exceeded, .insert() reallocates memory
281 // this can be slow
282 // set using reserve() to set initial capacity so avoid re-alloc
283 vector<double> matDosage;
284
285 bool getNextChunk_helper(){
286
287 // if no valid regions
288 if( validRegions.size() == 0) return false;
289
290 // if end of file reached, return false
291 if( ! continueIterating ) return continueIterating;
292
293 // clear data, but keep allocated capacity
294 matDosage.clear();
295 vInfo->clear();
296
297 // loop thru variant, updating the count each time
298 unsigned int j;
299 for(j=0; j < param.chunkSize; j++){
300
301 // get next variant
302 // if false, reached end of region
303 if( ! reader->getNextVariant( *record ) ){
304
305 // else go to next region
306 itReg++;
307
308 // if this was the last region
309 // set continueIterating so false is retured at next call to
310 // getNextChunk_helper()
311 // then break since no data left
312 if( itReg == validRegions.end()){
313 continueIterating = false;
314 break;
315 }
316
317 // else
318 // initialize the record for this region
319 reader->setRegion( *itReg );
320 reader->getNextVariant( *record );
321 }
322
323 // populate genotype with the values of the current variant
324 // If string, convert to dosage
325 // use values vector based on fieldType
326 switch(fieldType){
327 case 1: // int
328 record->getFORMAT( param.field, values_int);
329 matDosage.insert(matDosage.end(), values_int.begin(), values_int.end());
330 break;
331 case 2: // float
332 record->getFORMAT( param.field, values_fl);
333
334 if( param.field.compare("DS") == 0){
335 // dosage
336 matDosage.insert(matDosage.end(), values_fl.begin(), values_fl.end());
337 }else if( param.field.compare("GP") == 0){
338 // check if site ploidy > 2
339 if( record->ploidy() > 2 ){
340 throw std::runtime_error("GP is not supported for site with ploidy > 2\n " + variantToString(*record));
341 }
342
343 // genotype probabilities
344 vector<double> dsg = GP_to_dosage(values_fl, param.missingToMean);
345
346 matDosage.insert(matDosage.end(), dsg.begin(), dsg.end());
347 }
348 break;
349 case 3: // string. Convert GT to doubles
350
351 // check if site is multi-allelic
352 if( record->isMultiAllelics() ){
353 throw std::runtime_error("GT is not supported for multi-allelic site\n " + variantToString(*record));
354 }
355 // check if site ploidy > 2
356 if( record->ploidy() > 2 ){
357 throw std::runtime_error("GT is not supported for site with ploidy > 2\n " + variantToString(*record));
358 }
359
360 // get GT as int's with vector that is twice as long
361 record->getGenotypes(values_int);
362 tmp = intToDosage( values_int, param.missingToMean );
363 matDosage.insert(matDosage.end(), tmp.begin(), tmp.end());
364 break;
365 }
366
367 // store variant information
368 vInfo->addVariant(record->CHROM(),
369 record->POS(),
370 record->ID(),
371 record->REF(),
372 record->ALT() );
373 }
374
375 bool ret = true;
376
377 // if chunk is empty, return false
378 if( vInfo->size() == 0) ret = false;
379
380 return ret;
381 }
382
383 // check status of region
384 void checkStatus(const string & region){
385 switch( reader->getStatus( region ) ){
386 case 1: // region is vaild and not empty
387 break;
388
389 case 0: // the region is valid but empty.
390 break;
391
392 case -1: // there is no index file found.
393 throw runtime_error("Could not retrieve index file");
394 break;
395
396 case -2: // the region is not valid
397 Rcpp::Rcout << "region:" + region + ";\n";
398 throw runtime_error("region was not found: " + region );
399 break;
400 }
401 }
402};
403
404} // end namespace
405
406#endif
Definition GenomicDataStream_virtual.h:34
double getMinVariance()
Definition GenomicDataStream_virtual.h:200
double getMAF() const
Definition GenomicDataStream_virtual.h:206
Param param
Definition GenomicDataStream_virtual.h:254
GenomicDataStream()
Definition GenomicDataStream_virtual.h:168
Definition GenomicRanges.h:25
Definition VariantInfo.h:24
void clear()
Definition VariantInfo.h:137
GenomicRanges getChromRanges() override
Definition vcfstream.h:135
string getStreamType() override
Definition vcfstream.h:131
bool getNextChunk(DataChunk< arma::sp_mat > &chunk, const bool &useFilter=true) override
Definition vcfstream.h:158
bool getNextChunk(DataChunk< Rcpp::NumericMatrix > &chunk, const bool &useFilter=true) override
Definition vcfstream.h:214
bool getNextChunk(DataChunk< vector< double > > &chunk, const bool &useFilter=true) override
Definition vcfstream.h:234
vcfstream(const Param &param)
Definition vcfstream.h:43
static string variantToString(const BcfRecord &record)
Definition vcfstream.h:253
vector< string > getSampleNames() override
Definition vcfstream.h:125
vcfstream()
Definition vcfstream.h:39
bool getNextChunk(DataChunk< arma::mat > &chunk, const bool &useFilter=true) override
Definition vcfstream.h:139
bool getNextChunk(DataChunk< Eigen::SparseMatrix< double > > &chunk, const bool &useFilter=true) override
Definition vcfstream.h:195
~vcfstream() override
Definition vcfstream.h:85
void setRegions(const vector< string > &regions) override
Definition vcfstream.h:93
bool getNextChunk(DataChunk< Eigen::MatrixXd > &chunk, const bool &useFilter=true) override
Definition vcfstream.h:178
int n_samples() override
Definition vcfstream.h:119
Definition bgenstream.h:33
Definition GenomicDataStream_virtual.h:65
int chunkSize
Definition GenomicDataStream_virtual.h:157